# test_filters.py -- Tests for filters
# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for filters."""

import os
import shutil
import sys
import tempfile
import threading
import unittest

from dulwich import porcelain
from dulwich.filters import FilterBlobNormalizer, FilterError, ProcessFilterDriver
from dulwich.repo import Repo

from . import TestCase


  30. class GitAttributesFilterIntegrationTests(TestCase):
  31. """Test gitattributes integration with filter drivers."""
  32. def setUp(self) -> None:
  33. super().setUp()
  34. self.test_dir = tempfile.mkdtemp()
  35. self.addCleanup(self._cleanup_test_dir)
  36. self.repo = Repo.init(self.test_dir)
  37. def _cleanup_test_dir(self) -> None:
  38. """Clean up test directory."""
  39. import shutil
  40. shutil.rmtree(self.test_dir)
  41. def test_gitattributes_text_filter(self) -> None:
  42. """Test that text attribute triggers line ending conversion."""
  43. # Configure autocrlf first
  44. config = self.repo.get_config()
  45. config.set((b"core",), b"autocrlf", b"true")
  46. config.write_to_path()
  47. # Create .gitattributes with text attribute
  48. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  49. with open(gitattributes_path, "wb") as f:
  50. f.write(b"*.txt text\n")
  51. f.write(b"*.bin -text\n")
  52. # Add .gitattributes
  53. porcelain.add(self.repo, paths=[".gitattributes"])
  54. porcelain.commit(self.repo, message=b"Add gitattributes")
  55. # Create text file with CRLF
  56. text_file = os.path.join(self.test_dir, "test.txt")
  57. with open(text_file, "wb") as f:
  58. f.write(b"line1\r\nline2\r\n")
  59. # Create binary file with CRLF
  60. bin_file = os.path.join(self.test_dir, "test.bin")
  61. with open(bin_file, "wb") as f:
  62. f.write(b"binary\r\ndata\r\n")
  63. # Add files
  64. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  65. # Check that text file was normalized
  66. index = self.repo.open_index()
  67. text_entry = index[b"test.txt"]
  68. text_blob = self.repo.object_store[text_entry.sha]
  69. self.assertEqual(text_blob.data, b"line1\nline2\n")
  70. # Check that binary file was not normalized
  71. bin_entry = index[b"test.bin"]
  72. bin_blob = self.repo.object_store[bin_entry.sha]
  73. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  74. @unittest.skip("Custom process filters require external commands")
  75. def test_gitattributes_custom_filter(self) -> None:
  76. """Test custom filter specified in gitattributes."""
  77. # Create .gitattributes with custom filter
  78. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  79. with open(gitattributes_path, "wb") as f:
  80. f.write(b"*.secret filter=redact\n")
  81. # Configure custom filter (use tr command for testing)
  82. config = self.repo.get_config()
  83. # This filter replaces all digits with X
  84. config.set((b"filter", b"redact"), b"clean", b"tr '0-9' 'X'")
  85. config.write_to_path()
  86. # Add .gitattributes
  87. porcelain.add(self.repo, paths=[".gitattributes"])
  88. # Create file with sensitive content
  89. secret_file = os.path.join(self.test_dir, "password.secret")
  90. with open(secret_file, "wb") as f:
  91. f.write(b"password123\ntoken456\n")
  92. # Add file
  93. porcelain.add(self.repo, paths=["password.secret"])
  94. # Check that content was filtered
  95. index = self.repo.open_index()
  96. entry = index[b"password.secret"]
  97. blob = self.repo.object_store[entry.sha]
  98. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  99. def test_gitattributes_from_tree(self) -> None:
  100. """Test that gitattributes from tree are used when no working tree exists."""
  101. # Create .gitattributes with text attribute
  102. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  103. with open(gitattributes_path, "wb") as f:
  104. f.write(b"*.txt text\n")
  105. # Add and commit .gitattributes
  106. porcelain.add(self.repo, paths=[".gitattributes"])
  107. porcelain.commit(self.repo, message=b"Add gitattributes")
  108. # Remove .gitattributes from working tree
  109. os.remove(gitattributes_path)
  110. # Get gitattributes - should still work from tree
  111. gitattributes = self.repo.get_gitattributes()
  112. attrs = gitattributes.match_path(b"test.txt")
  113. self.assertEqual(attrs.get(b"text"), True)
  114. def test_gitattributes_info_attributes(self) -> None:
  115. """Test that .git/info/attributes is read."""
  116. # Create info/attributes
  117. info_dir = os.path.join(self.repo.controldir(), "info")
  118. if not os.path.exists(info_dir):
  119. os.makedirs(info_dir)
  120. info_attrs_path = os.path.join(info_dir, "attributes")
  121. with open(info_attrs_path, "wb") as f:
  122. f.write(b"*.log text\n")
  123. # Get gitattributes
  124. gitattributes = self.repo.get_gitattributes()
  125. attrs = gitattributes.match_path(b"debug.log")
  126. self.assertEqual(attrs.get(b"text"), True)
  127. @unittest.skip("Custom process filters require external commands")
  128. def test_filter_precedence(self) -> None:
  129. """Test that filter attribute takes precedence over text attribute."""
  130. # Create .gitattributes with both text and filter
  131. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  132. with open(gitattributes_path, "wb") as f:
  133. f.write(b"*.txt text filter=custom\n")
  134. # Configure autocrlf and custom filter
  135. config = self.repo.get_config()
  136. config.set((b"core",), b"autocrlf", b"true")
  137. # This filter converts to uppercase
  138. config.set((b"filter", b"custom"), b"clean", b"tr '[:lower:]' '[:upper:]'")
  139. config.write_to_path()
  140. # Add .gitattributes
  141. porcelain.add(self.repo, paths=[".gitattributes"])
  142. # Create text file with lowercase and CRLF
  143. text_file = os.path.join(self.test_dir, "test.txt")
  144. with open(text_file, "wb") as f:
  145. f.write(b"hello\r\nworld\r\n")
  146. # Add file
  147. porcelain.add(self.repo, paths=["test.txt"])
  148. # Check that custom filter was applied (not just line ending conversion)
  149. index = self.repo.open_index()
  150. entry = index[b"test.txt"]
  151. blob = self.repo.object_store[entry.sha]
  152. # Should be uppercase with LF endings
  153. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  154. def test_blob_normalizer_integration(self) -> None:
  155. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  156. normalizer = self.repo.get_blob_normalizer()
  157. # Check it's the right type
  158. from dulwich.filters import FilterBlobNormalizer
  159. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  160. # Check it has access to gitattributes
  161. self.assertIsNotNone(normalizer.gitattributes)
  162. self.assertIsNotNone(normalizer.filter_registry)
  163. def test_required_filter_missing(self) -> None:
  164. """Test that missing required filter raises an error."""
  165. # Create .gitattributes with required filter
  166. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  167. with open(gitattributes_path, "wb") as f:
  168. f.write(b"*.secret filter=required_filter\n")
  169. # Configure filter as required but without commands
  170. config = self.repo.get_config()
  171. config.set((b"filter", b"required_filter"), b"required", b"true")
  172. config.write_to_path()
  173. # Add .gitattributes
  174. porcelain.add(self.repo, paths=[".gitattributes"])
  175. # Create file that would use the filter
  176. secret_file = os.path.join(self.test_dir, "test.secret")
  177. with open(secret_file, "wb") as f:
  178. f.write(b"test content\n")
  179. # Adding file should raise error due to missing required filter
  180. with self.assertRaises(FilterError) as cm:
  181. porcelain.add(self.repo, paths=["test.secret"])
  182. self.assertIn(
  183. "Required filter 'required_filter' is not available", str(cm.exception)
  184. )
  185. def test_required_filter_clean_command_fails(self) -> None:
  186. """Test that required filter failure during clean raises an error."""
  187. # Create .gitattributes with required filter
  188. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  189. with open(gitattributes_path, "wb") as f:
  190. f.write(b"*.secret filter=failing_filter\n")
  191. # Configure filter as required with failing command
  192. config = self.repo.get_config()
  193. config.set(
  194. (b"filter", b"failing_filter"), b"clean", b"false"
  195. ) # false command always fails
  196. config.set((b"filter", b"failing_filter"), b"required", b"true")
  197. config.write_to_path()
  198. # Add .gitattributes
  199. porcelain.add(self.repo, paths=[".gitattributes"])
  200. # Create file that would use the filter
  201. secret_file = os.path.join(self.test_dir, "test.secret")
  202. with open(secret_file, "wb") as f:
  203. f.write(b"test content\n")
  204. # Adding file should raise error due to failing required filter
  205. with self.assertRaises(FilterError) as cm:
  206. porcelain.add(self.repo, paths=["test.secret"])
  207. self.assertIn("Required clean filter failed", str(cm.exception))
  208. def test_required_filter_success(self) -> None:
  209. """Test that required filter works when properly configured."""
  210. # Create .gitattributes with required filter
  211. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  212. with open(gitattributes_path, "wb") as f:
  213. f.write(b"*.secret filter=working_filter\n")
  214. # Configure filter as required with working command
  215. config = self.repo.get_config()
  216. config.set(
  217. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  218. ) # uppercase
  219. config.set((b"filter", b"working_filter"), b"required", b"true")
  220. config.write_to_path()
  221. # Add .gitattributes
  222. porcelain.add(self.repo, paths=[".gitattributes"])
  223. # Create file that would use the filter
  224. secret_file = os.path.join(self.test_dir, "test.secret")
  225. with open(secret_file, "wb") as f:
  226. f.write(b"hello world\n")
  227. # Adding file should work and apply filter
  228. porcelain.add(self.repo, paths=["test.secret"])
  229. # Check that content was filtered
  230. index = self.repo.open_index()
  231. entry = index[b"test.secret"]
  232. blob = self.repo.object_store[entry.sha]
  233. self.assertEqual(blob.data, b"HELLO WORLD\n")
  234. def test_optional_filter_failure_fallback(self) -> None:
  235. """Test that optional filter failure falls back to original data."""
  236. # Create .gitattributes with optional filter
  237. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  238. with open(gitattributes_path, "wb") as f:
  239. f.write(b"*.txt filter=optional_filter\n")
  240. # Configure filter as optional (required=false) with failing command
  241. config = self.repo.get_config()
  242. config.set(
  243. (b"filter", b"optional_filter"), b"clean", b"false"
  244. ) # false command always fails
  245. config.set((b"filter", b"optional_filter"), b"required", b"false")
  246. config.write_to_path()
  247. # Add .gitattributes
  248. porcelain.add(self.repo, paths=[".gitattributes"])
  249. # Create file that would use the filter
  250. test_file = os.path.join(self.test_dir, "test.txt")
  251. with open(test_file, "wb") as f:
  252. f.write(b"test content\n")
  253. # Adding file should work and fallback to original content
  254. porcelain.add(self.repo, paths=["test.txt"])
  255. # Check that original content was preserved
  256. index = self.repo.open_index()
  257. entry = index[b"test.txt"]
  258. blob = self.repo.object_store[entry.sha]
  259. self.assertEqual(blob.data, b"test content\n")
  260. class ProcessFilterDriverTests(TestCase):
  261. """Tests for ProcessFilterDriver with real process filter."""
  262. def setUp(self):
  263. super().setUp()
  264. # Create a temporary test filter process dynamically
  265. self.test_filter_path = self._create_test_filter()
  266. def tearDown(self):
  267. # Clean up the test filter
  268. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  269. os.unlink(self.test_filter_path)
  270. super().tearDown()
  271. def _create_test_filter(self):
  272. """Create a simple test filter process that works on all platforms."""
  273. import tempfile
  274. # Create filter script that uppercases on clean, lowercases on smudge
  275. filter_script = """import sys
  276. import os
  277. # Simple filter that doesn't use any external dependencies
  278. def read_exact(n):
  279. data = b""
  280. while len(data) < n:
  281. chunk = sys.stdin.buffer.read(n - len(data))
  282. if not chunk:
  283. break
  284. data += chunk
  285. return data
  286. def write_pkt(data):
  287. if data is None:
  288. sys.stdout.buffer.write(b"0000")
  289. else:
  290. length = len(data) + 4
  291. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  292. sys.stdout.buffer.write(data)
  293. sys.stdout.buffer.flush()
  294. def read_pkt():
  295. size_bytes = read_exact(4)
  296. if not size_bytes:
  297. return None
  298. size = int(size_bytes.decode(), 16)
  299. if size == 0:
  300. return None
  301. return read_exact(size - 4)
  302. # Handshake
  303. client_hello = read_pkt()
  304. version = read_pkt()
  305. flush = read_pkt()
  306. write_pkt(b"git-filter-server")
  307. write_pkt(b"version=2")
  308. write_pkt(None)
  309. # Read and echo capabilities
  310. caps = []
  311. while True:
  312. cap = read_pkt()
  313. if cap is None:
  314. break
  315. caps.append(cap)
  316. for cap in caps:
  317. write_pkt(cap)
  318. write_pkt(None)
  319. # Process commands
  320. while True:
  321. headers = {}
  322. while True:
  323. line = read_pkt()
  324. if line is None:
  325. break
  326. if b"=" in line:
  327. k, v = line.split(b"=", 1)
  328. headers[k.decode()] = v.decode()
  329. if not headers:
  330. break
  331. # Read data
  332. data_chunks = []
  333. while True:
  334. chunk = read_pkt()
  335. if chunk is None:
  336. break
  337. data_chunks.append(chunk)
  338. data = b"".join(data_chunks)
  339. # Process (uppercase for clean, lowercase for smudge)
  340. if headers.get("command") == "clean":
  341. result = data.upper()
  342. elif headers.get("command") == "smudge":
  343. result = data.lower()
  344. else:
  345. result = data
  346. # Send response
  347. write_pkt(b"status=success")
  348. write_pkt(None)
  349. # Send result
  350. chunk_size = 65516
  351. for i in range(0, len(result), chunk_size):
  352. write_pkt(result[i:i+chunk_size])
  353. write_pkt(None)
  354. """
  355. # Create temporary file
  356. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  357. try:
  358. os.write(fd, filter_script.encode())
  359. os.close(fd)
  360. # Make executable on Unix-like systems
  361. if os.name != "nt": # Not Windows
  362. os.chmod(path, 0o755)
  363. return path
  364. except:
  365. if os.path.exists(path):
  366. os.unlink(path)
  367. raise
  368. def test_process_filter_clean_operation(self):
  369. """Test clean operation using real process filter."""
  370. import sys
  371. driver = ProcessFilterDriver(
  372. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  373. )
  374. test_data = b"hello world"
  375. result = driver.clean(test_data)
  376. # Our test filter uppercases on clean
  377. self.assertEqual(result, b"HELLO WORLD")
  378. def test_process_filter_smudge_operation(self):
  379. """Test smudge operation using real process filter."""
  380. import sys
  381. driver = ProcessFilterDriver(
  382. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  383. )
  384. test_data = b"HELLO WORLD"
  385. result = driver.smudge(test_data, b"test.txt")
  386. # Our test filter lowercases on smudge
  387. self.assertEqual(result, b"hello world")
  388. def test_process_filter_large_data(self):
  389. """Test process filter with data larger than single pkt-line."""
  390. import sys
  391. driver = ProcessFilterDriver(
  392. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  393. )
  394. # Create data larger than max pkt-line payload (65516 bytes)
  395. test_data = b"a" * 70000
  396. result = driver.clean(test_data)
  397. # Should be uppercased
  398. self.assertEqual(result, b"A" * 70000)
  399. def test_fallback_to_individual_commands(self):
  400. """Test fallback when process filter fails."""
  401. driver = ProcessFilterDriver(
  402. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  403. process_cmd="/nonexistent/command", # This should fail
  404. required=False,
  405. )
  406. test_data = b"hello world\n"
  407. result = driver.clean(test_data)
  408. # Should fallback to tr command and uppercase
  409. self.assertEqual(result, b"HELLO WORLD\n")
  410. def test_process_reuse(self):
  411. """Test that process is reused across multiple operations."""
  412. import sys
  413. driver = ProcessFilterDriver(
  414. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  415. )
  416. # First operation
  417. result1 = driver.clean(b"test1")
  418. self.assertEqual(result1, b"TEST1")
  419. # Second operation should reuse the same process
  420. result2 = driver.clean(b"test2")
  421. self.assertEqual(result2, b"TEST2")
  422. # Process should still be alive
  423. self.assertIsNotNone(driver._process)
  424. self.assertIsNone(driver._process.poll()) # None means still running
  425. def test_error_handling_invalid_command(self):
  426. """Test error handling with invalid filter command."""
  427. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  428. with self.assertRaises(FilterError) as cm:
  429. driver.clean(b"test data")
  430. self.assertIn("Failed to start process filter", str(cm.exception))
  431. def test_thread_safety_with_process_filter(self):
  432. """Test thread safety with actual process filter."""
  433. import sys
  434. driver = ProcessFilterDriver(
  435. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  436. )
  437. results = []
  438. errors = []
  439. def worker(data):
  440. try:
  441. result = driver.clean(data)
  442. results.append(result)
  443. except Exception as e:
  444. errors.append(e)
  445. # Start multiple threads
  446. threads = []
  447. for i in range(3):
  448. data = f"test{i}".encode()
  449. t = threading.Thread(target=worker, args=(data,))
  450. threads.append(t)
  451. t.start()
  452. # Wait for all threads
  453. for t in threads:
  454. t.join()
  455. # Should have no errors and correct results
  456. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  457. self.assertEqual(len(results), 3)
  458. # Check results are correct (uppercased)
  459. expected = [b"TEST0", b"TEST1", b"TEST2"]
  460. self.assertEqual(sorted(results), sorted(expected))
  461. class ProcessFilterProtocolTests(TestCase):
  462. """Tests for ProcessFilterDriver protocol compliance."""
  463. def setUp(self):
  464. super().setUp()
  465. # Create a spec-compliant test filter process dynamically
  466. self.test_filter_path = self._create_spec_compliant_filter()
  467. def tearDown(self):
  468. # Clean up the test filter
  469. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  470. os.unlink(self.test_filter_path)
  471. super().tearDown()
  472. def _create_spec_compliant_filter(self):
  473. """Create a spec-compliant test filter that works on all platforms."""
  474. import tempfile
  475. # This filter strictly follows Git spec - no newlines in packets
  476. filter_script = """import sys
  477. def read_exact(n):
  478. data = b""
  479. while len(data) < n:
  480. chunk = sys.stdin.buffer.read(n - len(data))
  481. if not chunk:
  482. break
  483. data += chunk
  484. return data
  485. def write_pkt(data):
  486. if data is None:
  487. sys.stdout.buffer.write(b"0000")
  488. else:
  489. length = len(data) + 4
  490. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  491. sys.stdout.buffer.write(data)
  492. sys.stdout.buffer.flush()
  493. def read_pkt():
  494. size_bytes = read_exact(4)
  495. if not size_bytes:
  496. return None
  497. size = int(size_bytes.decode(), 16)
  498. if size == 0:
  499. return None
  500. return read_exact(size - 4)
  501. # Handshake - exact format, no newlines
  502. client_hello = read_pkt()
  503. version = read_pkt()
  504. flush = read_pkt()
  505. if client_hello != b"git-filter-client":
  506. sys.exit(1)
  507. if version != b"version=2":
  508. sys.exit(1)
  509. write_pkt(b"git-filter-server") # No newline
  510. write_pkt(b"version=2") # No newline
  511. write_pkt(None)
  512. # Read and echo capabilities
  513. caps = []
  514. while True:
  515. cap = read_pkt()
  516. if cap is None:
  517. break
  518. caps.append(cap)
  519. for cap in caps:
  520. if cap in [b"capability=clean", b"capability=smudge"]:
  521. write_pkt(cap)
  522. write_pkt(None)
  523. # Process commands
  524. while True:
  525. headers = {}
  526. while True:
  527. line = read_pkt()
  528. if line is None:
  529. break
  530. if b"=" in line:
  531. k, v = line.split(b"=", 1)
  532. headers[k.decode()] = v.decode()
  533. if not headers:
  534. break
  535. # Read data
  536. data_chunks = []
  537. while True:
  538. chunk = read_pkt()
  539. if chunk is None:
  540. break
  541. data_chunks.append(chunk)
  542. data = b"".join(data_chunks)
  543. # Process
  544. if headers.get("command") == "clean":
  545. result = data.upper()
  546. elif headers.get("command") == "smudge":
  547. result = data.lower()
  548. else:
  549. result = data
  550. # Send response
  551. write_pkt(b"status=success")
  552. write_pkt(None)
  553. # Send result
  554. chunk_size = 65516
  555. for i in range(0, len(result), chunk_size):
  556. write_pkt(result[i:i+chunk_size])
  557. write_pkt(None)
  558. """
  559. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  560. try:
  561. os.write(fd, filter_script.encode())
  562. os.close(fd)
  563. if os.name != "nt": # Not Windows
  564. os.chmod(path, 0o755)
  565. return path
  566. except:
  567. if os.path.exists(path):
  568. os.unlink(path)
  569. raise
  570. def test_protocol_handshake_exact_format(self):
  571. """Test that handshake uses exact format without newlines."""
  572. import sys
  573. driver = ProcessFilterDriver(
  574. process_cmd=f"{sys.executable} {self.test_filter_path}",
  575. required=True, # Require success to test protocol compliance
  576. )
  577. # This should work with exact protocol format
  578. test_data = b"hello world"
  579. result = driver.clean(test_data)
  580. # Our test filter uppercases on clean
  581. self.assertEqual(result, b"HELLO WORLD")
  582. def test_capability_negotiation_exact_format(self):
  583. """Test that capabilities are sent and received in exact format."""
  584. import sys
  585. driver = ProcessFilterDriver(
  586. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  587. )
  588. # Force capability negotiation by using both clean and smudge
  589. clean_result = driver.clean(b"test")
  590. smudge_result = driver.smudge(b"TEST", b"test.txt")
  591. self.assertEqual(clean_result, b"TEST")
  592. self.assertEqual(smudge_result, b"test")
  593. def test_binary_data_handling(self):
  594. """Test handling of binary data through the protocol."""
  595. import sys
  596. driver = ProcessFilterDriver(
  597. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  598. )
  599. # Binary data with null bytes, high bytes, etc.
  600. binary_data = bytes(range(256))
  601. try:
  602. result = driver.clean(binary_data)
  603. # Should handle binary data without crashing
  604. self.assertIsInstance(result, bytes)
  605. # Our test filter uppercases, which may not work for all binary data
  606. # but should not crash
  607. except UnicodeDecodeError:
  608. # This might happen with binary data - acceptable
  609. pass
  610. def test_large_file_chunking(self):
  611. """Test proper chunking of large files."""
  612. import sys
  613. driver = ProcessFilterDriver(
  614. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  615. )
  616. # Create data larger than max pkt-line payload (65516 bytes)
  617. large_data = b"a" * 100000
  618. result = driver.clean(large_data)
  619. # Should be properly processed (uppercased)
  620. expected = b"A" * 100000
  621. self.assertEqual(result, expected)
  622. def test_empty_file_handling(self):
  623. """Test handling of empty files."""
  624. import sys
  625. driver = ProcessFilterDriver(
  626. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  627. )
  628. result = driver.clean(b"")
  629. self.assertEqual(result, b"")
  630. def test_special_characters_in_pathname(self):
  631. """Test paths with special characters are handled correctly."""
  632. import sys
  633. driver = ProcessFilterDriver(
  634. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  635. )
  636. # Test various special characters in paths
  637. special_paths = [
  638. b"file with spaces.txt",
  639. b"path/with/slashes.txt",
  640. b"file=with=equals.txt",
  641. b"file\nwith\nnewlines.txt",
  642. ]
  643. test_data = b"test data"
  644. for path in special_paths:
  645. result = driver.smudge(test_data, path)
  646. self.assertEqual(result, b"test data")
  647. def test_process_crash_recovery(self):
  648. """Test that process is properly restarted after crash."""
  649. import sys
  650. driver = ProcessFilterDriver(
  651. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  652. )
  653. # First operation
  654. result = driver.clean(b"test1")
  655. self.assertEqual(result, b"TEST1")
  656. # Kill the process
  657. if driver._process:
  658. driver._process.kill()
  659. driver._process.wait()
  660. driver._cleanup_process()
  661. # Should restart and work again
  662. result = driver.clean(b"test2")
  663. self.assertEqual(result, b"TEST2")
  664. def test_malformed_process_response_handling(self):
  665. """Test handling of malformed responses from process."""
  666. # Create a filter that sends malformed responses
  667. malformed_filter = """#!/usr/bin/env python3
  668. import sys
  669. import os
  670. sys.path.insert(0, os.path.dirname(__file__))
  671. from dulwich.protocol import Protocol
  672. protocol = Protocol(
  673. lambda n: sys.stdin.buffer.read(n),
  674. lambda d: sys.stdout.buffer.write(d) or len(d)
  675. )
  676. # Read handshake
  677. protocol.read_pkt_line()
  678. protocol.read_pkt_line()
  679. protocol.read_pkt_line()
  680. # Send invalid handshake
  681. protocol.write_pkt_line(b"invalid-welcome")
  682. protocol.write_pkt_line(b"version=2")
  683. protocol.write_pkt_line(None)
  684. """
  685. import tempfile
  686. fd, script_path = tempfile.mkstemp(suffix=".py")
  687. try:
  688. os.write(fd, malformed_filter.encode())
  689. os.close(fd)
  690. os.chmod(script_path, 0o755)
  691. driver = ProcessFilterDriver(
  692. process_cmd=f"python3 {script_path}",
  693. clean_cmd="cat", # Fallback
  694. required=False,
  695. )
  696. # Should fallback to clean_cmd when process fails
  697. result = driver.clean(b"test data")
  698. self.assertEqual(result, b"test data")
  699. finally:
  700. os.unlink(script_path)
  701. def test_concurrent_filter_operations(self):
  702. """Test that concurrent operations work correctly."""
  703. import sys
  704. driver = ProcessFilterDriver(
  705. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  706. )
  707. results = []
  708. errors = []
  709. def worker(data):
  710. try:
  711. result = driver.clean(data)
  712. results.append(result)
  713. except Exception as e:
  714. errors.append(e)
  715. # Start 5 concurrent operations
  716. threads = []
  717. test_data = [f"test{i}".encode() for i in range(5)]
  718. for data in test_data:
  719. t = threading.Thread(target=worker, args=(data,))
  720. threads.append(t)
  721. t.start()
  722. for t in threads:
  723. t.join()
  724. # Should have no errors
  725. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  726. self.assertEqual(len(results), 5)
  727. # All results should be uppercase versions
  728. expected = [data.upper() for data in test_data]
  729. self.assertEqual(sorted(results), sorted(expected))
  730. def test_process_resource_cleanup(self):
  731. """Test that process resources are properly cleaned up."""
  732. import sys
  733. driver = ProcessFilterDriver(
  734. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  735. )
  736. # Use the driver
  737. result = driver.clean(b"test")
  738. self.assertEqual(result, b"TEST")
  739. # Process should be running
  740. self.assertIsNotNone(driver._process)
  741. self.assertIsNone(driver._process.poll()) # None means still running
  742. # Remember the old process to check it was terminated
  743. old_process = driver._process
  744. # Manually clean up (simulates __del__)
  745. driver._cleanup_process()
  746. # Process reference should be cleared
  747. self.assertIsNone(driver._process)
  748. self.assertIsNone(driver._protocol)
  749. # Old process should be terminated
  750. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  751. def test_required_filter_error_propagation(self):
  752. """Test that errors are properly propagated when filter is required."""
  753. driver = ProcessFilterDriver(
  754. process_cmd="/definitely/nonexistent/command", required=True
  755. )
  756. with self.assertRaises(FilterError) as cm:
  757. driver.clean(b"test data")
  758. self.assertIn("Failed to start process filter", str(cm.exception))