test_filters.py 49 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import threading
  27. from collections.abc import Iterator
  28. from contextlib import contextmanager
  29. from dulwich import porcelain
  30. from dulwich.filters import (
  31. FilterContext,
  32. FilterError,
  33. FilterRegistry,
  34. ProcessFilterDriver,
  35. )
  36. from dulwich.repo import Repo
  37. from . import TestCase
  38. class GitAttributesFilterIntegrationTests(TestCase):
  39. """Test gitattributes integration with filter drivers."""
  40. def setUp(self) -> None:
  41. super().setUp()
  42. self.test_dir = tempfile.mkdtemp()
  43. self.addCleanup(self._cleanup_test_dir)
  44. self.repo = Repo.init(self.test_dir)
  45. def _cleanup_test_dir(self) -> None:
  46. """Clean up test directory."""
  47. import shutil
  48. shutil.rmtree(self.test_dir)
  49. def test_gitattributes_text_filter(self) -> None:
  50. """Test that text attribute triggers line ending conversion."""
  51. # Configure autocrlf first
  52. config = self.repo.get_config()
  53. config.set((b"core",), b"autocrlf", b"true")
  54. config.write_to_path()
  55. # Create .gitattributes with text attribute
  56. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  57. with open(gitattributes_path, "wb") as f:
  58. f.write(b"*.txt text\n")
  59. f.write(b"*.bin -text\n")
  60. # Add .gitattributes
  61. porcelain.add(self.repo, paths=[".gitattributes"])
  62. porcelain.commit(self.repo, message=b"Add gitattributes")
  63. # Create text file with CRLF
  64. text_file = os.path.join(self.test_dir, "test.txt")
  65. with open(text_file, "wb") as f:
  66. f.write(b"line1\r\nline2\r\n")
  67. # Create binary file with CRLF
  68. bin_file = os.path.join(self.test_dir, "test.bin")
  69. with open(bin_file, "wb") as f:
  70. f.write(b"binary\r\ndata\r\n")
  71. # Add files
  72. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  73. # Check that text file was normalized
  74. index = self.repo.open_index()
  75. text_entry = index[b"test.txt"]
  76. text_blob = self.repo.object_store[text_entry.sha]
  77. self.assertEqual(text_blob.data, b"line1\nline2\n")
  78. # Check that binary file was not normalized
  79. bin_entry = index[b"test.bin"]
  80. bin_blob = self.repo.object_store[bin_entry.sha]
  81. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  82. def test_gitattributes_custom_filter(self) -> None:
  83. """Test custom filter specified in gitattributes."""
  84. # Create a Python script that acts as our filter
  85. import sys
  86. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  87. with open(filter_script, "w") as f:
  88. f.write(
  89. """#!/usr/bin/env python3
  90. import sys
  91. data = sys.stdin.buffer.read()
  92. # Replace all digits with X
  93. result = bytearray()
  94. for b in data:
  95. if chr(b).isdigit():
  96. result.append(ord('X'))
  97. else:
  98. result.append(b)
  99. sys.stdout.buffer.write(result)
  100. """
  101. )
  102. os.chmod(filter_script, 0o755)
  103. # Create .gitattributes with custom filter
  104. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  105. with open(gitattributes_path, "wb") as f:
  106. f.write(b"*.secret filter=redact\n")
  107. # Configure custom filter (use Python script for testing)
  108. config = self.repo.get_config()
  109. # This filter replaces all digits with X
  110. config.set(
  111. (b"filter", b"redact"),
  112. b"clean",
  113. f"{sys.executable} {filter_script}".encode(),
  114. )
  115. config.write_to_path()
  116. # Add .gitattributes
  117. porcelain.add(self.repo, paths=[".gitattributes"])
  118. # Create file with sensitive content
  119. secret_file = os.path.join(self.test_dir, "password.secret")
  120. with open(secret_file, "wb") as f:
  121. f.write(b"password123\ntoken456\n")
  122. # Add file
  123. porcelain.add(self.repo, paths=["password.secret"])
  124. # Check that content was filtered
  125. index = self.repo.open_index()
  126. entry = index[b"password.secret"]
  127. blob = self.repo.object_store[entry.sha]
  128. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  129. def test_gitattributes_from_tree(self) -> None:
  130. """Test that gitattributes from tree are used when no working tree exists."""
  131. # Create .gitattributes with text attribute
  132. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  133. with open(gitattributes_path, "wb") as f:
  134. f.write(b"*.txt text\n")
  135. # Add and commit .gitattributes
  136. porcelain.add(self.repo, paths=[".gitattributes"])
  137. porcelain.commit(self.repo, message=b"Add gitattributes")
  138. # Remove .gitattributes from working tree
  139. os.remove(gitattributes_path)
  140. # Get gitattributes - should still work from tree
  141. gitattributes = self.repo.get_gitattributes()
  142. attrs = gitattributes.match_path(b"test.txt")
  143. self.assertEqual(attrs.get(b"text"), True)
  144. def test_gitattributes_info_attributes(self) -> None:
  145. """Test that .git/info/attributes is read."""
  146. # Create info/attributes
  147. info_dir = os.path.join(self.repo.controldir(), "info")
  148. if not os.path.exists(info_dir):
  149. os.makedirs(info_dir)
  150. info_attrs_path = os.path.join(info_dir, "attributes")
  151. with open(info_attrs_path, "wb") as f:
  152. f.write(b"*.log text\n")
  153. # Get gitattributes
  154. gitattributes = self.repo.get_gitattributes()
  155. attrs = gitattributes.match_path(b"debug.log")
  156. self.assertEqual(attrs.get(b"text"), True)
  157. def test_filter_precedence(self) -> None:
  158. """Test that filter attribute takes precedence over text attribute."""
  159. # Create a Python script that converts to uppercase
  160. import sys
  161. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  162. with open(filter_script, "w") as f:
  163. f.write(
  164. """#!/usr/bin/env python3
  165. import sys
  166. data = sys.stdin.buffer.read()
  167. # Convert bytes to string, uppercase, then back to bytes
  168. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  169. sys.stdout.buffer.write(result)
  170. """
  171. )
  172. os.chmod(filter_script, 0o755)
  173. # Create .gitattributes with both text and filter
  174. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  175. with open(gitattributes_path, "wb") as f:
  176. f.write(b"*.txt text filter=custom\n")
  177. # Configure autocrlf and custom filter
  178. config = self.repo.get_config()
  179. config.set((b"core",), b"autocrlf", b"true")
  180. # This filter converts to uppercase
  181. config.set(
  182. (b"filter", b"custom"),
  183. b"clean",
  184. f"{sys.executable} {filter_script}".encode(),
  185. )
  186. config.write_to_path()
  187. # Add .gitattributes
  188. porcelain.add(self.repo, paths=[".gitattributes"])
  189. # Create text file with lowercase and CRLF
  190. text_file = os.path.join(self.test_dir, "test.txt")
  191. with open(text_file, "wb") as f:
  192. f.write(b"hello\r\nworld\r\n")
  193. # Add file
  194. porcelain.add(self.repo, paths=["test.txt"])
  195. # Check that custom filter was applied (not just line ending conversion)
  196. index = self.repo.open_index()
  197. entry = index[b"test.txt"]
  198. blob = self.repo.object_store[entry.sha]
  199. # Should be uppercase with LF endings
  200. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  201. def test_blob_normalizer_integration(self) -> None:
  202. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  203. normalizer = self.repo.get_blob_normalizer()
  204. # Check it's the right type
  205. from dulwich.filters import FilterBlobNormalizer
  206. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  207. # Check it has access to gitattributes
  208. self.assertIsNotNone(normalizer.gitattributes)
  209. self.assertIsNotNone(normalizer.filter_registry)
  210. def test_required_filter_missing(self) -> None:
  211. """Test that missing required filter raises an error."""
  212. # Create .gitattributes with required filter
  213. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  214. with open(gitattributes_path, "wb") as f:
  215. f.write(b"*.secret filter=required_filter\n")
  216. # Configure filter as required but without commands
  217. config = self.repo.get_config()
  218. config.set((b"filter", b"required_filter"), b"required", b"true")
  219. config.write_to_path()
  220. # Add .gitattributes
  221. porcelain.add(self.repo, paths=[".gitattributes"])
  222. # Create file that would use the filter
  223. secret_file = os.path.join(self.test_dir, "test.secret")
  224. with open(secret_file, "wb") as f:
  225. f.write(b"test content\n")
  226. # Adding file should raise error due to missing required filter
  227. with self.assertRaises(FilterError) as cm:
  228. porcelain.add(self.repo, paths=["test.secret"])
  229. self.assertIn(
  230. "Required filter 'required_filter' is not available", str(cm.exception)
  231. )
  232. def test_required_filter_clean_command_fails(self) -> None:
  233. """Test that required filter failure during clean raises an error."""
  234. # Create .gitattributes with required filter
  235. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  236. with open(gitattributes_path, "wb") as f:
  237. f.write(b"*.secret filter=failing_filter\n")
  238. # Configure filter as required with failing command
  239. config = self.repo.get_config()
  240. config.set(
  241. (b"filter", b"failing_filter"), b"clean", b"false"
  242. ) # false command always fails
  243. config.set((b"filter", b"failing_filter"), b"required", b"true")
  244. config.write_to_path()
  245. # Add .gitattributes
  246. porcelain.add(self.repo, paths=[".gitattributes"])
  247. # Create file that would use the filter
  248. secret_file = os.path.join(self.test_dir, "test.secret")
  249. with open(secret_file, "wb") as f:
  250. f.write(b"test content\n")
  251. # Adding file should raise error due to failing required filter
  252. with self.assertRaises(FilterError) as cm:
  253. porcelain.add(self.repo, paths=["test.secret"])
  254. self.assertIn("Required clean filter failed", str(cm.exception))
  255. def test_required_filter_success(self) -> None:
  256. """Test that required filter works when properly configured."""
  257. # Create .gitattributes with required filter
  258. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  259. with open(gitattributes_path, "wb") as f:
  260. f.write(b"*.secret filter=working_filter\n")
  261. # Configure filter as required with working command
  262. config = self.repo.get_config()
  263. config.set(
  264. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  265. ) # uppercase
  266. config.set((b"filter", b"working_filter"), b"required", b"true")
  267. config.write_to_path()
  268. # Add .gitattributes
  269. porcelain.add(self.repo, paths=[".gitattributes"])
  270. # Create file that would use the filter
  271. secret_file = os.path.join(self.test_dir, "test.secret")
  272. with open(secret_file, "wb") as f:
  273. f.write(b"hello world\n")
  274. # Adding file should work and apply filter
  275. porcelain.add(self.repo, paths=["test.secret"])
  276. # Check that content was filtered
  277. index = self.repo.open_index()
  278. entry = index[b"test.secret"]
  279. blob = self.repo.object_store[entry.sha]
  280. self.assertEqual(blob.data, b"HELLO WORLD\n")
  281. def test_optional_filter_failure_fallback(self) -> None:
  282. """Test that optional filter failure falls back to original data."""
  283. # Create .gitattributes with optional filter
  284. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  285. with open(gitattributes_path, "wb") as f:
  286. f.write(b"*.txt filter=optional_filter\n")
  287. # Configure filter as optional (required=false) with failing command
  288. config = self.repo.get_config()
  289. config.set(
  290. (b"filter", b"optional_filter"), b"clean", b"false"
  291. ) # false command always fails
  292. config.set((b"filter", b"optional_filter"), b"required", b"false")
  293. config.write_to_path()
  294. # Add .gitattributes
  295. porcelain.add(self.repo, paths=[".gitattributes"])
  296. # Create file that would use the filter
  297. test_file = os.path.join(self.test_dir, "test.txt")
  298. with open(test_file, "wb") as f:
  299. f.write(b"test content\n")
  300. # Adding file should work and fallback to original content
  301. porcelain.add(self.repo, paths=["test.txt"])
  302. # Check that original content was preserved
  303. index = self.repo.open_index()
  304. entry = index[b"test.txt"]
  305. blob = self.repo.object_store[entry.sha]
  306. self.assertEqual(blob.data, b"test content\n")
  307. class ProcessFilterDriverTests(TestCase):
  308. """Tests for ProcessFilterDriver with real process filter."""
  309. def setUp(self):
  310. super().setUp()
  311. # Create a temporary test filter process dynamically
  312. self.test_filter_path = self._create_test_filter()
  313. def tearDown(self):
  314. # Clean up the test filter
  315. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  316. os.unlink(self.test_filter_path)
  317. super().tearDown()
  318. def _create_test_filter(self):
  319. """Create a simple test filter process that works on all platforms."""
  320. import tempfile
  321. # Create filter script that uppercases on clean, lowercases on smudge
  322. filter_script = """import sys
  323. import os
  324. # Simple filter that doesn't use any external dependencies
  325. def read_exact(n):
  326. data = b""
  327. while len(data) < n:
  328. chunk = sys.stdin.buffer.read(n - len(data))
  329. if not chunk:
  330. break
  331. data += chunk
  332. return data
  333. def write_pkt(data):
  334. if data is None:
  335. sys.stdout.buffer.write(b"0000")
  336. else:
  337. length = len(data) + 4
  338. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  339. sys.stdout.buffer.write(data)
  340. sys.stdout.buffer.flush()
  341. def read_pkt():
  342. size_bytes = read_exact(4)
  343. if not size_bytes:
  344. return None
  345. size = int(size_bytes.decode(), 16)
  346. if size == 0:
  347. return None
  348. return read_exact(size - 4)
  349. # Handshake
  350. client_hello = read_pkt()
  351. version = read_pkt()
  352. flush = read_pkt()
  353. write_pkt(b"git-filter-server")
  354. write_pkt(b"version=2")
  355. write_pkt(None)
  356. # Read and echo capabilities
  357. caps = []
  358. while True:
  359. cap = read_pkt()
  360. if cap is None:
  361. break
  362. caps.append(cap)
  363. for cap in caps:
  364. write_pkt(cap)
  365. write_pkt(None)
  366. # Process commands
  367. while True:
  368. headers = {}
  369. while True:
  370. line = read_pkt()
  371. if line is None:
  372. break
  373. if b"=" in line:
  374. k, v = line.split(b"=", 1)
  375. headers[k.decode()] = v.decode()
  376. if not headers:
  377. break
  378. # Read data
  379. data_chunks = []
  380. while True:
  381. chunk = read_pkt()
  382. if chunk is None:
  383. break
  384. data_chunks.append(chunk)
  385. data = b"".join(data_chunks)
  386. # Process (uppercase for clean, lowercase for smudge)
  387. if headers.get("command") == "clean":
  388. result = data.upper()
  389. elif headers.get("command") == "smudge":
  390. result = data.lower()
  391. else:
  392. result = data
  393. # Send response
  394. write_pkt(b"status=success")
  395. write_pkt(None)
  396. # Send result
  397. chunk_size = 65516
  398. for i in range(0, len(result), chunk_size):
  399. write_pkt(result[i:i+chunk_size])
  400. write_pkt(None)
  401. # Send final headers (empty list to keep status=success)
  402. write_pkt(None)
  403. """
  404. # Create temporary file
  405. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  406. try:
  407. os.write(fd, filter_script.encode())
  408. os.close(fd)
  409. # Make executable on Unix-like systems
  410. if os.name != "nt": # Not Windows
  411. os.chmod(path, 0o755)
  412. return path
  413. except:
  414. if os.path.exists(path):
  415. os.unlink(path)
  416. raise
  417. def test_process_filter_clean_operation(self):
  418. """Test clean operation using real process filter."""
  419. import sys
  420. driver = ProcessFilterDriver(
  421. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  422. )
  423. test_data = b"hello world"
  424. result = driver.clean(test_data)
  425. # Our test filter uppercases on clean
  426. self.assertEqual(result, b"HELLO WORLD")
  427. def test_process_filter_smudge_operation(self):
  428. """Test smudge operation using real process filter."""
  429. import sys
  430. driver = ProcessFilterDriver(
  431. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  432. )
  433. test_data = b"HELLO WORLD"
  434. result = driver.smudge(test_data, b"test.txt")
  435. # Our test filter lowercases on smudge
  436. self.assertEqual(result, b"hello world")
  437. def test_process_filter_large_data(self):
  438. """Test process filter with data larger than single pkt-line."""
  439. import sys
  440. driver = ProcessFilterDriver(
  441. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  442. )
  443. # Create data larger than max pkt-line payload (65516 bytes)
  444. test_data = b"a" * 70000
  445. result = driver.clean(test_data)
  446. # Should be uppercased
  447. self.assertEqual(result, b"A" * 70000)
  448. def test_fallback_to_individual_commands(self):
  449. """Test fallback when process filter fails."""
  450. driver = ProcessFilterDriver(
  451. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  452. process_cmd="/nonexistent/command", # This should fail
  453. required=False,
  454. )
  455. test_data = b"hello world\n"
  456. result = driver.clean(test_data)
  457. # Should fallback to tr command and uppercase
  458. self.assertEqual(result, b"HELLO WORLD\n")
  459. def test_process_reuse(self):
  460. """Test that process is reused across multiple operations."""
  461. import sys
  462. driver = ProcessFilterDriver(
  463. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  464. )
  465. # First operation
  466. result1 = driver.clean(b"test1")
  467. self.assertEqual(result1, b"TEST1")
  468. # Second operation should reuse the same process
  469. result2 = driver.clean(b"test2")
  470. self.assertEqual(result2, b"TEST2")
  471. # Process should still be alive
  472. self.assertIsNotNone(driver._process)
  473. self.assertIsNone(driver._process.poll()) # None means still running
  474. def test_error_handling_invalid_command(self):
  475. """Test error handling with invalid filter command."""
  476. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  477. with self.assertRaises(FilterError) as cm:
  478. driver.clean(b"test data")
  479. self.assertIn("Failed to start process filter", str(cm.exception))
  480. class FilterContextTests(TestCase):
  481. """Tests for FilterContext class."""
  482. def test_filter_context_caches_long_running_drivers(self):
  483. """Test that FilterContext caches only long-running drivers."""
  484. # Create real filter drivers
  485. class UppercaseFilter:
  486. def clean(self, data):
  487. return data.upper()
  488. def smudge(self, data, path=b""):
  489. return data.lower()
  490. def cleanup(self):
  491. pass
  492. def reuse(self, config, filter_name):
  493. # Pretend it's a long-running filter that should be cached
  494. return True
  495. class IdentityFilter:
  496. def clean(self, data):
  497. return data
  498. def smudge(self, data, path=b""):
  499. return data
  500. def cleanup(self):
  501. pass
  502. def reuse(self, config, filter_name):
  503. # Lightweight filter, don't cache
  504. return False
  505. # Create registry and context
  506. # Need to provide a config for caching to work
  507. from dulwich.config import ConfigDict
  508. config = ConfigDict()
  509. # Add some dummy config to make it truthy (use proper format)
  510. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  511. registry = FilterRegistry(config=config)
  512. context = FilterContext(registry)
  513. # Register drivers
  514. long_running = UppercaseFilter()
  515. stateless = IdentityFilter()
  516. registry.register_driver("uppercase", long_running)
  517. registry.register_driver("identity", stateless)
  518. # Get drivers through context
  519. driver1 = context.get_driver("uppercase")
  520. driver2 = context.get_driver("uppercase")
  521. # Long-running driver should be cached
  522. self.assertIs(driver1, driver2)
  523. self.assertIs(driver1, long_running)
  524. # Get stateless driver
  525. stateless1 = context.get_driver("identity")
  526. stateless2 = context.get_driver("identity")
  527. # Stateless driver comes from registry but isn't cached in context
  528. self.assertIs(stateless1, stateless)
  529. self.assertIs(stateless2, stateless)
  530. self.assertNotIn("identity", context._active_drivers)
  531. self.assertIn("uppercase", context._active_drivers)
  532. def test_filter_context_cleanup(self):
  533. """Test that FilterContext properly cleans up resources."""
  534. cleanup_called = []
  535. class TrackableFilter:
  536. def __init__(self, name):
  537. self.name = name
  538. def clean(self, data):
  539. return data
  540. def smudge(self, data, path=b""):
  541. return data
  542. def cleanup(self):
  543. cleanup_called.append(self.name)
  544. def is_long_running(self):
  545. return True
  546. # Create registry and context
  547. registry = FilterRegistry()
  548. context = FilterContext(registry)
  549. # Register and use drivers
  550. filter1 = TrackableFilter("filter1")
  551. filter2 = TrackableFilter("filter2")
  552. filter3 = TrackableFilter("filter3")
  553. registry.register_driver("filter1", filter1)
  554. registry.register_driver("filter2", filter2)
  555. registry.register_driver("filter3", filter3)
  556. # Get only some drivers to cache them
  557. context.get_driver("filter1")
  558. context.get_driver("filter2")
  559. # Don't get filter3
  560. # Close context
  561. context.close()
  562. # Verify cleanup was called for all drivers (context closes registry too)
  563. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  564. def test_filter_context_get_driver_returns_none_for_missing(self):
  565. """Test that get_driver returns None for non-existent drivers."""
  566. registry = FilterRegistry()
  567. context = FilterContext(registry)
  568. result = context.get_driver("nonexistent")
  569. self.assertIsNone(result)
  570. def test_filter_context_with_real_process_filter(self):
  571. """Test FilterContext with real ProcessFilterDriver instances."""
  572. # Use existing test filter from ProcessFilterDriverTests
  573. test_dir = tempfile.mkdtemp()
  574. self.addCleanup(shutil.rmtree, test_dir)
  575. # Create a simple test filter that just passes data through
  576. filter_script = _PASSTHROUGH_FILTER_SCRIPT
  577. filter_path = os.path.join(test_dir, "simple_filter.py")
  578. with open(filter_path, "w") as f:
  579. f.write(filter_script)
  580. # Create ProcessFilterDriver instances
  581. # One with process_cmd (long-running)
  582. process_driver = ProcessFilterDriver(
  583. process_cmd=None, # Don't use actual process to avoid complexity
  584. clean_cmd=f"{sys.executable} {filter_path}",
  585. smudge_cmd=f"{sys.executable} {filter_path}",
  586. )
  587. # Register in context
  588. from dulwich.config import ConfigDict
  589. config = ConfigDict()
  590. # Add some dummy config to make it truthy (use proper format)
  591. config.set(
  592. (b"filter", b"process"),
  593. b"clean",
  594. f"{sys.executable} {filter_path}".encode(),
  595. )
  596. config.set(
  597. (b"filter", b"process"),
  598. b"smudge",
  599. f"{sys.executable} {filter_path}".encode(),
  600. )
  601. registry = FilterRegistry(config=config)
  602. context = FilterContext(registry)
  603. registry.register_driver("process", process_driver)
  604. # Get driver - should not be cached since it's not long-running
  605. driver1 = context.get_driver("process")
  606. self.assertIsNotNone(driver1)
  607. # Check that it's not a long-running process (no process_cmd)
  608. self.assertIsNone(driver1.process_cmd)
  609. self.assertNotIn("process", context._active_drivers)
  610. # Test with a long-running driver that should be cached
  611. # Create a mock driver that always wants to be reused
  612. class CacheableProcessDriver:
  613. def __init__(self):
  614. self.process_cmd = "dummy"
  615. self.clean_cmd = None
  616. self.smudge_cmd = None
  617. self.required = False
  618. def clean(self, data):
  619. return data
  620. def smudge(self, data, path=b""):
  621. return data
  622. def cleanup(self):
  623. pass
  624. def reuse(self, config, filter_name):
  625. # This driver always wants to be cached (simulates a long-running process)
  626. return True
  627. cacheable_driver = CacheableProcessDriver()
  628. registry.register_driver("long_process", cacheable_driver)
  629. driver2 = context.get_driver("long_process")
  630. # Check that it has a process_cmd (long-running)
  631. self.assertIsNotNone(driver2.process_cmd)
  632. self.assertIn("long_process", context._active_drivers)
  633. context.close()
  634. def test_filter_context_closes_registry(self):
  635. """Test that closing FilterContext also closes the registry."""
  636. # Track if registry.close() is called
  637. registry_closed = []
  638. class TrackingRegistry(FilterRegistry):
  639. def close(self):
  640. registry_closed.append(True)
  641. super().close()
  642. registry = TrackingRegistry()
  643. context = FilterContext(registry)
  644. # Close context should also close registry
  645. context.close()
  646. self.assertTrue(registry_closed)
  647. class ProcessFilterProtocolTests(TestCase):
  648. """Tests for ProcessFilterDriver protocol compliance."""
  649. def setUp(self):
  650. super().setUp()
  651. # Create a spec-compliant test filter process dynamically
  652. self.test_filter_path = self._create_spec_compliant_filter()
  653. def tearDown(self):
  654. # Clean up the test filter
  655. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  656. os.unlink(self.test_filter_path)
  657. super().tearDown()
  658. def _create_spec_compliant_filter(self):
  659. """Create a spec-compliant test filter that works on all platforms."""
  660. import tempfile
  661. # This filter strictly follows Git spec - no newlines in packets
  662. filter_script = """import sys
  663. def read_exact(n):
  664. data = b""
  665. while len(data) < n:
  666. chunk = sys.stdin.buffer.read(n - len(data))
  667. if not chunk:
  668. break
  669. data += chunk
  670. return data
  671. def write_pkt(data):
  672. if data is None:
  673. sys.stdout.buffer.write(b"0000")
  674. else:
  675. length = len(data) + 4
  676. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  677. sys.stdout.buffer.write(data)
  678. sys.stdout.buffer.flush()
  679. def read_pkt():
  680. size_bytes = read_exact(4)
  681. if not size_bytes:
  682. return None
  683. size = int(size_bytes.decode(), 16)
  684. if size == 0:
  685. return None
  686. return read_exact(size - 4)
  687. # Handshake - exact format, no newlines
  688. client_hello = read_pkt()
  689. version = read_pkt()
  690. flush = read_pkt()
  691. if client_hello != b"git-filter-client":
  692. sys.exit(1)
  693. if version != b"version=2":
  694. sys.exit(1)
  695. write_pkt(b"git-filter-server") # No newline
  696. write_pkt(b"version=2") # No newline
  697. write_pkt(None)
  698. # Read and echo capabilities
  699. caps = []
  700. while True:
  701. cap = read_pkt()
  702. if cap is None:
  703. break
  704. caps.append(cap)
  705. for cap in caps:
  706. if cap in [b"capability=clean", b"capability=smudge"]:
  707. write_pkt(cap)
  708. write_pkt(None)
  709. # Process commands
  710. while True:
  711. headers = {}
  712. while True:
  713. line = read_pkt()
  714. if line is None:
  715. break
  716. if b"=" in line:
  717. k, v = line.split(b"=", 1)
  718. headers[k.decode()] = v.decode()
  719. if not headers:
  720. break
  721. # Read data
  722. data_chunks = []
  723. while True:
  724. chunk = read_pkt()
  725. if chunk is None:
  726. break
  727. data_chunks.append(chunk)
  728. data = b"".join(data_chunks)
  729. # Process
  730. if headers.get("command") == "clean":
  731. result = data.upper()
  732. elif headers.get("command") == "smudge":
  733. result = data.lower()
  734. else:
  735. result = data
  736. # Send response
  737. write_pkt(b"status=success")
  738. write_pkt(None)
  739. # Send result
  740. chunk_size = 65516
  741. for i in range(0, len(result), chunk_size):
  742. write_pkt(result[i:i+chunk_size])
  743. write_pkt(None)
  744. # Send final headers (empty list to keep status=success)
  745. write_pkt(None)
  746. """
  747. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  748. try:
  749. os.write(fd, filter_script.encode())
  750. os.close(fd)
  751. if os.name != "nt": # Not Windows
  752. os.chmod(path, 0o755)
  753. return path
  754. except:
  755. if os.path.exists(path):
  756. os.unlink(path)
  757. raise
  758. def test_protocol_handshake_exact_format(self):
  759. """Test that handshake uses exact format without newlines."""
  760. import sys
  761. driver = ProcessFilterDriver(
  762. process_cmd=f"{sys.executable} {self.test_filter_path}",
  763. required=True, # Require success to test protocol compliance
  764. )
  765. # This should work with exact protocol format
  766. test_data = b"hello world"
  767. result = driver.clean(test_data)
  768. # Our test filter uppercases on clean
  769. self.assertEqual(result, b"HELLO WORLD")
  770. def test_capability_negotiation_exact_format(self):
  771. """Test that capabilities are sent and received in exact format."""
  772. import sys
  773. driver = ProcessFilterDriver(
  774. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  775. )
  776. # Force capability negotiation by using both clean and smudge
  777. clean_result = driver.clean(b"test")
  778. smudge_result = driver.smudge(b"TEST", b"test.txt")
  779. self.assertEqual(clean_result, b"TEST")
  780. self.assertEqual(smudge_result, b"test")
  781. def test_binary_data_handling(self):
  782. """Test handling of binary data through the protocol."""
  783. import sys
  784. driver = ProcessFilterDriver(
  785. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  786. )
  787. # Binary data with null bytes, high bytes, etc.
  788. binary_data = bytes(range(256))
  789. result = driver.clean(binary_data)
  790. # Should handle binary data without crashing
  791. self.assertIsInstance(result, bytes)
  792. # Our test filter uppercases bytes directly, which works for binary data
  793. # The fix ensures headers are kept as bytes, so binary content doesn't cause decode errors
  794. def test_binary_data_with_invalid_utf8_sequences(self):
  795. """Test handling of binary data with invalid UTF-8 sequences.
  796. Regression test for https://github.com/jelmer/dulwich/issues/2023
  797. where binary files (like .ogg, .jpg) caused UTF-8 decode errors.
  798. """
  799. import sys
  800. driver = ProcessFilterDriver(
  801. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  802. )
  803. # Create binary data with the specific byte that caused the issue (0xe5 at position 14)
  804. # plus other invalid UTF-8 sequences
  805. binary_data = b"some header \xe5\xff\xfe binary data"
  806. result = driver.clean(binary_data)
  807. # Should handle binary data without UTF-8 decode errors
  808. self.assertIsInstance(result, bytes)
  809. # The filter should process it successfully
  810. self.assertEqual(result, binary_data.upper())
  811. def test_large_file_chunking(self):
  812. """Test proper chunking of large files."""
  813. import sys
  814. driver = ProcessFilterDriver(
  815. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  816. )
  817. # Create data larger than max pkt-line payload (65516 bytes)
  818. large_data = b"a" * 100000
  819. result = driver.clean(large_data)
  820. # Should be properly processed (uppercased)
  821. expected = b"A" * 100000
  822. self.assertEqual(result, expected)
  823. def test_empty_file_handling(self):
  824. """Test handling of empty files."""
  825. import sys
  826. driver = ProcessFilterDriver(
  827. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  828. )
  829. result = driver.clean(b"")
  830. self.assertEqual(result, b"")
  831. def test_special_characters_in_pathname(self):
  832. """Test paths with special characters are handled correctly."""
  833. import sys
  834. # Test various special characters in paths
  835. special_paths = [
  836. b"file with spaces.txt",
  837. b"path/with/slashes.txt",
  838. b"file=with=equals.txt",
  839. b"file\nwith\nnewlines.txt",
  840. b"filew&with&ampersand.txt",
  841. ]
  842. test_data = b"test data"
  843. with create_passthrough_filter() as passthrough_filter_path:
  844. for process_cmd, smudge_cmd in [
  845. (f"{sys.executable} {self.test_filter_path}", None),
  846. (None, f"{sys.executable} {passthrough_filter_path} %f"),
  847. ]:
  848. driver = ProcessFilterDriver(
  849. process_cmd=process_cmd,
  850. smudge_cmd=smudge_cmd,
  851. required=True,
  852. )
  853. for path in special_paths:
  854. with self.subTest(
  855. process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
  856. ):
  857. result = driver.smudge(test_data, path)
  858. self.assertEqual(result, b"test data")
  859. def test_process_crash_recovery(self):
  860. """Test that process is properly restarted after crash."""
  861. import sys
  862. driver = ProcessFilterDriver(
  863. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  864. )
  865. # First operation
  866. result = driver.clean(b"test1")
  867. self.assertEqual(result, b"TEST1")
  868. # Kill the process
  869. if driver._process:
  870. driver._process.kill()
  871. driver._process.wait()
  872. driver.cleanup()
  873. # Should restart and work again
  874. result = driver.clean(b"test2")
  875. self.assertEqual(result, b"TEST2")
  876. def test_malformed_process_response_handling(self):
  877. """Test handling of malformed responses from process."""
  878. # Create a filter that sends malformed responses
  879. malformed_filter = """#!/usr/bin/env python3
  880. import sys
  881. import os
  882. sys.path.insert(0, os.path.dirname(__file__))
  883. from dulwich.protocol import Protocol
  884. protocol = Protocol(
  885. lambda n: sys.stdin.buffer.read(n),
  886. lambda d: sys.stdout.buffer.write(d) or len(d)
  887. )
  888. # Read handshake
  889. protocol.read_pkt_line()
  890. protocol.read_pkt_line()
  891. protocol.read_pkt_line()
  892. # Send invalid handshake
  893. protocol.write_pkt_line(b"invalid-welcome")
  894. protocol.write_pkt_line(b"version=2")
  895. protocol.write_pkt_line(None)
  896. """
  897. import tempfile
  898. fd, script_path = tempfile.mkstemp(suffix=".py")
  899. try:
  900. os.write(fd, malformed_filter.encode())
  901. os.close(fd)
  902. os.chmod(script_path, 0o755)
  903. driver = ProcessFilterDriver(
  904. process_cmd=f"python3 {script_path}",
  905. clean_cmd="cat", # Fallback
  906. required=False,
  907. )
  908. # Should fallback to clean_cmd when process fails
  909. result = driver.clean(b"test data")
  910. self.assertEqual(result, b"test data")
  911. finally:
  912. os.unlink(script_path)
  913. def test_concurrent_filter_operations(self):
  914. """Test that concurrent operations work correctly."""
  915. import sys
  916. driver = ProcessFilterDriver(
  917. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  918. )
  919. results = []
  920. errors = []
  921. def worker(data):
  922. try:
  923. result = driver.clean(data)
  924. results.append(result)
  925. except Exception as e:
  926. errors.append(e)
  927. # Start 5 concurrent operations
  928. threads = []
  929. test_data = [f"test{i}".encode() for i in range(5)]
  930. for data in test_data:
  931. t = threading.Thread(target=worker, args=(data,))
  932. threads.append(t)
  933. t.start()
  934. for t in threads:
  935. t.join()
  936. # Should have no errors
  937. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  938. self.assertEqual(len(results), 5)
  939. # All results should be uppercase versions
  940. expected = [data.upper() for data in test_data]
  941. self.assertEqual(sorted(results), sorted(expected))
  942. def test_process_resource_cleanup(self):
  943. """Test that process resources are properly cleaned up."""
  944. import sys
  945. driver = ProcessFilterDriver(
  946. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  947. )
  948. # Use the driver
  949. result = driver.clean(b"test")
  950. self.assertEqual(result, b"TEST")
  951. # Process should be running
  952. self.assertIsNotNone(driver._process)
  953. self.assertIsNone(driver._process.poll()) # None means still running
  954. # Remember the old process to check it was terminated
  955. old_process = driver._process
  956. # Manually clean up (simulates __del__)
  957. driver.cleanup()
  958. # Process reference should be cleared
  959. self.assertIsNone(driver._process)
  960. self.assertIsNone(driver._protocol)
  961. # Old process should be terminated
  962. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  963. def test_required_filter_error_propagation(self):
  964. """Test that errors are properly propagated when filter is required."""
  965. driver = ProcessFilterDriver(
  966. process_cmd="/definitely/nonexistent/command", required=True
  967. )
  968. with self.assertRaises(FilterError) as cm:
  969. driver.clean(b"test data")
  970. self.assertIn("Failed to start process filter", str(cm.exception))
  971. def test_two_phase_response_protocol(self):
  972. """Test filter protocol with two-phase response (initial + final headers).
  973. This test verifies that the filter correctly handles the Git LFS protocol
  974. where filters send:
  975. 1. Initial headers with status
  976. 2. Content data
  977. 3. Final headers with status
  978. This is the format used by git-lfs and documented in the Git filter protocol.
  979. """
  980. import sys
  981. import tempfile
  982. # Create a filter that follows the two-phase protocol
  983. filter_script = """import sys
  984. def read_exact(n):
  985. data = b""
  986. while len(data) < n:
  987. chunk = sys.stdin.buffer.read(n - len(data))
  988. if not chunk:
  989. break
  990. data += chunk
  991. return data
  992. def write_pkt(data):
  993. if data is None:
  994. sys.stdout.buffer.write(b"0000")
  995. else:
  996. length = len(data) + 4
  997. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  998. sys.stdout.buffer.write(data)
  999. sys.stdout.buffer.flush()
  1000. def read_pkt():
  1001. size_bytes = read_exact(4)
  1002. if not size_bytes:
  1003. return None
  1004. size = int(size_bytes.decode(), 16)
  1005. if size == 0:
  1006. return None
  1007. return read_exact(size - 4)
  1008. # Handshake
  1009. client_hello = read_pkt()
  1010. version = read_pkt()
  1011. flush = read_pkt()
  1012. write_pkt(b"git-filter-server")
  1013. write_pkt(b"version=2")
  1014. write_pkt(None)
  1015. # Read and echo capabilities
  1016. caps = []
  1017. while True:
  1018. cap = read_pkt()
  1019. if cap is None:
  1020. break
  1021. caps.append(cap)
  1022. for cap in caps:
  1023. write_pkt(cap)
  1024. write_pkt(None)
  1025. # Process commands
  1026. while True:
  1027. headers = {}
  1028. while True:
  1029. line = read_pkt()
  1030. if line is None:
  1031. break
  1032. if b"=" in line:
  1033. k, v = line.split(b"=", 1)
  1034. headers[k.decode()] = v.decode()
  1035. if not headers:
  1036. break
  1037. # Read data
  1038. data_chunks = []
  1039. while True:
  1040. chunk = read_pkt()
  1041. if chunk is None:
  1042. break
  1043. data_chunks.append(chunk)
  1044. data = b"".join(data_chunks)
  1045. # Process
  1046. if headers.get("command") == "clean":
  1047. result = data.upper()
  1048. elif headers.get("command") == "smudge":
  1049. result = data.lower()
  1050. else:
  1051. result = data
  1052. # TWO-PHASE RESPONSE: Send initial headers
  1053. write_pkt(b"status=success")
  1054. write_pkt(None)
  1055. # Send result data
  1056. chunk_size = 65516
  1057. for i in range(0, len(result), chunk_size):
  1058. write_pkt(result[i:i+chunk_size])
  1059. write_pkt(None)
  1060. # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
  1061. write_pkt(None)
  1062. """
  1063. fd, filter_path = tempfile.mkstemp(
  1064. suffix=".py", prefix="test_filter_two_phase_"
  1065. )
  1066. try:
  1067. os.write(fd, filter_script.encode())
  1068. os.close(fd)
  1069. if os.name != "nt":
  1070. os.chmod(filter_path, 0o755)
  1071. driver = ProcessFilterDriver(
  1072. process_cmd=f"{sys.executable} {filter_path}", required=True
  1073. )
  1074. # Test clean operation
  1075. test_data = b"hello world"
  1076. result = driver.clean(test_data)
  1077. self.assertEqual(result, b"HELLO WORLD")
  1078. # Test smudge operation
  1079. result = driver.smudge(b"HELLO WORLD", b"test.txt")
  1080. self.assertEqual(result, b"hello world")
  1081. driver.cleanup()
  1082. finally:
  1083. if os.path.exists(filter_path):
  1084. os.unlink(filter_path)
  1085. def test_two_phase_response_with_status_messages(self):
  1086. """Test filter that sends status messages in final headers.
  1087. Some filters (like git-lfs) may send progress or status messages
  1088. in the final headers. This test verifies that we can handle those.
  1089. """
  1090. import sys
  1091. import tempfile
  1092. # Create a filter that sends extra status info in final headers
  1093. filter_script = """import sys
  1094. def read_exact(n):
  1095. data = b""
  1096. while len(data) < n:
  1097. chunk = sys.stdin.buffer.read(n - len(data))
  1098. if not chunk:
  1099. break
  1100. data += chunk
  1101. return data
  1102. def write_pkt(data):
  1103. if data is None:
  1104. sys.stdout.buffer.write(b"0000")
  1105. else:
  1106. length = len(data) + 4
  1107. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1108. sys.stdout.buffer.write(data)
  1109. sys.stdout.buffer.flush()
  1110. def read_pkt():
  1111. size_bytes = read_exact(4)
  1112. if not size_bytes:
  1113. return None
  1114. size = int(size_bytes.decode(), 16)
  1115. if size == 0:
  1116. return None
  1117. return read_exact(size - 4)
  1118. # Handshake
  1119. client_hello = read_pkt()
  1120. version = read_pkt()
  1121. flush = read_pkt()
  1122. write_pkt(b"git-filter-server")
  1123. write_pkt(b"version=2")
  1124. write_pkt(None)
  1125. # Read and echo capabilities
  1126. caps = []
  1127. while True:
  1128. cap = read_pkt()
  1129. if cap is None:
  1130. break
  1131. caps.append(cap)
  1132. for cap in caps:
  1133. write_pkt(cap)
  1134. write_pkt(None)
  1135. # Process commands
  1136. while True:
  1137. headers = {}
  1138. while True:
  1139. line = read_pkt()
  1140. if line is None:
  1141. break
  1142. if b"=" in line:
  1143. k, v = line.split(b"=", 1)
  1144. headers[k.decode()] = v.decode()
  1145. if not headers:
  1146. break
  1147. # Read data
  1148. data_chunks = []
  1149. while True:
  1150. chunk = read_pkt()
  1151. if chunk is None:
  1152. break
  1153. data_chunks.append(chunk)
  1154. data = b"".join(data_chunks)
  1155. # Process
  1156. result = data.upper()
  1157. # Send initial headers
  1158. write_pkt(b"status=success")
  1159. write_pkt(None)
  1160. # Send result data
  1161. chunk_size = 65516
  1162. for i in range(0, len(result), chunk_size):
  1163. write_pkt(result[i:i+chunk_size])
  1164. write_pkt(None)
  1165. # Send final headers with progress messages (like git-lfs does)
  1166. write_pkt(b"status=success")
  1167. write_pkt(None)
  1168. """
  1169. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_status_")
  1170. try:
  1171. os.write(fd, filter_script.encode())
  1172. os.close(fd)
  1173. if os.name != "nt":
  1174. os.chmod(filter_path, 0o755)
  1175. driver = ProcessFilterDriver(
  1176. process_cmd=f"{sys.executable} {filter_path}", required=True
  1177. )
  1178. # Test clean operation with status messages
  1179. test_data = b"test data with status"
  1180. result = driver.clean(test_data)
  1181. self.assertEqual(result, b"TEST DATA WITH STATUS")
  1182. driver.cleanup()
  1183. finally:
  1184. if os.path.exists(filter_path):
  1185. os.unlink(filter_path)
  1186. def test_two_phase_response_with_final_error(self):
  1187. """Test filter that reports error in final headers.
  1188. The Git protocol allows filters to report success initially,
  1189. then report an error in the final headers. This test ensures
  1190. we handle that correctly.
  1191. """
  1192. import sys
  1193. import tempfile
  1194. # Create a filter that sends error in final headers
  1195. filter_script = """import sys
  1196. def read_exact(n):
  1197. data = b""
  1198. while len(data) < n:
  1199. chunk = sys.stdin.buffer.read(n - len(data))
  1200. if not chunk:
  1201. break
  1202. data += chunk
  1203. return data
  1204. def write_pkt(data):
  1205. if data is None:
  1206. sys.stdout.buffer.write(b"0000")
  1207. else:
  1208. length = len(data) + 4
  1209. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1210. sys.stdout.buffer.write(data)
  1211. sys.stdout.buffer.flush()
  1212. def read_pkt():
  1213. size_bytes = read_exact(4)
  1214. if not size_bytes:
  1215. return None
  1216. size = int(size_bytes.decode(), 16)
  1217. if size == 0:
  1218. return None
  1219. return read_exact(size - 4)
  1220. # Handshake
  1221. client_hello = read_pkt()
  1222. version = read_pkt()
  1223. flush = read_pkt()
  1224. write_pkt(b"git-filter-server")
  1225. write_pkt(b"version=2")
  1226. write_pkt(None)
  1227. # Read and echo capabilities
  1228. caps = []
  1229. while True:
  1230. cap = read_pkt()
  1231. if cap is None:
  1232. break
  1233. caps.append(cap)
  1234. for cap in caps:
  1235. write_pkt(cap)
  1236. write_pkt(None)
  1237. # Process commands
  1238. while True:
  1239. headers = {}
  1240. while True:
  1241. line = read_pkt()
  1242. if line is None:
  1243. break
  1244. if b"=" in line:
  1245. k, v = line.split(b"=", 1)
  1246. headers[k.decode()] = v.decode()
  1247. if not headers:
  1248. break
  1249. # Read data
  1250. data_chunks = []
  1251. while True:
  1252. chunk = read_pkt()
  1253. if chunk is None:
  1254. break
  1255. data_chunks.append(chunk)
  1256. data = b"".join(data_chunks)
  1257. # Send initial headers with success
  1258. write_pkt(b"status=success")
  1259. write_pkt(None)
  1260. # Send partial result
  1261. write_pkt(b"PARTIAL")
  1262. write_pkt(None)
  1263. # Send final headers with error (simulating processing failure)
  1264. write_pkt(b"status=error")
  1265. write_pkt(None)
  1266. """
  1267. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_error_")
  1268. try:
  1269. os.write(fd, filter_script.encode())
  1270. os.close(fd)
  1271. if os.name != "nt":
  1272. os.chmod(filter_path, 0o755)
  1273. driver = ProcessFilterDriver(
  1274. process_cmd=f"{sys.executable} {filter_path}", required=True
  1275. )
  1276. # Should raise FilterError due to final status being error
  1277. with self.assertRaises(FilterError) as cm:
  1278. driver.clean(b"test data")
  1279. self.assertIn("final status: error", str(cm.exception))
  1280. driver.cleanup()
  1281. finally:
  1282. if os.path.exists(filter_path):
  1283. os.unlink(filter_path)
  1284. _PASSTHROUGH_FILTER_SCRIPT = """import sys
  1285. while True:
  1286. line = sys.stdin.buffer.read()
  1287. if not line:
  1288. break
  1289. sys.stdout.buffer.write(line)
  1290. sys.stdout.buffer.flush()
  1291. """
  1292. @contextmanager
  1293. def create_passthrough_filter() -> Iterator[str]:
  1294. filter_script = _PASSTHROUGH_FILTER_SCRIPT
  1295. with tempfile.NamedTemporaryFile(
  1296. suffix=".py", delete=False, prefix="test_filter_passthrough_"
  1297. ) as f:
  1298. f.write(filter_script.encode())
  1299. path = f.name
  1300. try:
  1301. if os.name != "nt": # Not Windows
  1302. os.chmod(path, 0o755)
  1303. yield path
  1304. finally:
  1305. try:
  1306. os.unlink(path)
  1307. except FileNotFoundError:
  1308. pass