test_filters.py 49 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import tempfile
  24. import threading
  25. from collections.abc import Iterator
  26. from contextlib import contextmanager
  27. from dulwich import porcelain
  28. from dulwich.filters import (
  29. FilterContext,
  30. FilterError,
  31. FilterRegistry,
  32. ProcessFilterDriver,
  33. )
  34. from dulwich.repo import Repo
  35. from . import TestCase
  36. class GitAttributesFilterIntegrationTests(TestCase):
  37. """Test gitattributes integration with filter drivers."""
  38. def setUp(self) -> None:
  39. super().setUp()
  40. self.test_dir = tempfile.mkdtemp()
  41. self.addCleanup(self._cleanup_test_dir)
  42. self.repo = Repo.init(self.test_dir)
  43. def _cleanup_test_dir(self) -> None:
  44. """Clean up test directory."""
  45. import shutil
  46. shutil.rmtree(self.test_dir)
  47. def test_gitattributes_text_filter(self) -> None:
  48. """Test that text attribute triggers line ending conversion."""
  49. # Configure autocrlf first
  50. config = self.repo.get_config()
  51. config.set((b"core",), b"autocrlf", b"true")
  52. config.write_to_path()
  53. # Create .gitattributes with text attribute
  54. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  55. with open(gitattributes_path, "wb") as f:
  56. f.write(b"*.txt text\n")
  57. f.write(b"*.bin -text\n")
  58. # Add .gitattributes
  59. porcelain.add(self.repo, paths=[".gitattributes"])
  60. porcelain.commit(self.repo, message=b"Add gitattributes")
  61. # Create text file with CRLF
  62. text_file = os.path.join(self.test_dir, "test.txt")
  63. with open(text_file, "wb") as f:
  64. f.write(b"line1\r\nline2\r\n")
  65. # Create binary file with CRLF
  66. bin_file = os.path.join(self.test_dir, "test.bin")
  67. with open(bin_file, "wb") as f:
  68. f.write(b"binary\r\ndata\r\n")
  69. # Add files
  70. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  71. # Check that text file was normalized
  72. index = self.repo.open_index()
  73. text_entry = index[b"test.txt"]
  74. text_blob = self.repo.object_store[text_entry.sha]
  75. self.assertEqual(text_blob.data, b"line1\nline2\n")
  76. # Check that binary file was not normalized
  77. bin_entry = index[b"test.bin"]
  78. bin_blob = self.repo.object_store[bin_entry.sha]
  79. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  80. def test_gitattributes_custom_filter(self) -> None:
  81. """Test custom filter specified in gitattributes."""
  82. # Create a Python script that acts as our filter
  83. import sys
  84. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  85. with open(filter_script, "w") as f:
  86. f.write(
  87. """#!/usr/bin/env python3
  88. import sys
  89. data = sys.stdin.buffer.read()
  90. # Replace all digits with X
  91. result = bytearray()
  92. for b in data:
  93. if chr(b).isdigit():
  94. result.append(ord('X'))
  95. else:
  96. result.append(b)
  97. sys.stdout.buffer.write(result)
  98. """
  99. )
  100. os.chmod(filter_script, 0o755)
  101. # Create .gitattributes with custom filter
  102. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  103. with open(gitattributes_path, "wb") as f:
  104. f.write(b"*.secret filter=redact\n")
  105. # Configure custom filter (use Python script for testing)
  106. config = self.repo.get_config()
  107. # This filter replaces all digits with X
  108. config.set(
  109. (b"filter", b"redact"),
  110. b"clean",
  111. f"{sys.executable} {filter_script}".encode(),
  112. )
  113. config.write_to_path()
  114. # Add .gitattributes
  115. porcelain.add(self.repo, paths=[".gitattributes"])
  116. # Create file with sensitive content
  117. secret_file = os.path.join(self.test_dir, "password.secret")
  118. with open(secret_file, "wb") as f:
  119. f.write(b"password123\ntoken456\n")
  120. # Add file
  121. porcelain.add(self.repo, paths=["password.secret"])
  122. # Check that content was filtered
  123. index = self.repo.open_index()
  124. entry = index[b"password.secret"]
  125. blob = self.repo.object_store[entry.sha]
  126. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  127. def test_gitattributes_from_tree(self) -> None:
  128. """Test that gitattributes from tree are used when no working tree exists."""
  129. # Create .gitattributes with text attribute
  130. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  131. with open(gitattributes_path, "wb") as f:
  132. f.write(b"*.txt text\n")
  133. # Add and commit .gitattributes
  134. porcelain.add(self.repo, paths=[".gitattributes"])
  135. porcelain.commit(self.repo, message=b"Add gitattributes")
  136. # Remove .gitattributes from working tree
  137. os.remove(gitattributes_path)
  138. # Get gitattributes - should still work from tree
  139. gitattributes = self.repo.get_gitattributes()
  140. attrs = gitattributes.match_path(b"test.txt")
  141. self.assertEqual(attrs.get(b"text"), True)
  142. def test_gitattributes_info_attributes(self) -> None:
  143. """Test that .git/info/attributes is read."""
  144. # Create info/attributes
  145. info_dir = os.path.join(self.repo.controldir(), "info")
  146. if not os.path.exists(info_dir):
  147. os.makedirs(info_dir)
  148. info_attrs_path = os.path.join(info_dir, "attributes")
  149. with open(info_attrs_path, "wb") as f:
  150. f.write(b"*.log text\n")
  151. # Get gitattributes
  152. gitattributes = self.repo.get_gitattributes()
  153. attrs = gitattributes.match_path(b"debug.log")
  154. self.assertEqual(attrs.get(b"text"), True)
  155. def test_filter_precedence(self) -> None:
  156. """Test that filter attribute takes precedence over text attribute."""
  157. # Create a Python script that converts to uppercase
  158. import sys
  159. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  160. with open(filter_script, "w") as f:
  161. f.write(
  162. """#!/usr/bin/env python3
  163. import sys
  164. data = sys.stdin.buffer.read()
  165. # Convert bytes to string, uppercase, then back to bytes
  166. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  167. sys.stdout.buffer.write(result)
  168. """
  169. )
  170. os.chmod(filter_script, 0o755)
  171. # Create .gitattributes with both text and filter
  172. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  173. with open(gitattributes_path, "wb") as f:
  174. f.write(b"*.txt text filter=custom\n")
  175. # Configure autocrlf and custom filter
  176. config = self.repo.get_config()
  177. config.set((b"core",), b"autocrlf", b"true")
  178. # This filter converts to uppercase
  179. config.set(
  180. (b"filter", b"custom"),
  181. b"clean",
  182. f"{sys.executable} {filter_script}".encode(),
  183. )
  184. config.write_to_path()
  185. # Add .gitattributes
  186. porcelain.add(self.repo, paths=[".gitattributes"])
  187. # Create text file with lowercase and CRLF
  188. text_file = os.path.join(self.test_dir, "test.txt")
  189. with open(text_file, "wb") as f:
  190. f.write(b"hello\r\nworld\r\n")
  191. # Add file
  192. porcelain.add(self.repo, paths=["test.txt"])
  193. # Check that custom filter was applied (not just line ending conversion)
  194. index = self.repo.open_index()
  195. entry = index[b"test.txt"]
  196. blob = self.repo.object_store[entry.sha]
  197. # Should be uppercase with LF endings
  198. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  199. def test_blob_normalizer_integration(self) -> None:
  200. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  201. normalizer = self.repo.get_blob_normalizer()
  202. # Check it's the right type
  203. from dulwich.filters import FilterBlobNormalizer
  204. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  205. # Check it has access to gitattributes
  206. self.assertIsNotNone(normalizer.gitattributes)
  207. self.assertIsNotNone(normalizer.filter_registry)
  208. def test_required_filter_missing(self) -> None:
  209. """Test that missing required filter raises an error."""
  210. # Create .gitattributes with required filter
  211. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  212. with open(gitattributes_path, "wb") as f:
  213. f.write(b"*.secret filter=required_filter\n")
  214. # Configure filter as required but without commands
  215. config = self.repo.get_config()
  216. config.set((b"filter", b"required_filter"), b"required", b"true")
  217. config.write_to_path()
  218. # Add .gitattributes
  219. porcelain.add(self.repo, paths=[".gitattributes"])
  220. # Create file that would use the filter
  221. secret_file = os.path.join(self.test_dir, "test.secret")
  222. with open(secret_file, "wb") as f:
  223. f.write(b"test content\n")
  224. # Adding file should raise error due to missing required filter
  225. with self.assertRaises(FilterError) as cm:
  226. porcelain.add(self.repo, paths=["test.secret"])
  227. self.assertIn(
  228. "Required filter 'required_filter' is not available", str(cm.exception)
  229. )
  230. def test_required_filter_clean_command_fails(self) -> None:
  231. """Test that required filter failure during clean raises an error."""
  232. # Create .gitattributes with required filter
  233. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  234. with open(gitattributes_path, "wb") as f:
  235. f.write(b"*.secret filter=failing_filter\n")
  236. # Configure filter as required with failing command
  237. config = self.repo.get_config()
  238. config.set(
  239. (b"filter", b"failing_filter"), b"clean", b"false"
  240. ) # false command always fails
  241. config.set((b"filter", b"failing_filter"), b"required", b"true")
  242. config.write_to_path()
  243. # Add .gitattributes
  244. porcelain.add(self.repo, paths=[".gitattributes"])
  245. # Create file that would use the filter
  246. secret_file = os.path.join(self.test_dir, "test.secret")
  247. with open(secret_file, "wb") as f:
  248. f.write(b"test content\n")
  249. # Adding file should raise error due to failing required filter
  250. with self.assertRaises(FilterError) as cm:
  251. porcelain.add(self.repo, paths=["test.secret"])
  252. self.assertIn("Required clean filter failed", str(cm.exception))
  253. def test_required_filter_success(self) -> None:
  254. """Test that required filter works when properly configured."""
  255. # Create .gitattributes with required filter
  256. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  257. with open(gitattributes_path, "wb") as f:
  258. f.write(b"*.secret filter=working_filter\n")
  259. # Configure filter as required with working command
  260. config = self.repo.get_config()
  261. config.set(
  262. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  263. ) # uppercase
  264. config.set((b"filter", b"working_filter"), b"required", b"true")
  265. config.write_to_path()
  266. # Add .gitattributes
  267. porcelain.add(self.repo, paths=[".gitattributes"])
  268. # Create file that would use the filter
  269. secret_file = os.path.join(self.test_dir, "test.secret")
  270. with open(secret_file, "wb") as f:
  271. f.write(b"hello world\n")
  272. # Adding file should work and apply filter
  273. porcelain.add(self.repo, paths=["test.secret"])
  274. # Check that content was filtered
  275. index = self.repo.open_index()
  276. entry = index[b"test.secret"]
  277. blob = self.repo.object_store[entry.sha]
  278. self.assertEqual(blob.data, b"HELLO WORLD\n")
  279. def test_optional_filter_failure_fallback(self) -> None:
  280. """Test that optional filter failure falls back to original data."""
  281. # Create .gitattributes with optional filter
  282. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  283. with open(gitattributes_path, "wb") as f:
  284. f.write(b"*.txt filter=optional_filter\n")
  285. # Configure filter as optional (required=false) with failing command
  286. config = self.repo.get_config()
  287. config.set(
  288. (b"filter", b"optional_filter"), b"clean", b"false"
  289. ) # false command always fails
  290. config.set((b"filter", b"optional_filter"), b"required", b"false")
  291. config.write_to_path()
  292. # Add .gitattributes
  293. porcelain.add(self.repo, paths=[".gitattributes"])
  294. # Create file that would use the filter
  295. test_file = os.path.join(self.test_dir, "test.txt")
  296. with open(test_file, "wb") as f:
  297. f.write(b"test content\n")
  298. # Adding file should work and fallback to original content
  299. porcelain.add(self.repo, paths=["test.txt"])
  300. # Check that original content was preserved
  301. index = self.repo.open_index()
  302. entry = index[b"test.txt"]
  303. blob = self.repo.object_store[entry.sha]
  304. self.assertEqual(blob.data, b"test content\n")
  305. class ProcessFilterDriverTests(TestCase):
  306. """Tests for ProcessFilterDriver with real process filter."""
  307. def setUp(self):
  308. super().setUp()
  309. # Create a temporary test filter process dynamically
  310. self.test_filter_path = self._create_test_filter()
  311. def tearDown(self):
  312. # Clean up the test filter
  313. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  314. os.unlink(self.test_filter_path)
  315. super().tearDown()
  316. def _create_test_filter(self):
  317. """Create a simple test filter process that works on all platforms."""
  318. import tempfile
  319. # Create filter script that uppercases on clean, lowercases on smudge
  320. filter_script = """import sys
  321. import os
  322. # Simple filter that doesn't use any external dependencies
  323. def read_exact(n):
  324. data = b""
  325. while len(data) < n:
  326. chunk = sys.stdin.buffer.read(n - len(data))
  327. if not chunk:
  328. break
  329. data += chunk
  330. return data
  331. def write_pkt(data):
  332. if data is None:
  333. sys.stdout.buffer.write(b"0000")
  334. else:
  335. length = len(data) + 4
  336. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  337. sys.stdout.buffer.write(data)
  338. sys.stdout.buffer.flush()
  339. def read_pkt():
  340. size_bytes = read_exact(4)
  341. if not size_bytes:
  342. return None
  343. size = int(size_bytes.decode(), 16)
  344. if size == 0:
  345. return None
  346. return read_exact(size - 4)
  347. # Handshake
  348. client_hello = read_pkt()
  349. version = read_pkt()
  350. flush = read_pkt()
  351. write_pkt(b"git-filter-server")
  352. write_pkt(b"version=2")
  353. write_pkt(None)
  354. # Read and echo capabilities
  355. caps = []
  356. while True:
  357. cap = read_pkt()
  358. if cap is None:
  359. break
  360. caps.append(cap)
  361. for cap in caps:
  362. write_pkt(cap)
  363. write_pkt(None)
  364. # Process commands
  365. while True:
  366. headers = {}
  367. while True:
  368. line = read_pkt()
  369. if line is None:
  370. break
  371. if b"=" in line:
  372. k, v = line.split(b"=", 1)
  373. headers[k.decode()] = v.decode()
  374. if not headers:
  375. break
  376. # Read data
  377. data_chunks = []
  378. while True:
  379. chunk = read_pkt()
  380. if chunk is None:
  381. break
  382. data_chunks.append(chunk)
  383. data = b"".join(data_chunks)
  384. # Process (uppercase for clean, lowercase for smudge)
  385. if headers.get("command") == "clean":
  386. result = data.upper()
  387. elif headers.get("command") == "smudge":
  388. result = data.lower()
  389. else:
  390. result = data
  391. # Send response
  392. write_pkt(b"status=success")
  393. write_pkt(None)
  394. # Send result
  395. chunk_size = 65516
  396. for i in range(0, len(result), chunk_size):
  397. write_pkt(result[i:i+chunk_size])
  398. write_pkt(None)
  399. # Send final headers (empty list to keep status=success)
  400. write_pkt(None)
  401. """
  402. # Create temporary file
  403. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  404. try:
  405. os.write(fd, filter_script.encode())
  406. os.close(fd)
  407. # Make executable on Unix-like systems
  408. if os.name != "nt": # Not Windows
  409. os.chmod(path, 0o755)
  410. return path
  411. except:
  412. if os.path.exists(path):
  413. os.unlink(path)
  414. raise
  415. def test_process_filter_clean_operation(self):
  416. """Test clean operation using real process filter."""
  417. import sys
  418. driver = ProcessFilterDriver(
  419. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  420. )
  421. test_data = b"hello world"
  422. result = driver.clean(test_data)
  423. # Our test filter uppercases on clean
  424. self.assertEqual(result, b"HELLO WORLD")
  425. def test_process_filter_smudge_operation(self):
  426. """Test smudge operation using real process filter."""
  427. import sys
  428. driver = ProcessFilterDriver(
  429. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  430. )
  431. test_data = b"HELLO WORLD"
  432. result = driver.smudge(test_data, b"test.txt")
  433. # Our test filter lowercases on smudge
  434. self.assertEqual(result, b"hello world")
  435. def test_process_filter_large_data(self):
  436. """Test process filter with data larger than single pkt-line."""
  437. import sys
  438. driver = ProcessFilterDriver(
  439. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  440. )
  441. # Create data larger than max pkt-line payload (65516 bytes)
  442. test_data = b"a" * 70000
  443. result = driver.clean(test_data)
  444. # Should be uppercased
  445. self.assertEqual(result, b"A" * 70000)
  446. def test_fallback_to_individual_commands(self):
  447. """Test fallback when process filter fails."""
  448. driver = ProcessFilterDriver(
  449. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  450. process_cmd="/nonexistent/command", # This should fail
  451. required=False,
  452. )
  453. test_data = b"hello world\n"
  454. result = driver.clean(test_data)
  455. # Should fallback to tr command and uppercase
  456. self.assertEqual(result, b"HELLO WORLD\n")
  457. def test_process_reuse(self):
  458. """Test that process is reused across multiple operations."""
  459. import sys
  460. driver = ProcessFilterDriver(
  461. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  462. )
  463. # First operation
  464. result1 = driver.clean(b"test1")
  465. self.assertEqual(result1, b"TEST1")
  466. # Second operation should reuse the same process
  467. result2 = driver.clean(b"test2")
  468. self.assertEqual(result2, b"TEST2")
  469. # Process should still be alive
  470. self.assertIsNotNone(driver._process)
  471. self.assertIsNone(driver._process.poll()) # None means still running
  472. def test_error_handling_invalid_command(self):
  473. """Test error handling with invalid filter command."""
  474. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  475. with self.assertRaises(FilterError) as cm:
  476. driver.clean(b"test data")
  477. self.assertIn("Failed to start process filter", str(cm.exception))
  478. class FilterContextTests(TestCase):
  479. """Tests for FilterContext class."""
  480. def test_filter_context_caches_long_running_drivers(self):
  481. """Test that FilterContext caches only long-running drivers."""
  482. # Create real filter drivers
  483. class UppercaseFilter:
  484. def clean(self, data):
  485. return data.upper()
  486. def smudge(self, data, path=b""):
  487. return data.lower()
  488. def cleanup(self):
  489. pass
  490. def reuse(self, config, filter_name):
  491. # Pretend it's a long-running filter that should be cached
  492. return True
  493. class IdentityFilter:
  494. def clean(self, data):
  495. return data
  496. def smudge(self, data, path=b""):
  497. return data
  498. def cleanup(self):
  499. pass
  500. def reuse(self, config, filter_name):
  501. # Lightweight filter, don't cache
  502. return False
  503. # Create registry and context
  504. # Need to provide a config for caching to work
  505. from dulwich.config import ConfigDict
  506. config = ConfigDict()
  507. # Add some dummy config to make it truthy (use proper format)
  508. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  509. registry = FilterRegistry(config=config)
  510. context = FilterContext(registry)
  511. # Register drivers
  512. long_running = UppercaseFilter()
  513. stateless = IdentityFilter()
  514. registry.register_driver("uppercase", long_running)
  515. registry.register_driver("identity", stateless)
  516. # Get drivers through context
  517. driver1 = context.get_driver("uppercase")
  518. driver2 = context.get_driver("uppercase")
  519. # Long-running driver should be cached
  520. self.assertIs(driver1, driver2)
  521. self.assertIs(driver1, long_running)
  522. # Get stateless driver
  523. stateless1 = context.get_driver("identity")
  524. stateless2 = context.get_driver("identity")
  525. # Stateless driver comes from registry but isn't cached in context
  526. self.assertIs(stateless1, stateless)
  527. self.assertIs(stateless2, stateless)
  528. self.assertNotIn("identity", context._active_drivers)
  529. self.assertIn("uppercase", context._active_drivers)
  530. def test_filter_context_cleanup(self):
  531. """Test that FilterContext properly cleans up resources."""
  532. cleanup_called = []
  533. class TrackableFilter:
  534. def __init__(self, name):
  535. self.name = name
  536. def clean(self, data):
  537. return data
  538. def smudge(self, data, path=b""):
  539. return data
  540. def cleanup(self):
  541. cleanup_called.append(self.name)
  542. def is_long_running(self):
  543. return True
  544. # Create registry and context
  545. registry = FilterRegistry()
  546. context = FilterContext(registry)
  547. # Register and use drivers
  548. filter1 = TrackableFilter("filter1")
  549. filter2 = TrackableFilter("filter2")
  550. filter3 = TrackableFilter("filter3")
  551. registry.register_driver("filter1", filter1)
  552. registry.register_driver("filter2", filter2)
  553. registry.register_driver("filter3", filter3)
  554. # Get only some drivers to cache them
  555. context.get_driver("filter1")
  556. context.get_driver("filter2")
  557. # Don't get filter3
  558. # Close context
  559. context.close()
  560. # Verify cleanup was called for all drivers (context closes registry too)
  561. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  562. def test_filter_context_get_driver_returns_none_for_missing(self):
  563. """Test that get_driver returns None for non-existent drivers."""
  564. registry = FilterRegistry()
  565. context = FilterContext(registry)
  566. result = context.get_driver("nonexistent")
  567. self.assertIsNone(result)
  568. def test_filter_context_with_real_process_filter(self):
  569. """Test FilterContext with real ProcessFilterDriver instances."""
  570. import sys
  571. # Use existing test filter from ProcessFilterDriverTests
  572. test_dir = tempfile.mkdtemp()
  573. self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
  574. # Create a simple test filter that just passes data through
  575. filter_script = _PASSTHROUGH_FILTER_SCRIPT
  576. filter_path = os.path.join(test_dir, "simple_filter.py")
  577. with open(filter_path, "w") as f:
  578. f.write(filter_script)
  579. # Create ProcessFilterDriver instances
  580. # One with process_cmd (long-running)
  581. process_driver = ProcessFilterDriver(
  582. process_cmd=None, # Don't use actual process to avoid complexity
  583. clean_cmd=f"{sys.executable} {filter_path}",
  584. smudge_cmd=f"{sys.executable} {filter_path}",
  585. )
  586. # Register in context
  587. from dulwich.config import ConfigDict
  588. config = ConfigDict()
  589. # Add some dummy config to make it truthy (use proper format)
  590. config.set(
  591. (b"filter", b"process"),
  592. b"clean",
  593. f"{sys.executable} {filter_path}".encode(),
  594. )
  595. config.set(
  596. (b"filter", b"process"),
  597. b"smudge",
  598. f"{sys.executable} {filter_path}".encode(),
  599. )
  600. registry = FilterRegistry(config=config)
  601. context = FilterContext(registry)
  602. registry.register_driver("process", process_driver)
  603. # Get driver - should not be cached since it's not long-running
  604. driver1 = context.get_driver("process")
  605. self.assertIsNotNone(driver1)
  606. # Check that it's not a long-running process (no process_cmd)
  607. self.assertIsNone(driver1.process_cmd)
  608. self.assertNotIn("process", context._active_drivers)
  609. # Test with a long-running driver that should be cached
  610. # Create a mock driver that always wants to be reused
  611. class CacheableProcessDriver:
  612. def __init__(self):
  613. self.process_cmd = "dummy"
  614. self.clean_cmd = None
  615. self.smudge_cmd = None
  616. self.required = False
  617. def clean(self, data):
  618. return data
  619. def smudge(self, data, path=b""):
  620. return data
  621. def cleanup(self):
  622. pass
  623. def reuse(self, config, filter_name):
  624. # This driver always wants to be cached (simulates a long-running process)
  625. return True
  626. cacheable_driver = CacheableProcessDriver()
  627. registry.register_driver("long_process", cacheable_driver)
  628. driver2 = context.get_driver("long_process")
  629. # Check that it has a process_cmd (long-running)
  630. self.assertIsNotNone(driver2.process_cmd)
  631. self.assertIn("long_process", context._active_drivers)
  632. context.close()
  633. def test_filter_context_closes_registry(self):
  634. """Test that closing FilterContext also closes the registry."""
  635. # Track if registry.close() is called
  636. registry_closed = []
  637. class TrackingRegistry(FilterRegistry):
  638. def close(self):
  639. registry_closed.append(True)
  640. super().close()
  641. registry = TrackingRegistry()
  642. context = FilterContext(registry)
  643. # Close context should also close registry
  644. context.close()
  645. self.assertTrue(registry_closed)
  646. class ProcessFilterProtocolTests(TestCase):
  647. """Tests for ProcessFilterDriver protocol compliance."""
  648. def setUp(self):
  649. super().setUp()
  650. # Create a spec-compliant test filter process dynamically
  651. self.test_filter_path = self._create_spec_compliant_filter()
  652. def tearDown(self):
  653. # Clean up the test filter
  654. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  655. os.unlink(self.test_filter_path)
  656. super().tearDown()
  657. def _create_spec_compliant_filter(self):
  658. """Create a spec-compliant test filter that works on all platforms."""
  659. import tempfile
  660. # This filter strictly follows Git spec - no newlines in packets
  661. filter_script = """import sys
  662. def read_exact(n):
  663. data = b""
  664. while len(data) < n:
  665. chunk = sys.stdin.buffer.read(n - len(data))
  666. if not chunk:
  667. break
  668. data += chunk
  669. return data
  670. def write_pkt(data):
  671. if data is None:
  672. sys.stdout.buffer.write(b"0000")
  673. else:
  674. length = len(data) + 4
  675. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  676. sys.stdout.buffer.write(data)
  677. sys.stdout.buffer.flush()
  678. def read_pkt():
  679. size_bytes = read_exact(4)
  680. if not size_bytes:
  681. return None
  682. size = int(size_bytes.decode(), 16)
  683. if size == 0:
  684. return None
  685. return read_exact(size - 4)
  686. # Handshake - exact format, no newlines
  687. client_hello = read_pkt()
  688. version = read_pkt()
  689. flush = read_pkt()
  690. if client_hello != b"git-filter-client":
  691. sys.exit(1)
  692. if version != b"version=2":
  693. sys.exit(1)
  694. write_pkt(b"git-filter-server") # No newline
  695. write_pkt(b"version=2") # No newline
  696. write_pkt(None)
  697. # Read and echo capabilities
  698. caps = []
  699. while True:
  700. cap = read_pkt()
  701. if cap is None:
  702. break
  703. caps.append(cap)
  704. for cap in caps:
  705. if cap in [b"capability=clean", b"capability=smudge"]:
  706. write_pkt(cap)
  707. write_pkt(None)
  708. # Process commands
  709. while True:
  710. headers = {}
  711. while True:
  712. line = read_pkt()
  713. if line is None:
  714. break
  715. if b"=" in line:
  716. k, v = line.split(b"=", 1)
  717. headers[k.decode()] = v.decode()
  718. if not headers:
  719. break
  720. # Read data
  721. data_chunks = []
  722. while True:
  723. chunk = read_pkt()
  724. if chunk is None:
  725. break
  726. data_chunks.append(chunk)
  727. data = b"".join(data_chunks)
  728. # Process
  729. if headers.get("command") == "clean":
  730. result = data.upper()
  731. elif headers.get("command") == "smudge":
  732. result = data.lower()
  733. else:
  734. result = data
  735. # Send response
  736. write_pkt(b"status=success")
  737. write_pkt(None)
  738. # Send result
  739. chunk_size = 65516
  740. for i in range(0, len(result), chunk_size):
  741. write_pkt(result[i:i+chunk_size])
  742. write_pkt(None)
  743. # Send final headers (empty list to keep status=success)
  744. write_pkt(None)
  745. """
  746. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  747. try:
  748. os.write(fd, filter_script.encode())
  749. os.close(fd)
  750. if os.name != "nt": # Not Windows
  751. os.chmod(path, 0o755)
  752. return path
  753. except:
  754. if os.path.exists(path):
  755. os.unlink(path)
  756. raise
  757. def test_protocol_handshake_exact_format(self):
  758. """Test that handshake uses exact format without newlines."""
  759. import sys
  760. driver = ProcessFilterDriver(
  761. process_cmd=f"{sys.executable} {self.test_filter_path}",
  762. required=True, # Require success to test protocol compliance
  763. )
  764. # This should work with exact protocol format
  765. test_data = b"hello world"
  766. result = driver.clean(test_data)
  767. # Our test filter uppercases on clean
  768. self.assertEqual(result, b"HELLO WORLD")
  769. def test_capability_negotiation_exact_format(self):
  770. """Test that capabilities are sent and received in exact format."""
  771. import sys
  772. driver = ProcessFilterDriver(
  773. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  774. )
  775. # Force capability negotiation by using both clean and smudge
  776. clean_result = driver.clean(b"test")
  777. smudge_result = driver.smudge(b"TEST", b"test.txt")
  778. self.assertEqual(clean_result, b"TEST")
  779. self.assertEqual(smudge_result, b"test")
  780. def test_binary_data_handling(self):
  781. """Test handling of binary data through the protocol."""
  782. import sys
  783. driver = ProcessFilterDriver(
  784. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  785. )
  786. # Binary data with null bytes, high bytes, etc.
  787. binary_data = bytes(range(256))
  788. result = driver.clean(binary_data)
  789. # Should handle binary data without crashing
  790. self.assertIsInstance(result, bytes)
  791. # Our test filter uppercases bytes directly, which works for binary data
  792. # The fix ensures headers are kept as bytes, so binary content doesn't cause decode errors
  793. def test_binary_data_with_invalid_utf8_sequences(self):
  794. """Test handling of binary data with invalid UTF-8 sequences.
  795. Regression test for https://github.com/jelmer/dulwich/issues/2023
  796. where binary files (like .ogg, .jpg) caused UTF-8 decode errors.
  797. """
  798. import sys
  799. driver = ProcessFilterDriver(
  800. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  801. )
  802. # Create binary data with the specific byte that caused the issue (0xe5 at position 14)
  803. # plus other invalid UTF-8 sequences
  804. binary_data = b"some header \xe5\xff\xfe binary data"
  805. result = driver.clean(binary_data)
  806. # Should handle binary data without UTF-8 decode errors
  807. self.assertIsInstance(result, bytes)
  808. # The filter should process it successfully
  809. self.assertEqual(result, binary_data.upper())
  810. def test_large_file_chunking(self):
  811. """Test proper chunking of large files."""
  812. import sys
  813. driver = ProcessFilterDriver(
  814. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  815. )
  816. # Create data larger than max pkt-line payload (65516 bytes)
  817. large_data = b"a" * 100000
  818. result = driver.clean(large_data)
  819. # Should be properly processed (uppercased)
  820. expected = b"A" * 100000
  821. self.assertEqual(result, expected)
  822. def test_empty_file_handling(self):
  823. """Test handling of empty files."""
  824. import sys
  825. driver = ProcessFilterDriver(
  826. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  827. )
  828. result = driver.clean(b"")
  829. self.assertEqual(result, b"")
  830. def test_special_characters_in_pathname(self):
  831. """Test paths with special characters are handled correctly."""
  832. import sys
  833. # Test various special characters in paths
  834. special_paths = [
  835. b"file with spaces.txt",
  836. b"path/with/slashes.txt",
  837. b"file=with=equals.txt",
  838. b"file\nwith\nnewlines.txt",
  839. b"filew&with&ampersand.txt",
  840. ]
  841. test_data = b"test data"
  842. with create_passthrough_filter() as passthrough_filter_path:
  843. for process_cmd, smudge_cmd in [
  844. (f"{sys.executable} {self.test_filter_path}", None),
  845. (None, f"{sys.executable} {passthrough_filter_path} %f"),
  846. ]:
  847. driver = ProcessFilterDriver(
  848. process_cmd=process_cmd,
  849. smudge_cmd=smudge_cmd,
  850. required=True,
  851. )
  852. for path in special_paths:
  853. with self.subTest(
  854. process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
  855. ):
  856. result = driver.smudge(test_data, path)
  857. self.assertEqual(result, b"test data")
  858. def test_process_crash_recovery(self):
  859. """Test that process is properly restarted after crash."""
  860. import sys
  861. driver = ProcessFilterDriver(
  862. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  863. )
  864. # First operation
  865. result = driver.clean(b"test1")
  866. self.assertEqual(result, b"TEST1")
  867. # Kill the process
  868. if driver._process:
  869. driver._process.kill()
  870. driver._process.wait()
  871. driver.cleanup()
  872. # Should restart and work again
  873. result = driver.clean(b"test2")
  874. self.assertEqual(result, b"TEST2")
  875. def test_malformed_process_response_handling(self):
  876. """Test handling of malformed responses from process."""
  877. # Create a filter that sends malformed responses
  878. malformed_filter = """#!/usr/bin/env python3
  879. import sys
  880. import os
  881. sys.path.insert(0, os.path.dirname(__file__))
  882. from dulwich.protocol import Protocol
  883. protocol = Protocol(
  884. lambda n: sys.stdin.buffer.read(n),
  885. lambda d: sys.stdout.buffer.write(d) or len(d)
  886. )
  887. # Read handshake
  888. protocol.read_pkt_line()
  889. protocol.read_pkt_line()
  890. protocol.read_pkt_line()
  891. # Send invalid handshake
  892. protocol.write_pkt_line(b"invalid-welcome")
  893. protocol.write_pkt_line(b"version=2")
  894. protocol.write_pkt_line(None)
  895. """
  896. import tempfile
  897. fd, script_path = tempfile.mkstemp(suffix=".py")
  898. try:
  899. os.write(fd, malformed_filter.encode())
  900. os.close(fd)
  901. os.chmod(script_path, 0o755)
  902. driver = ProcessFilterDriver(
  903. process_cmd=f"python3 {script_path}",
  904. clean_cmd="cat", # Fallback
  905. required=False,
  906. )
  907. # Should fallback to clean_cmd when process fails
  908. result = driver.clean(b"test data")
  909. self.assertEqual(result, b"test data")
  910. finally:
  911. os.unlink(script_path)
  912. def test_concurrent_filter_operations(self):
  913. """Test that concurrent operations work correctly."""
  914. import sys
  915. driver = ProcessFilterDriver(
  916. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  917. )
  918. results = []
  919. errors = []
  920. def worker(data):
  921. try:
  922. result = driver.clean(data)
  923. results.append(result)
  924. except Exception as e:
  925. errors.append(e)
  926. # Start 5 concurrent operations
  927. threads = []
  928. test_data = [f"test{i}".encode() for i in range(5)]
  929. for data in test_data:
  930. t = threading.Thread(target=worker, args=(data,))
  931. threads.append(t)
  932. t.start()
  933. for t in threads:
  934. t.join()
  935. # Should have no errors
  936. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  937. self.assertEqual(len(results), 5)
  938. # All results should be uppercase versions
  939. expected = [data.upper() for data in test_data]
  940. self.assertEqual(sorted(results), sorted(expected))
  941. def test_process_resource_cleanup(self):
  942. """Test that process resources are properly cleaned up."""
  943. import sys
  944. driver = ProcessFilterDriver(
  945. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  946. )
  947. # Use the driver
  948. result = driver.clean(b"test")
  949. self.assertEqual(result, b"TEST")
  950. # Process should be running
  951. self.assertIsNotNone(driver._process)
  952. self.assertIsNone(driver._process.poll()) # None means still running
  953. # Remember the old process to check it was terminated
  954. old_process = driver._process
  955. # Manually clean up (simulates __del__)
  956. driver.cleanup()
  957. # Process reference should be cleared
  958. self.assertIsNone(driver._process)
  959. self.assertIsNone(driver._protocol)
  960. # Old process should be terminated
  961. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  962. def test_required_filter_error_propagation(self):
  963. """Test that errors are properly propagated when filter is required."""
  964. driver = ProcessFilterDriver(
  965. process_cmd="/definitely/nonexistent/command", required=True
  966. )
  967. with self.assertRaises(FilterError) as cm:
  968. driver.clean(b"test data")
  969. self.assertIn("Failed to start process filter", str(cm.exception))
  970. def test_two_phase_response_protocol(self):
  971. """Test filter protocol with two-phase response (initial + final headers).
  972. This test verifies that the filter correctly handles the Git LFS protocol
  973. where filters send:
  974. 1. Initial headers with status
  975. 2. Content data
  976. 3. Final headers with status
  977. This is the format used by git-lfs and documented in the Git filter protocol.
  978. """
  979. import sys
  980. import tempfile
  981. # Create a filter that follows the two-phase protocol
  982. filter_script = """import sys
  983. def read_exact(n):
  984. data = b""
  985. while len(data) < n:
  986. chunk = sys.stdin.buffer.read(n - len(data))
  987. if not chunk:
  988. break
  989. data += chunk
  990. return data
  991. def write_pkt(data):
  992. if data is None:
  993. sys.stdout.buffer.write(b"0000")
  994. else:
  995. length = len(data) + 4
  996. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  997. sys.stdout.buffer.write(data)
  998. sys.stdout.buffer.flush()
  999. def read_pkt():
  1000. size_bytes = read_exact(4)
  1001. if not size_bytes:
  1002. return None
  1003. size = int(size_bytes.decode(), 16)
  1004. if size == 0:
  1005. return None
  1006. return read_exact(size - 4)
  1007. # Handshake
  1008. client_hello = read_pkt()
  1009. version = read_pkt()
  1010. flush = read_pkt()
  1011. write_pkt(b"git-filter-server")
  1012. write_pkt(b"version=2")
  1013. write_pkt(None)
  1014. # Read and echo capabilities
  1015. caps = []
  1016. while True:
  1017. cap = read_pkt()
  1018. if cap is None:
  1019. break
  1020. caps.append(cap)
  1021. for cap in caps:
  1022. write_pkt(cap)
  1023. write_pkt(None)
  1024. # Process commands
  1025. while True:
  1026. headers = {}
  1027. while True:
  1028. line = read_pkt()
  1029. if line is None:
  1030. break
  1031. if b"=" in line:
  1032. k, v = line.split(b"=", 1)
  1033. headers[k.decode()] = v.decode()
  1034. if not headers:
  1035. break
  1036. # Read data
  1037. data_chunks = []
  1038. while True:
  1039. chunk = read_pkt()
  1040. if chunk is None:
  1041. break
  1042. data_chunks.append(chunk)
  1043. data = b"".join(data_chunks)
  1044. # Process
  1045. if headers.get("command") == "clean":
  1046. result = data.upper()
  1047. elif headers.get("command") == "smudge":
  1048. result = data.lower()
  1049. else:
  1050. result = data
  1051. # TWO-PHASE RESPONSE: Send initial headers
  1052. write_pkt(b"status=success")
  1053. write_pkt(None)
  1054. # Send result data
  1055. chunk_size = 65516
  1056. for i in range(0, len(result), chunk_size):
  1057. write_pkt(result[i:i+chunk_size])
  1058. write_pkt(None)
  1059. # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
  1060. write_pkt(None)
  1061. """
  1062. fd, filter_path = tempfile.mkstemp(
  1063. suffix=".py", prefix="test_filter_two_phase_"
  1064. )
  1065. try:
  1066. os.write(fd, filter_script.encode())
  1067. os.close(fd)
  1068. if os.name != "nt":
  1069. os.chmod(filter_path, 0o755)
  1070. driver = ProcessFilterDriver(
  1071. process_cmd=f"{sys.executable} {filter_path}", required=True
  1072. )
  1073. # Test clean operation
  1074. test_data = b"hello world"
  1075. result = driver.clean(test_data)
  1076. self.assertEqual(result, b"HELLO WORLD")
  1077. # Test smudge operation
  1078. result = driver.smudge(b"HELLO WORLD", b"test.txt")
  1079. self.assertEqual(result, b"hello world")
  1080. driver.cleanup()
  1081. finally:
  1082. if os.path.exists(filter_path):
  1083. os.unlink(filter_path)
  1084. def test_two_phase_response_with_status_messages(self):
  1085. """Test filter that sends status messages in final headers.
  1086. Some filters (like git-lfs) may send progress or status messages
  1087. in the final headers. This test verifies that we can handle those.
  1088. """
  1089. import sys
  1090. import tempfile
  1091. # Create a filter that sends extra status info in final headers
  1092. filter_script = """import sys
  1093. def read_exact(n):
  1094. data = b""
  1095. while len(data) < n:
  1096. chunk = sys.stdin.buffer.read(n - len(data))
  1097. if not chunk:
  1098. break
  1099. data += chunk
  1100. return data
  1101. def write_pkt(data):
  1102. if data is None:
  1103. sys.stdout.buffer.write(b"0000")
  1104. else:
  1105. length = len(data) + 4
  1106. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1107. sys.stdout.buffer.write(data)
  1108. sys.stdout.buffer.flush()
  1109. def read_pkt():
  1110. size_bytes = read_exact(4)
  1111. if not size_bytes:
  1112. return None
  1113. size = int(size_bytes.decode(), 16)
  1114. if size == 0:
  1115. return None
  1116. return read_exact(size - 4)
  1117. # Handshake
  1118. client_hello = read_pkt()
  1119. version = read_pkt()
  1120. flush = read_pkt()
  1121. write_pkt(b"git-filter-server")
  1122. write_pkt(b"version=2")
  1123. write_pkt(None)
  1124. # Read and echo capabilities
  1125. caps = []
  1126. while True:
  1127. cap = read_pkt()
  1128. if cap is None:
  1129. break
  1130. caps.append(cap)
  1131. for cap in caps:
  1132. write_pkt(cap)
  1133. write_pkt(None)
  1134. # Process commands
  1135. while True:
  1136. headers = {}
  1137. while True:
  1138. line = read_pkt()
  1139. if line is None:
  1140. break
  1141. if b"=" in line:
  1142. k, v = line.split(b"=", 1)
  1143. headers[k.decode()] = v.decode()
  1144. if not headers:
  1145. break
  1146. # Read data
  1147. data_chunks = []
  1148. while True:
  1149. chunk = read_pkt()
  1150. if chunk is None:
  1151. break
  1152. data_chunks.append(chunk)
  1153. data = b"".join(data_chunks)
  1154. # Process
  1155. result = data.upper()
  1156. # Send initial headers
  1157. write_pkt(b"status=success")
  1158. write_pkt(None)
  1159. # Send result data
  1160. chunk_size = 65516
  1161. for i in range(0, len(result), chunk_size):
  1162. write_pkt(result[i:i+chunk_size])
  1163. write_pkt(None)
  1164. # Send final headers with progress messages (like git-lfs does)
  1165. write_pkt(b"status=success")
  1166. write_pkt(None)
  1167. """
  1168. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_status_")
  1169. try:
  1170. os.write(fd, filter_script.encode())
  1171. os.close(fd)
  1172. if os.name != "nt":
  1173. os.chmod(filter_path, 0o755)
  1174. driver = ProcessFilterDriver(
  1175. process_cmd=f"{sys.executable} {filter_path}", required=True
  1176. )
  1177. # Test clean operation with status messages
  1178. test_data = b"test data with status"
  1179. result = driver.clean(test_data)
  1180. self.assertEqual(result, b"TEST DATA WITH STATUS")
  1181. driver.cleanup()
  1182. finally:
  1183. if os.path.exists(filter_path):
  1184. os.unlink(filter_path)
  1185. def test_two_phase_response_with_final_error(self):
  1186. """Test filter that reports error in final headers.
  1187. The Git protocol allows filters to report success initially,
  1188. then report an error in the final headers. This test ensures
  1189. we handle that correctly.
  1190. """
  1191. import sys
  1192. import tempfile
  1193. # Create a filter that sends error in final headers
  1194. filter_script = """import sys
  1195. def read_exact(n):
  1196. data = b""
  1197. while len(data) < n:
  1198. chunk = sys.stdin.buffer.read(n - len(data))
  1199. if not chunk:
  1200. break
  1201. data += chunk
  1202. return data
  1203. def write_pkt(data):
  1204. if data is None:
  1205. sys.stdout.buffer.write(b"0000")
  1206. else:
  1207. length = len(data) + 4
  1208. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1209. sys.stdout.buffer.write(data)
  1210. sys.stdout.buffer.flush()
  1211. def read_pkt():
  1212. size_bytes = read_exact(4)
  1213. if not size_bytes:
  1214. return None
  1215. size = int(size_bytes.decode(), 16)
  1216. if size == 0:
  1217. return None
  1218. return read_exact(size - 4)
  1219. # Handshake
  1220. client_hello = read_pkt()
  1221. version = read_pkt()
  1222. flush = read_pkt()
  1223. write_pkt(b"git-filter-server")
  1224. write_pkt(b"version=2")
  1225. write_pkt(None)
  1226. # Read and echo capabilities
  1227. caps = []
  1228. while True:
  1229. cap = read_pkt()
  1230. if cap is None:
  1231. break
  1232. caps.append(cap)
  1233. for cap in caps:
  1234. write_pkt(cap)
  1235. write_pkt(None)
  1236. # Process commands
  1237. while True:
  1238. headers = {}
  1239. while True:
  1240. line = read_pkt()
  1241. if line is None:
  1242. break
  1243. if b"=" in line:
  1244. k, v = line.split(b"=", 1)
  1245. headers[k.decode()] = v.decode()
  1246. if not headers:
  1247. break
  1248. # Read data
  1249. data_chunks = []
  1250. while True:
  1251. chunk = read_pkt()
  1252. if chunk is None:
  1253. break
  1254. data_chunks.append(chunk)
  1255. data = b"".join(data_chunks)
  1256. # Send initial headers with success
  1257. write_pkt(b"status=success")
  1258. write_pkt(None)
  1259. # Send partial result
  1260. write_pkt(b"PARTIAL")
  1261. write_pkt(None)
  1262. # Send final headers with error (simulating processing failure)
  1263. write_pkt(b"status=error")
  1264. write_pkt(None)
  1265. """
  1266. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_error_")
  1267. try:
  1268. os.write(fd, filter_script.encode())
  1269. os.close(fd)
  1270. if os.name != "nt":
  1271. os.chmod(filter_path, 0o755)
  1272. driver = ProcessFilterDriver(
  1273. process_cmd=f"{sys.executable} {filter_path}", required=True
  1274. )
  1275. # Should raise FilterError due to final status being error
  1276. with self.assertRaises(FilterError) as cm:
  1277. driver.clean(b"test data")
  1278. self.assertIn("final status: error", str(cm.exception))
  1279. driver.cleanup()
  1280. finally:
  1281. if os.path.exists(filter_path):
  1282. os.unlink(filter_path)
  1283. _PASSTHROUGH_FILTER_SCRIPT = """import sys
  1284. while True:
  1285. line = sys.stdin.buffer.read()
  1286. if not line:
  1287. break
  1288. sys.stdout.buffer.write(line)
  1289. sys.stdout.buffer.flush()
  1290. """
  1291. @contextmanager
  1292. def create_passthrough_filter() -> Iterator[str]:
  1293. filter_script = _PASSTHROUGH_FILTER_SCRIPT
  1294. with tempfile.NamedTemporaryFile(
  1295. suffix=".py", delete=False, prefix="test_filter_passthrough_"
  1296. ) as f:
  1297. f.write(filter_script.encode())
  1298. path = f.name
  1299. try:
  1300. if os.name != "nt": # Not Windows
  1301. os.chmod(path, 0o755)
  1302. yield path
  1303. finally:
  1304. try:
  1305. os.unlink(path)
  1306. except FileNotFoundError:
  1307. pass