test_filters.py 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import tempfile
  24. import threading
  25. from dulwich import porcelain
  26. from dulwich.filters import (
  27. FilterContext,
  28. FilterError,
  29. FilterRegistry,
  30. ProcessFilterDriver,
  31. )
  32. from dulwich.repo import Repo
  33. from . import TestCase
  34. class GitAttributesFilterIntegrationTests(TestCase):
  35. """Test gitattributes integration with filter drivers."""
  36. def setUp(self) -> None:
  37. super().setUp()
  38. self.test_dir = tempfile.mkdtemp()
  39. self.addCleanup(self._cleanup_test_dir)
  40. self.repo = Repo.init(self.test_dir)
  41. def _cleanup_test_dir(self) -> None:
  42. """Clean up test directory."""
  43. import shutil
  44. shutil.rmtree(self.test_dir)
  45. def test_gitattributes_text_filter(self) -> None:
  46. """Test that text attribute triggers line ending conversion."""
  47. # Configure autocrlf first
  48. config = self.repo.get_config()
  49. config.set((b"core",), b"autocrlf", b"true")
  50. config.write_to_path()
  51. # Create .gitattributes with text attribute
  52. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  53. with open(gitattributes_path, "wb") as f:
  54. f.write(b"*.txt text\n")
  55. f.write(b"*.bin -text\n")
  56. # Add .gitattributes
  57. porcelain.add(self.repo, paths=[".gitattributes"])
  58. porcelain.commit(self.repo, message=b"Add gitattributes")
  59. # Create text file with CRLF
  60. text_file = os.path.join(self.test_dir, "test.txt")
  61. with open(text_file, "wb") as f:
  62. f.write(b"line1\r\nline2\r\n")
  63. # Create binary file with CRLF
  64. bin_file = os.path.join(self.test_dir, "test.bin")
  65. with open(bin_file, "wb") as f:
  66. f.write(b"binary\r\ndata\r\n")
  67. # Add files
  68. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  69. # Check that text file was normalized
  70. index = self.repo.open_index()
  71. text_entry = index[b"test.txt"]
  72. text_blob = self.repo.object_store[text_entry.sha]
  73. self.assertEqual(text_blob.data, b"line1\nline2\n")
  74. # Check that binary file was not normalized
  75. bin_entry = index[b"test.bin"]
  76. bin_blob = self.repo.object_store[bin_entry.sha]
  77. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  78. def test_gitattributes_custom_filter(self) -> None:
  79. """Test custom filter specified in gitattributes."""
  80. # Create a Python script that acts as our filter
  81. import sys
  82. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  83. with open(filter_script, "w") as f:
  84. f.write("""#!/usr/bin/env python3
  85. import sys
  86. data = sys.stdin.buffer.read()
  87. # Replace all digits with X
  88. result = bytearray()
  89. for b in data:
  90. if chr(b).isdigit():
  91. result.append(ord('X'))
  92. else:
  93. result.append(b)
  94. sys.stdout.buffer.write(result)
  95. """)
  96. os.chmod(filter_script, 0o755)
  97. # Create .gitattributes with custom filter
  98. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  99. with open(gitattributes_path, "wb") as f:
  100. f.write(b"*.secret filter=redact\n")
  101. # Configure custom filter (use Python script for testing)
  102. config = self.repo.get_config()
  103. # This filter replaces all digits with X
  104. config.set(
  105. (b"filter", b"redact"),
  106. b"clean",
  107. f"{sys.executable} {filter_script}".encode(),
  108. )
  109. config.write_to_path()
  110. # Add .gitattributes
  111. porcelain.add(self.repo, paths=[".gitattributes"])
  112. # Create file with sensitive content
  113. secret_file = os.path.join(self.test_dir, "password.secret")
  114. with open(secret_file, "wb") as f:
  115. f.write(b"password123\ntoken456\n")
  116. # Add file
  117. porcelain.add(self.repo, paths=["password.secret"])
  118. # Check that content was filtered
  119. index = self.repo.open_index()
  120. entry = index[b"password.secret"]
  121. blob = self.repo.object_store[entry.sha]
  122. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  123. def test_gitattributes_from_tree(self) -> None:
  124. """Test that gitattributes from tree are used when no working tree exists."""
  125. # Create .gitattributes with text attribute
  126. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  127. with open(gitattributes_path, "wb") as f:
  128. f.write(b"*.txt text\n")
  129. # Add and commit .gitattributes
  130. porcelain.add(self.repo, paths=[".gitattributes"])
  131. porcelain.commit(self.repo, message=b"Add gitattributes")
  132. # Remove .gitattributes from working tree
  133. os.remove(gitattributes_path)
  134. # Get gitattributes - should still work from tree
  135. gitattributes = self.repo.get_gitattributes()
  136. attrs = gitattributes.match_path(b"test.txt")
  137. self.assertEqual(attrs.get(b"text"), True)
  138. def test_gitattributes_info_attributes(self) -> None:
  139. """Test that .git/info/attributes is read."""
  140. # Create info/attributes
  141. info_dir = os.path.join(self.repo.controldir(), "info")
  142. if not os.path.exists(info_dir):
  143. os.makedirs(info_dir)
  144. info_attrs_path = os.path.join(info_dir, "attributes")
  145. with open(info_attrs_path, "wb") as f:
  146. f.write(b"*.log text\n")
  147. # Get gitattributes
  148. gitattributes = self.repo.get_gitattributes()
  149. attrs = gitattributes.match_path(b"debug.log")
  150. self.assertEqual(attrs.get(b"text"), True)
  151. def test_filter_precedence(self) -> None:
  152. """Test that filter attribute takes precedence over text attribute."""
  153. # Create a Python script that converts to uppercase
  154. import sys
  155. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  156. with open(filter_script, "w") as f:
  157. f.write("""#!/usr/bin/env python3
  158. import sys
  159. data = sys.stdin.buffer.read()
  160. # Convert bytes to string, uppercase, then back to bytes
  161. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  162. sys.stdout.buffer.write(result)
  163. """)
  164. os.chmod(filter_script, 0o755)
  165. # Create .gitattributes with both text and filter
  166. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  167. with open(gitattributes_path, "wb") as f:
  168. f.write(b"*.txt text filter=custom\n")
  169. # Configure autocrlf and custom filter
  170. config = self.repo.get_config()
  171. config.set((b"core",), b"autocrlf", b"true")
  172. # This filter converts to uppercase
  173. config.set(
  174. (b"filter", b"custom"),
  175. b"clean",
  176. f"{sys.executable} {filter_script}".encode(),
  177. )
  178. config.write_to_path()
  179. # Add .gitattributes
  180. porcelain.add(self.repo, paths=[".gitattributes"])
  181. # Create text file with lowercase and CRLF
  182. text_file = os.path.join(self.test_dir, "test.txt")
  183. with open(text_file, "wb") as f:
  184. f.write(b"hello\r\nworld\r\n")
  185. # Add file
  186. porcelain.add(self.repo, paths=["test.txt"])
  187. # Check that custom filter was applied (not just line ending conversion)
  188. index = self.repo.open_index()
  189. entry = index[b"test.txt"]
  190. blob = self.repo.object_store[entry.sha]
  191. # Should be uppercase with LF endings
  192. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  193. def test_blob_normalizer_integration(self) -> None:
  194. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  195. normalizer = self.repo.get_blob_normalizer()
  196. # Check it's the right type
  197. from dulwich.filters import FilterBlobNormalizer
  198. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  199. # Check it has access to gitattributes
  200. self.assertIsNotNone(normalizer.gitattributes)
  201. self.assertIsNotNone(normalizer.filter_registry)
  202. def test_required_filter_missing(self) -> None:
  203. """Test that missing required filter raises an error."""
  204. # Create .gitattributes with required filter
  205. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  206. with open(gitattributes_path, "wb") as f:
  207. f.write(b"*.secret filter=required_filter\n")
  208. # Configure filter as required but without commands
  209. config = self.repo.get_config()
  210. config.set((b"filter", b"required_filter"), b"required", b"true")
  211. config.write_to_path()
  212. # Add .gitattributes
  213. porcelain.add(self.repo, paths=[".gitattributes"])
  214. # Create file that would use the filter
  215. secret_file = os.path.join(self.test_dir, "test.secret")
  216. with open(secret_file, "wb") as f:
  217. f.write(b"test content\n")
  218. # Adding file should raise error due to missing required filter
  219. with self.assertRaises(FilterError) as cm:
  220. porcelain.add(self.repo, paths=["test.secret"])
  221. self.assertIn(
  222. "Required filter 'required_filter' is not available", str(cm.exception)
  223. )
  224. def test_required_filter_clean_command_fails(self) -> None:
  225. """Test that required filter failure during clean raises an error."""
  226. # Create .gitattributes with required filter
  227. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  228. with open(gitattributes_path, "wb") as f:
  229. f.write(b"*.secret filter=failing_filter\n")
  230. # Configure filter as required with failing command
  231. config = self.repo.get_config()
  232. config.set(
  233. (b"filter", b"failing_filter"), b"clean", b"false"
  234. ) # false command always fails
  235. config.set((b"filter", b"failing_filter"), b"required", b"true")
  236. config.write_to_path()
  237. # Add .gitattributes
  238. porcelain.add(self.repo, paths=[".gitattributes"])
  239. # Create file that would use the filter
  240. secret_file = os.path.join(self.test_dir, "test.secret")
  241. with open(secret_file, "wb") as f:
  242. f.write(b"test content\n")
  243. # Adding file should raise error due to failing required filter
  244. with self.assertRaises(FilterError) as cm:
  245. porcelain.add(self.repo, paths=["test.secret"])
  246. self.assertIn("Required clean filter failed", str(cm.exception))
  247. def test_required_filter_success(self) -> None:
  248. """Test that required filter works when properly configured."""
  249. # Create .gitattributes with required filter
  250. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  251. with open(gitattributes_path, "wb") as f:
  252. f.write(b"*.secret filter=working_filter\n")
  253. # Configure filter as required with working command
  254. config = self.repo.get_config()
  255. config.set(
  256. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  257. ) # uppercase
  258. config.set((b"filter", b"working_filter"), b"required", b"true")
  259. config.write_to_path()
  260. # Add .gitattributes
  261. porcelain.add(self.repo, paths=[".gitattributes"])
  262. # Create file that would use the filter
  263. secret_file = os.path.join(self.test_dir, "test.secret")
  264. with open(secret_file, "wb") as f:
  265. f.write(b"hello world\n")
  266. # Adding file should work and apply filter
  267. porcelain.add(self.repo, paths=["test.secret"])
  268. # Check that content was filtered
  269. index = self.repo.open_index()
  270. entry = index[b"test.secret"]
  271. blob = self.repo.object_store[entry.sha]
  272. self.assertEqual(blob.data, b"HELLO WORLD\n")
  273. def test_optional_filter_failure_fallback(self) -> None:
  274. """Test that optional filter failure falls back to original data."""
  275. # Create .gitattributes with optional filter
  276. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  277. with open(gitattributes_path, "wb") as f:
  278. f.write(b"*.txt filter=optional_filter\n")
  279. # Configure filter as optional (required=false) with failing command
  280. config = self.repo.get_config()
  281. config.set(
  282. (b"filter", b"optional_filter"), b"clean", b"false"
  283. ) # false command always fails
  284. config.set((b"filter", b"optional_filter"), b"required", b"false")
  285. config.write_to_path()
  286. # Add .gitattributes
  287. porcelain.add(self.repo, paths=[".gitattributes"])
  288. # Create file that would use the filter
  289. test_file = os.path.join(self.test_dir, "test.txt")
  290. with open(test_file, "wb") as f:
  291. f.write(b"test content\n")
  292. # Adding file should work and fallback to original content
  293. porcelain.add(self.repo, paths=["test.txt"])
  294. # Check that original content was preserved
  295. index = self.repo.open_index()
  296. entry = index[b"test.txt"]
  297. blob = self.repo.object_store[entry.sha]
  298. self.assertEqual(blob.data, b"test content\n")
  299. class ProcessFilterDriverTests(TestCase):
  300. """Tests for ProcessFilterDriver with real process filter."""
  301. def setUp(self):
  302. super().setUp()
  303. # Create a temporary test filter process dynamically
  304. self.test_filter_path = self._create_test_filter()
  305. def tearDown(self):
  306. # Clean up the test filter
  307. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  308. os.unlink(self.test_filter_path)
  309. super().tearDown()
  310. def _create_test_filter(self):
  311. """Create a simple test filter process that works on all platforms."""
  312. import tempfile
  313. # Create filter script that uppercases on clean, lowercases on smudge
  314. filter_script = """import sys
  315. import os
  316. # Simple filter that doesn't use any external dependencies
  317. def read_exact(n):
  318. data = b""
  319. while len(data) < n:
  320. chunk = sys.stdin.buffer.read(n - len(data))
  321. if not chunk:
  322. break
  323. data += chunk
  324. return data
  325. def write_pkt(data):
  326. if data is None:
  327. sys.stdout.buffer.write(b"0000")
  328. else:
  329. length = len(data) + 4
  330. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  331. sys.stdout.buffer.write(data)
  332. sys.stdout.buffer.flush()
  333. def read_pkt():
  334. size_bytes = read_exact(4)
  335. if not size_bytes:
  336. return None
  337. size = int(size_bytes.decode(), 16)
  338. if size == 0:
  339. return None
  340. return read_exact(size - 4)
  341. # Handshake
  342. client_hello = read_pkt()
  343. version = read_pkt()
  344. flush = read_pkt()
  345. write_pkt(b"git-filter-server")
  346. write_pkt(b"version=2")
  347. write_pkt(None)
  348. # Read and echo capabilities
  349. caps = []
  350. while True:
  351. cap = read_pkt()
  352. if cap is None:
  353. break
  354. caps.append(cap)
  355. for cap in caps:
  356. write_pkt(cap)
  357. write_pkt(None)
  358. # Process commands
  359. while True:
  360. headers = {}
  361. while True:
  362. line = read_pkt()
  363. if line is None:
  364. break
  365. if b"=" in line:
  366. k, v = line.split(b"=", 1)
  367. headers[k.decode()] = v.decode()
  368. if not headers:
  369. break
  370. # Read data
  371. data_chunks = []
  372. while True:
  373. chunk = read_pkt()
  374. if chunk is None:
  375. break
  376. data_chunks.append(chunk)
  377. data = b"".join(data_chunks)
  378. # Process (uppercase for clean, lowercase for smudge)
  379. if headers.get("command") == "clean":
  380. result = data.upper()
  381. elif headers.get("command") == "smudge":
  382. result = data.lower()
  383. else:
  384. result = data
  385. # Send response
  386. write_pkt(b"status=success")
  387. write_pkt(None)
  388. # Send result
  389. chunk_size = 65516
  390. for i in range(0, len(result), chunk_size):
  391. write_pkt(result[i:i+chunk_size])
  392. write_pkt(None)
  393. # Send final headers (empty list to keep status=success)
  394. write_pkt(None)
  395. """
  396. # Create temporary file
  397. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  398. try:
  399. os.write(fd, filter_script.encode())
  400. os.close(fd)
  401. # Make executable on Unix-like systems
  402. if os.name != "nt": # Not Windows
  403. os.chmod(path, 0o755)
  404. return path
  405. except:
  406. if os.path.exists(path):
  407. os.unlink(path)
  408. raise
  409. def test_process_filter_clean_operation(self):
  410. """Test clean operation using real process filter."""
  411. import sys
  412. driver = ProcessFilterDriver(
  413. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  414. )
  415. test_data = b"hello world"
  416. result = driver.clean(test_data)
  417. # Our test filter uppercases on clean
  418. self.assertEqual(result, b"HELLO WORLD")
  419. def test_process_filter_smudge_operation(self):
  420. """Test smudge operation using real process filter."""
  421. import sys
  422. driver = ProcessFilterDriver(
  423. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  424. )
  425. test_data = b"HELLO WORLD"
  426. result = driver.smudge(test_data, b"test.txt")
  427. # Our test filter lowercases on smudge
  428. self.assertEqual(result, b"hello world")
  429. def test_process_filter_large_data(self):
  430. """Test process filter with data larger than single pkt-line."""
  431. import sys
  432. driver = ProcessFilterDriver(
  433. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  434. )
  435. # Create data larger than max pkt-line payload (65516 bytes)
  436. test_data = b"a" * 70000
  437. result = driver.clean(test_data)
  438. # Should be uppercased
  439. self.assertEqual(result, b"A" * 70000)
  440. def test_fallback_to_individual_commands(self):
  441. """Test fallback when process filter fails."""
  442. driver = ProcessFilterDriver(
  443. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  444. process_cmd="/nonexistent/command", # This should fail
  445. required=False,
  446. )
  447. test_data = b"hello world\n"
  448. result = driver.clean(test_data)
  449. # Should fallback to tr command and uppercase
  450. self.assertEqual(result, b"HELLO WORLD\n")
  451. def test_process_reuse(self):
  452. """Test that process is reused across multiple operations."""
  453. import sys
  454. driver = ProcessFilterDriver(
  455. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  456. )
  457. # First operation
  458. result1 = driver.clean(b"test1")
  459. self.assertEqual(result1, b"TEST1")
  460. # Second operation should reuse the same process
  461. result2 = driver.clean(b"test2")
  462. self.assertEqual(result2, b"TEST2")
  463. # Process should still be alive
  464. self.assertIsNotNone(driver._process)
  465. self.assertIsNone(driver._process.poll()) # None means still running
  466. def test_error_handling_invalid_command(self):
  467. """Test error handling with invalid filter command."""
  468. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  469. with self.assertRaises(FilterError) as cm:
  470. driver.clean(b"test data")
  471. self.assertIn("Failed to start process filter", str(cm.exception))
  472. class FilterContextTests(TestCase):
  473. """Tests for FilterContext class."""
  474. def test_filter_context_caches_long_running_drivers(self):
  475. """Test that FilterContext caches only long-running drivers."""
  476. # Create real filter drivers
  477. class UppercaseFilter:
  478. def clean(self, data):
  479. return data.upper()
  480. def smudge(self, data, path=b""):
  481. return data.lower()
  482. def cleanup(self):
  483. pass
  484. def reuse(self, config, filter_name):
  485. # Pretend it's a long-running filter that should be cached
  486. return True
  487. class IdentityFilter:
  488. def clean(self, data):
  489. return data
  490. def smudge(self, data, path=b""):
  491. return data
  492. def cleanup(self):
  493. pass
  494. def reuse(self, config, filter_name):
  495. # Lightweight filter, don't cache
  496. return False
  497. # Create registry and context
  498. # Need to provide a config for caching to work
  499. from dulwich.config import ConfigDict
  500. config = ConfigDict()
  501. # Add some dummy config to make it truthy (use proper format)
  502. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  503. registry = FilterRegistry(config=config)
  504. context = FilterContext(registry)
  505. # Register drivers
  506. long_running = UppercaseFilter()
  507. stateless = IdentityFilter()
  508. registry.register_driver("uppercase", long_running)
  509. registry.register_driver("identity", stateless)
  510. # Get drivers through context
  511. driver1 = context.get_driver("uppercase")
  512. driver2 = context.get_driver("uppercase")
  513. # Long-running driver should be cached
  514. self.assertIs(driver1, driver2)
  515. self.assertIs(driver1, long_running)
  516. # Get stateless driver
  517. stateless1 = context.get_driver("identity")
  518. stateless2 = context.get_driver("identity")
  519. # Stateless driver comes from registry but isn't cached in context
  520. self.assertIs(stateless1, stateless)
  521. self.assertIs(stateless2, stateless)
  522. self.assertNotIn("identity", context._active_drivers)
  523. self.assertIn("uppercase", context._active_drivers)
  524. def test_filter_context_cleanup(self):
  525. """Test that FilterContext properly cleans up resources."""
  526. cleanup_called = []
  527. class TrackableFilter:
  528. def __init__(self, name):
  529. self.name = name
  530. def clean(self, data):
  531. return data
  532. def smudge(self, data, path=b""):
  533. return data
  534. def cleanup(self):
  535. cleanup_called.append(self.name)
  536. def is_long_running(self):
  537. return True
  538. # Create registry and context
  539. registry = FilterRegistry()
  540. context = FilterContext(registry)
  541. # Register and use drivers
  542. filter1 = TrackableFilter("filter1")
  543. filter2 = TrackableFilter("filter2")
  544. filter3 = TrackableFilter("filter3")
  545. registry.register_driver("filter1", filter1)
  546. registry.register_driver("filter2", filter2)
  547. registry.register_driver("filter3", filter3)
  548. # Get only some drivers to cache them
  549. context.get_driver("filter1")
  550. context.get_driver("filter2")
  551. # Don't get filter3
  552. # Close context
  553. context.close()
  554. # Verify cleanup was called for all drivers (context closes registry too)
  555. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  556. def test_filter_context_get_driver_returns_none_for_missing(self):
  557. """Test that get_driver returns None for non-existent drivers."""
  558. registry = FilterRegistry()
  559. context = FilterContext(registry)
  560. result = context.get_driver("nonexistent")
  561. self.assertIsNone(result)
  562. def test_filter_context_with_real_process_filter(self):
  563. """Test FilterContext with real ProcessFilterDriver instances."""
  564. import sys
  565. # Use existing test filter from ProcessFilterDriverTests
  566. test_dir = tempfile.mkdtemp()
  567. self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
  568. # Create a simple test filter that just passes data through
  569. filter_script = """import sys
  570. while True:
  571. line = sys.stdin.buffer.read()
  572. if not line:
  573. break
  574. sys.stdout.buffer.write(line)
  575. sys.stdout.buffer.flush()
  576. """
  577. filter_path = os.path.join(test_dir, "simple_filter.py")
  578. with open(filter_path, "w") as f:
  579. f.write(filter_script)
  580. # Create ProcessFilterDriver instances
  581. # One with process_cmd (long-running)
  582. process_driver = ProcessFilterDriver(
  583. process_cmd=None, # Don't use actual process to avoid complexity
  584. clean_cmd=f"{sys.executable} {filter_path}",
  585. smudge_cmd=f"{sys.executable} {filter_path}",
  586. )
  587. # Register in context
  588. from dulwich.config import ConfigDict
  589. config = ConfigDict()
  590. # Add some dummy config to make it truthy (use proper format)
  591. config.set(
  592. (b"filter", b"process"),
  593. b"clean",
  594. f"{sys.executable} {filter_path}".encode(),
  595. )
  596. config.set(
  597. (b"filter", b"process"),
  598. b"smudge",
  599. f"{sys.executable} {filter_path}".encode(),
  600. )
  601. registry = FilterRegistry(config=config)
  602. context = FilterContext(registry)
  603. registry.register_driver("process", process_driver)
  604. # Get driver - should not be cached since it's not long-running
  605. driver1 = context.get_driver("process")
  606. self.assertIsNotNone(driver1)
  607. # Check that it's not a long-running process (no process_cmd)
  608. self.assertIsNone(driver1.process_cmd)
  609. self.assertNotIn("process", context._active_drivers)
  610. # Test with a long-running driver that should be cached
  611. # Create a mock driver that always wants to be reused
  612. class CacheableProcessDriver:
  613. def __init__(self):
  614. self.process_cmd = "dummy"
  615. self.clean_cmd = None
  616. self.smudge_cmd = None
  617. self.required = False
  618. def clean(self, data):
  619. return data
  620. def smudge(self, data, path=b""):
  621. return data
  622. def cleanup(self):
  623. pass
  624. def reuse(self, config, filter_name):
  625. # This driver always wants to be cached (simulates a long-running process)
  626. return True
  627. cacheable_driver = CacheableProcessDriver()
  628. registry.register_driver("long_process", cacheable_driver)
  629. driver2 = context.get_driver("long_process")
  630. # Check that it has a process_cmd (long-running)
  631. self.assertIsNotNone(driver2.process_cmd)
  632. self.assertIn("long_process", context._active_drivers)
  633. context.close()
  634. def test_filter_context_closes_registry(self):
  635. """Test that closing FilterContext also closes the registry."""
  636. # Track if registry.close() is called
  637. registry_closed = []
  638. class TrackingRegistry(FilterRegistry):
  639. def close(self):
  640. registry_closed.append(True)
  641. super().close()
  642. registry = TrackingRegistry()
  643. context = FilterContext(registry)
  644. # Close context should also close registry
  645. context.close()
  646. self.assertTrue(registry_closed)
  647. class ProcessFilterProtocolTests(TestCase):
  648. """Tests for ProcessFilterDriver protocol compliance."""
  649. def setUp(self):
  650. super().setUp()
  651. # Create a spec-compliant test filter process dynamically
  652. self.test_filter_path = self._create_spec_compliant_filter()
  653. def tearDown(self):
  654. # Clean up the test filter
  655. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  656. os.unlink(self.test_filter_path)
  657. super().tearDown()
  658. def _create_spec_compliant_filter(self):
  659. """Create a spec-compliant test filter that works on all platforms."""
  660. import tempfile
  661. # This filter strictly follows Git spec - no newlines in packets
  662. filter_script = """import sys
  663. def read_exact(n):
  664. data = b""
  665. while len(data) < n:
  666. chunk = sys.stdin.buffer.read(n - len(data))
  667. if not chunk:
  668. break
  669. data += chunk
  670. return data
  671. def write_pkt(data):
  672. if data is None:
  673. sys.stdout.buffer.write(b"0000")
  674. else:
  675. length = len(data) + 4
  676. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  677. sys.stdout.buffer.write(data)
  678. sys.stdout.buffer.flush()
  679. def read_pkt():
  680. size_bytes = read_exact(4)
  681. if not size_bytes:
  682. return None
  683. size = int(size_bytes.decode(), 16)
  684. if size == 0:
  685. return None
  686. return read_exact(size - 4)
  687. # Handshake - exact format, no newlines
  688. client_hello = read_pkt()
  689. version = read_pkt()
  690. flush = read_pkt()
  691. if client_hello != b"git-filter-client":
  692. sys.exit(1)
  693. if version != b"version=2":
  694. sys.exit(1)
  695. write_pkt(b"git-filter-server") # No newline
  696. write_pkt(b"version=2") # No newline
  697. write_pkt(None)
  698. # Read and echo capabilities
  699. caps = []
  700. while True:
  701. cap = read_pkt()
  702. if cap is None:
  703. break
  704. caps.append(cap)
  705. for cap in caps:
  706. if cap in [b"capability=clean", b"capability=smudge"]:
  707. write_pkt(cap)
  708. write_pkt(None)
  709. # Process commands
  710. while True:
  711. headers = {}
  712. while True:
  713. line = read_pkt()
  714. if line is None:
  715. break
  716. if b"=" in line:
  717. k, v = line.split(b"=", 1)
  718. headers[k.decode()] = v.decode()
  719. if not headers:
  720. break
  721. # Read data
  722. data_chunks = []
  723. while True:
  724. chunk = read_pkt()
  725. if chunk is None:
  726. break
  727. data_chunks.append(chunk)
  728. data = b"".join(data_chunks)
  729. # Process
  730. if headers.get("command") == "clean":
  731. result = data.upper()
  732. elif headers.get("command") == "smudge":
  733. result = data.lower()
  734. else:
  735. result = data
  736. # Send response
  737. write_pkt(b"status=success")
  738. write_pkt(None)
  739. # Send result
  740. chunk_size = 65516
  741. for i in range(0, len(result), chunk_size):
  742. write_pkt(result[i:i+chunk_size])
  743. write_pkt(None)
  744. # Send final headers (empty list to keep status=success)
  745. write_pkt(None)
  746. """
  747. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  748. try:
  749. os.write(fd, filter_script.encode())
  750. os.close(fd)
  751. if os.name != "nt": # Not Windows
  752. os.chmod(path, 0o755)
  753. return path
  754. except:
  755. if os.path.exists(path):
  756. os.unlink(path)
  757. raise
  758. def test_protocol_handshake_exact_format(self):
  759. """Test that handshake uses exact format without newlines."""
  760. import sys
  761. driver = ProcessFilterDriver(
  762. process_cmd=f"{sys.executable} {self.test_filter_path}",
  763. required=True, # Require success to test protocol compliance
  764. )
  765. # This should work with exact protocol format
  766. test_data = b"hello world"
  767. result = driver.clean(test_data)
  768. # Our test filter uppercases on clean
  769. self.assertEqual(result, b"HELLO WORLD")
  770. def test_capability_negotiation_exact_format(self):
  771. """Test that capabilities are sent and received in exact format."""
  772. import sys
  773. driver = ProcessFilterDriver(
  774. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  775. )
  776. # Force capability negotiation by using both clean and smudge
  777. clean_result = driver.clean(b"test")
  778. smudge_result = driver.smudge(b"TEST", b"test.txt")
  779. self.assertEqual(clean_result, b"TEST")
  780. self.assertEqual(smudge_result, b"test")
  781. def test_binary_data_handling(self):
  782. """Test handling of binary data through the protocol."""
  783. import sys
  784. driver = ProcessFilterDriver(
  785. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  786. )
  787. # Binary data with null bytes, high bytes, etc.
  788. binary_data = bytes(range(256))
  789. result = driver.clean(binary_data)
  790. # Should handle binary data without crashing
  791. self.assertIsInstance(result, bytes)
  792. # Our test filter uppercases bytes directly, which works for binary data
  793. # The fix ensures headers are kept as bytes, so binary content doesn't cause decode errors
  794. def test_binary_data_with_invalid_utf8_sequences(self):
  795. """Test handling of binary data with invalid UTF-8 sequences.
  796. Regression test for https://github.com/jelmer/dulwich/issues/2023
  797. where binary files (like .ogg, .jpg) caused UTF-8 decode errors.
  798. """
  799. import sys
  800. driver = ProcessFilterDriver(
  801. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  802. )
  803. # Create binary data with the specific byte that caused the issue (0xe5 at position 14)
  804. # plus other invalid UTF-8 sequences
  805. binary_data = b"some header \xe5\xff\xfe binary data"
  806. result = driver.clean(binary_data)
  807. # Should handle binary data without UTF-8 decode errors
  808. self.assertIsInstance(result, bytes)
  809. # The filter should process it successfully
  810. self.assertEqual(result, binary_data.upper())
  811. def test_large_file_chunking(self):
  812. """Test proper chunking of large files."""
  813. import sys
  814. driver = ProcessFilterDriver(
  815. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  816. )
  817. # Create data larger than max pkt-line payload (65516 bytes)
  818. large_data = b"a" * 100000
  819. result = driver.clean(large_data)
  820. # Should be properly processed (uppercased)
  821. expected = b"A" * 100000
  822. self.assertEqual(result, expected)
  823. def test_empty_file_handling(self):
  824. """Test handling of empty files."""
  825. import sys
  826. driver = ProcessFilterDriver(
  827. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  828. )
  829. result = driver.clean(b"")
  830. self.assertEqual(result, b"")
  831. def test_special_characters_in_pathname(self):
  832. """Test paths with special characters are handled correctly."""
  833. import sys
  834. driver = ProcessFilterDriver(
  835. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  836. )
  837. # Test various special characters in paths
  838. special_paths = [
  839. b"file with spaces.txt",
  840. b"path/with/slashes.txt",
  841. b"file=with=equals.txt",
  842. b"file\nwith\nnewlines.txt",
  843. ]
  844. test_data = b"test data"
  845. for path in special_paths:
  846. result = driver.smudge(test_data, path)
  847. self.assertEqual(result, b"test data")
  848. def test_process_crash_recovery(self):
  849. """Test that process is properly restarted after crash."""
  850. import sys
  851. driver = ProcessFilterDriver(
  852. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  853. )
  854. # First operation
  855. result = driver.clean(b"test1")
  856. self.assertEqual(result, b"TEST1")
  857. # Kill the process
  858. if driver._process:
  859. driver._process.kill()
  860. driver._process.wait()
  861. driver.cleanup()
  862. # Should restart and work again
  863. result = driver.clean(b"test2")
  864. self.assertEqual(result, b"TEST2")
  865. def test_malformed_process_response_handling(self):
  866. """Test handling of malformed responses from process."""
  867. # Create a filter that sends malformed responses
  868. malformed_filter = """#!/usr/bin/env python3
  869. import sys
  870. import os
  871. sys.path.insert(0, os.path.dirname(__file__))
  872. from dulwich.protocol import Protocol
  873. protocol = Protocol(
  874. lambda n: sys.stdin.buffer.read(n),
  875. lambda d: sys.stdout.buffer.write(d) or len(d)
  876. )
  877. # Read handshake
  878. protocol.read_pkt_line()
  879. protocol.read_pkt_line()
  880. protocol.read_pkt_line()
  881. # Send invalid handshake
  882. protocol.write_pkt_line(b"invalid-welcome")
  883. protocol.write_pkt_line(b"version=2")
  884. protocol.write_pkt_line(None)
  885. """
  886. import tempfile
  887. fd, script_path = tempfile.mkstemp(suffix=".py")
  888. try:
  889. os.write(fd, malformed_filter.encode())
  890. os.close(fd)
  891. os.chmod(script_path, 0o755)
  892. driver = ProcessFilterDriver(
  893. process_cmd=f"python3 {script_path}",
  894. clean_cmd="cat", # Fallback
  895. required=False,
  896. )
  897. # Should fallback to clean_cmd when process fails
  898. result = driver.clean(b"test data")
  899. self.assertEqual(result, b"test data")
  900. finally:
  901. os.unlink(script_path)
  902. def test_concurrent_filter_operations(self):
  903. """Test that concurrent operations work correctly."""
  904. import sys
  905. driver = ProcessFilterDriver(
  906. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  907. )
  908. results = []
  909. errors = []
  910. def worker(data):
  911. try:
  912. result = driver.clean(data)
  913. results.append(result)
  914. except Exception as e:
  915. errors.append(e)
  916. # Start 5 concurrent operations
  917. threads = []
  918. test_data = [f"test{i}".encode() for i in range(5)]
  919. for data in test_data:
  920. t = threading.Thread(target=worker, args=(data,))
  921. threads.append(t)
  922. t.start()
  923. for t in threads:
  924. t.join()
  925. # Should have no errors
  926. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  927. self.assertEqual(len(results), 5)
  928. # All results should be uppercase versions
  929. expected = [data.upper() for data in test_data]
  930. self.assertEqual(sorted(results), sorted(expected))
  931. def test_process_resource_cleanup(self):
  932. """Test that process resources are properly cleaned up."""
  933. import sys
  934. driver = ProcessFilterDriver(
  935. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  936. )
  937. # Use the driver
  938. result = driver.clean(b"test")
  939. self.assertEqual(result, b"TEST")
  940. # Process should be running
  941. self.assertIsNotNone(driver._process)
  942. self.assertIsNone(driver._process.poll()) # None means still running
  943. # Remember the old process to check it was terminated
  944. old_process = driver._process
  945. # Manually clean up (simulates __del__)
  946. driver.cleanup()
  947. # Process reference should be cleared
  948. self.assertIsNone(driver._process)
  949. self.assertIsNone(driver._protocol)
  950. # Old process should be terminated
  951. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  952. def test_required_filter_error_propagation(self):
  953. """Test that errors are properly propagated when filter is required."""
  954. driver = ProcessFilterDriver(
  955. process_cmd="/definitely/nonexistent/command", required=True
  956. )
  957. with self.assertRaises(FilterError) as cm:
  958. driver.clean(b"test data")
  959. self.assertIn("Failed to start process filter", str(cm.exception))
  960. def test_two_phase_response_protocol(self):
  961. """Test filter protocol with two-phase response (initial + final headers).
  962. This test verifies that the filter correctly handles the Git LFS protocol
  963. where filters send:
  964. 1. Initial headers with status
  965. 2. Content data
  966. 3. Final headers with status
  967. This is the format used by git-lfs and documented in the Git filter protocol.
  968. """
  969. import sys
  970. import tempfile
  971. # Create a filter that follows the two-phase protocol
  972. filter_script = """import sys
  973. def read_exact(n):
  974. data = b""
  975. while len(data) < n:
  976. chunk = sys.stdin.buffer.read(n - len(data))
  977. if not chunk:
  978. break
  979. data += chunk
  980. return data
  981. def write_pkt(data):
  982. if data is None:
  983. sys.stdout.buffer.write(b"0000")
  984. else:
  985. length = len(data) + 4
  986. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  987. sys.stdout.buffer.write(data)
  988. sys.stdout.buffer.flush()
  989. def read_pkt():
  990. size_bytes = read_exact(4)
  991. if not size_bytes:
  992. return None
  993. size = int(size_bytes.decode(), 16)
  994. if size == 0:
  995. return None
  996. return read_exact(size - 4)
  997. # Handshake
  998. client_hello = read_pkt()
  999. version = read_pkt()
  1000. flush = read_pkt()
  1001. write_pkt(b"git-filter-server")
  1002. write_pkt(b"version=2")
  1003. write_pkt(None)
  1004. # Read and echo capabilities
  1005. caps = []
  1006. while True:
  1007. cap = read_pkt()
  1008. if cap is None:
  1009. break
  1010. caps.append(cap)
  1011. for cap in caps:
  1012. write_pkt(cap)
  1013. write_pkt(None)
  1014. # Process commands
  1015. while True:
  1016. headers = {}
  1017. while True:
  1018. line = read_pkt()
  1019. if line is None:
  1020. break
  1021. if b"=" in line:
  1022. k, v = line.split(b"=", 1)
  1023. headers[k.decode()] = v.decode()
  1024. if not headers:
  1025. break
  1026. # Read data
  1027. data_chunks = []
  1028. while True:
  1029. chunk = read_pkt()
  1030. if chunk is None:
  1031. break
  1032. data_chunks.append(chunk)
  1033. data = b"".join(data_chunks)
  1034. # Process
  1035. if headers.get("command") == "clean":
  1036. result = data.upper()
  1037. elif headers.get("command") == "smudge":
  1038. result = data.lower()
  1039. else:
  1040. result = data
  1041. # TWO-PHASE RESPONSE: Send initial headers
  1042. write_pkt(b"status=success")
  1043. write_pkt(None)
  1044. # Send result data
  1045. chunk_size = 65516
  1046. for i in range(0, len(result), chunk_size):
  1047. write_pkt(result[i:i+chunk_size])
  1048. write_pkt(None)
  1049. # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
  1050. write_pkt(None)
  1051. """
  1052. fd, filter_path = tempfile.mkstemp(
  1053. suffix=".py", prefix="test_filter_two_phase_"
  1054. )
  1055. try:
  1056. os.write(fd, filter_script.encode())
  1057. os.close(fd)
  1058. if os.name != "nt":
  1059. os.chmod(filter_path, 0o755)
  1060. driver = ProcessFilterDriver(
  1061. process_cmd=f"{sys.executable} {filter_path}", required=True
  1062. )
  1063. # Test clean operation
  1064. test_data = b"hello world"
  1065. result = driver.clean(test_data)
  1066. self.assertEqual(result, b"HELLO WORLD")
  1067. # Test smudge operation
  1068. result = driver.smudge(b"HELLO WORLD", b"test.txt")
  1069. self.assertEqual(result, b"hello world")
  1070. driver.cleanup()
  1071. finally:
  1072. if os.path.exists(filter_path):
  1073. os.unlink(filter_path)
  1074. def test_two_phase_response_with_status_messages(self):
  1075. """Test filter that sends status messages in final headers.
  1076. Some filters (like git-lfs) may send progress or status messages
  1077. in the final headers. This test verifies that we can handle those.
  1078. """
  1079. import sys
  1080. import tempfile
  1081. # Create a filter that sends extra status info in final headers
  1082. filter_script = """import sys
  1083. def read_exact(n):
  1084. data = b""
  1085. while len(data) < n:
  1086. chunk = sys.stdin.buffer.read(n - len(data))
  1087. if not chunk:
  1088. break
  1089. data += chunk
  1090. return data
  1091. def write_pkt(data):
  1092. if data is None:
  1093. sys.stdout.buffer.write(b"0000")
  1094. else:
  1095. length = len(data) + 4
  1096. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1097. sys.stdout.buffer.write(data)
  1098. sys.stdout.buffer.flush()
  1099. def read_pkt():
  1100. size_bytes = read_exact(4)
  1101. if not size_bytes:
  1102. return None
  1103. size = int(size_bytes.decode(), 16)
  1104. if size == 0:
  1105. return None
  1106. return read_exact(size - 4)
  1107. # Handshake
  1108. client_hello = read_pkt()
  1109. version = read_pkt()
  1110. flush = read_pkt()
  1111. write_pkt(b"git-filter-server")
  1112. write_pkt(b"version=2")
  1113. write_pkt(None)
  1114. # Read and echo capabilities
  1115. caps = []
  1116. while True:
  1117. cap = read_pkt()
  1118. if cap is None:
  1119. break
  1120. caps.append(cap)
  1121. for cap in caps:
  1122. write_pkt(cap)
  1123. write_pkt(None)
  1124. # Process commands
  1125. while True:
  1126. headers = {}
  1127. while True:
  1128. line = read_pkt()
  1129. if line is None:
  1130. break
  1131. if b"=" in line:
  1132. k, v = line.split(b"=", 1)
  1133. headers[k.decode()] = v.decode()
  1134. if not headers:
  1135. break
  1136. # Read data
  1137. data_chunks = []
  1138. while True:
  1139. chunk = read_pkt()
  1140. if chunk is None:
  1141. break
  1142. data_chunks.append(chunk)
  1143. data = b"".join(data_chunks)
  1144. # Process
  1145. result = data.upper()
  1146. # Send initial headers
  1147. write_pkt(b"status=success")
  1148. write_pkt(None)
  1149. # Send result data
  1150. chunk_size = 65516
  1151. for i in range(0, len(result), chunk_size):
  1152. write_pkt(result[i:i+chunk_size])
  1153. write_pkt(None)
  1154. # Send final headers with progress messages (like git-lfs does)
  1155. write_pkt(b"status=success")
  1156. write_pkt(None)
  1157. """
  1158. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_status_")
  1159. try:
  1160. os.write(fd, filter_script.encode())
  1161. os.close(fd)
  1162. if os.name != "nt":
  1163. os.chmod(filter_path, 0o755)
  1164. driver = ProcessFilterDriver(
  1165. process_cmd=f"{sys.executable} {filter_path}", required=True
  1166. )
  1167. # Test clean operation with status messages
  1168. test_data = b"test data with status"
  1169. result = driver.clean(test_data)
  1170. self.assertEqual(result, b"TEST DATA WITH STATUS")
  1171. driver.cleanup()
  1172. finally:
  1173. if os.path.exists(filter_path):
  1174. os.unlink(filter_path)
  1175. def test_two_phase_response_with_final_error(self):
  1176. """Test filter that reports error in final headers.
  1177. The Git protocol allows filters to report success initially,
  1178. then report an error in the final headers. This test ensures
  1179. we handle that correctly.
  1180. """
  1181. import sys
  1182. import tempfile
  1183. # Create a filter that sends error in final headers
  1184. filter_script = """import sys
  1185. def read_exact(n):
  1186. data = b""
  1187. while len(data) < n:
  1188. chunk = sys.stdin.buffer.read(n - len(data))
  1189. if not chunk:
  1190. break
  1191. data += chunk
  1192. return data
  1193. def write_pkt(data):
  1194. if data is None:
  1195. sys.stdout.buffer.write(b"0000")
  1196. else:
  1197. length = len(data) + 4
  1198. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1199. sys.stdout.buffer.write(data)
  1200. sys.stdout.buffer.flush()
  1201. def read_pkt():
  1202. size_bytes = read_exact(4)
  1203. if not size_bytes:
  1204. return None
  1205. size = int(size_bytes.decode(), 16)
  1206. if size == 0:
  1207. return None
  1208. return read_exact(size - 4)
  1209. # Handshake
  1210. client_hello = read_pkt()
  1211. version = read_pkt()
  1212. flush = read_pkt()
  1213. write_pkt(b"git-filter-server")
  1214. write_pkt(b"version=2")
  1215. write_pkt(None)
  1216. # Read and echo capabilities
  1217. caps = []
  1218. while True:
  1219. cap = read_pkt()
  1220. if cap is None:
  1221. break
  1222. caps.append(cap)
  1223. for cap in caps:
  1224. write_pkt(cap)
  1225. write_pkt(None)
  1226. # Process commands
  1227. while True:
  1228. headers = {}
  1229. while True:
  1230. line = read_pkt()
  1231. if line is None:
  1232. break
  1233. if b"=" in line:
  1234. k, v = line.split(b"=", 1)
  1235. headers[k.decode()] = v.decode()
  1236. if not headers:
  1237. break
  1238. # Read data
  1239. data_chunks = []
  1240. while True:
  1241. chunk = read_pkt()
  1242. if chunk is None:
  1243. break
  1244. data_chunks.append(chunk)
  1245. data = b"".join(data_chunks)
  1246. # Send initial headers with success
  1247. write_pkt(b"status=success")
  1248. write_pkt(None)
  1249. # Send partial result
  1250. write_pkt(b"PARTIAL")
  1251. write_pkt(None)
  1252. # Send final headers with error (simulating processing failure)
  1253. write_pkt(b"status=error")
  1254. write_pkt(None)
  1255. """
  1256. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_error_")
  1257. try:
  1258. os.write(fd, filter_script.encode())
  1259. os.close(fd)
  1260. if os.name != "nt":
  1261. os.chmod(filter_path, 0o755)
  1262. driver = ProcessFilterDriver(
  1263. process_cmd=f"{sys.executable} {filter_path}", required=True
  1264. )
  1265. # Should raise FilterError due to final status being error
  1266. with self.assertRaises(FilterError) as cm:
  1267. driver.clean(b"test data")
  1268. self.assertIn("final status: error", str(cm.exception))
  1269. driver.cleanup()
  1270. finally:
  1271. if os.path.exists(filter_path):
  1272. os.unlink(filter_path)