test_filters.py 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import tempfile
  24. import threading
  25. import unittest
  26. from dulwich import porcelain
  27. from dulwich.filters import (
  28. FilterContext,
  29. FilterError,
  30. FilterRegistry,
  31. ProcessFilterDriver,
  32. )
  33. from dulwich.repo import Repo
  34. from . import TestCase
  35. class GitAttributesFilterIntegrationTests(TestCase):
  36. """Test gitattributes integration with filter drivers."""
  37. def setUp(self) -> None:
  38. super().setUp()
  39. self.test_dir = tempfile.mkdtemp()
  40. self.addCleanup(self._cleanup_test_dir)
  41. self.repo = Repo.init(self.test_dir)
  42. def _cleanup_test_dir(self) -> None:
  43. """Clean up test directory."""
  44. import shutil
  45. shutil.rmtree(self.test_dir)
  46. def test_gitattributes_text_filter(self) -> None:
  47. """Test that text attribute triggers line ending conversion."""
  48. # Configure autocrlf first
  49. config = self.repo.get_config()
  50. config.set((b"core",), b"autocrlf", b"true")
  51. config.write_to_path()
  52. # Create .gitattributes with text attribute
  53. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  54. with open(gitattributes_path, "wb") as f:
  55. f.write(b"*.txt text\n")
  56. f.write(b"*.bin -text\n")
  57. # Add .gitattributes
  58. porcelain.add(self.repo, paths=[".gitattributes"])
  59. porcelain.commit(self.repo, message=b"Add gitattributes")
  60. # Create text file with CRLF
  61. text_file = os.path.join(self.test_dir, "test.txt")
  62. with open(text_file, "wb") as f:
  63. f.write(b"line1\r\nline2\r\n")
  64. # Create binary file with CRLF
  65. bin_file = os.path.join(self.test_dir, "test.bin")
  66. with open(bin_file, "wb") as f:
  67. f.write(b"binary\r\ndata\r\n")
  68. # Add files
  69. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  70. # Check that text file was normalized
  71. index = self.repo.open_index()
  72. text_entry = index[b"test.txt"]
  73. text_blob = self.repo.object_store[text_entry.sha]
  74. self.assertEqual(text_blob.data, b"line1\nline2\n")
  75. # Check that binary file was not normalized
  76. bin_entry = index[b"test.bin"]
  77. bin_blob = self.repo.object_store[bin_entry.sha]
  78. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  79. @unittest.skip("Custom process filters require external commands")
  80. def test_gitattributes_custom_filter(self) -> None:
  81. """Test custom filter specified in gitattributes."""
  82. # Create .gitattributes with custom filter
  83. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  84. with open(gitattributes_path, "wb") as f:
  85. f.write(b"*.secret filter=redact\n")
  86. # Configure custom filter (use tr command for testing)
  87. config = self.repo.get_config()
  88. # This filter replaces all digits with X
  89. config.set((b"filter", b"redact"), b"clean", b"tr '0-9' 'X'")
  90. config.write_to_path()
  91. # Add .gitattributes
  92. porcelain.add(self.repo, paths=[".gitattributes"])
  93. # Create file with sensitive content
  94. secret_file = os.path.join(self.test_dir, "password.secret")
  95. with open(secret_file, "wb") as f:
  96. f.write(b"password123\ntoken456\n")
  97. # Add file
  98. porcelain.add(self.repo, paths=["password.secret"])
  99. # Check that content was filtered
  100. index = self.repo.open_index()
  101. entry = index[b"password.secret"]
  102. blob = self.repo.object_store[entry.sha]
  103. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  104. def test_gitattributes_from_tree(self) -> None:
  105. """Test that gitattributes from tree are used when no working tree exists."""
  106. # Create .gitattributes with text attribute
  107. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  108. with open(gitattributes_path, "wb") as f:
  109. f.write(b"*.txt text\n")
  110. # Add and commit .gitattributes
  111. porcelain.add(self.repo, paths=[".gitattributes"])
  112. porcelain.commit(self.repo, message=b"Add gitattributes")
  113. # Remove .gitattributes from working tree
  114. os.remove(gitattributes_path)
  115. # Get gitattributes - should still work from tree
  116. gitattributes = self.repo.get_gitattributes()
  117. attrs = gitattributes.match_path(b"test.txt")
  118. self.assertEqual(attrs.get(b"text"), True)
  119. def test_gitattributes_info_attributes(self) -> None:
  120. """Test that .git/info/attributes is read."""
  121. # Create info/attributes
  122. info_dir = os.path.join(self.repo.controldir(), "info")
  123. if not os.path.exists(info_dir):
  124. os.makedirs(info_dir)
  125. info_attrs_path = os.path.join(info_dir, "attributes")
  126. with open(info_attrs_path, "wb") as f:
  127. f.write(b"*.log text\n")
  128. # Get gitattributes
  129. gitattributes = self.repo.get_gitattributes()
  130. attrs = gitattributes.match_path(b"debug.log")
  131. self.assertEqual(attrs.get(b"text"), True)
  132. @unittest.skip("Custom process filters require external commands")
  133. def test_filter_precedence(self) -> None:
  134. """Test that filter attribute takes precedence over text attribute."""
  135. # Create .gitattributes with both text and filter
  136. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  137. with open(gitattributes_path, "wb") as f:
  138. f.write(b"*.txt text filter=custom\n")
  139. # Configure autocrlf and custom filter
  140. config = self.repo.get_config()
  141. config.set((b"core",), b"autocrlf", b"true")
  142. # This filter converts to uppercase
  143. config.set((b"filter", b"custom"), b"clean", b"tr '[:lower:]' '[:upper:]'")
  144. config.write_to_path()
  145. # Add .gitattributes
  146. porcelain.add(self.repo, paths=[".gitattributes"])
  147. # Create text file with lowercase and CRLF
  148. text_file = os.path.join(self.test_dir, "test.txt")
  149. with open(text_file, "wb") as f:
  150. f.write(b"hello\r\nworld\r\n")
  151. # Add file
  152. porcelain.add(self.repo, paths=["test.txt"])
  153. # Check that custom filter was applied (not just line ending conversion)
  154. index = self.repo.open_index()
  155. entry = index[b"test.txt"]
  156. blob = self.repo.object_store[entry.sha]
  157. # Should be uppercase with LF endings
  158. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  159. def test_blob_normalizer_integration(self) -> None:
  160. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  161. normalizer = self.repo.get_blob_normalizer()
  162. # Check it's the right type
  163. from dulwich.filters import FilterBlobNormalizer
  164. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  165. # Check it has access to gitattributes
  166. self.assertIsNotNone(normalizer.gitattributes)
  167. self.assertIsNotNone(normalizer.filter_registry)
  168. def test_required_filter_missing(self) -> None:
  169. """Test that missing required filter raises an error."""
  170. # Create .gitattributes with required filter
  171. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  172. with open(gitattributes_path, "wb") as f:
  173. f.write(b"*.secret filter=required_filter\n")
  174. # Configure filter as required but without commands
  175. config = self.repo.get_config()
  176. config.set((b"filter", b"required_filter"), b"required", b"true")
  177. config.write_to_path()
  178. # Add .gitattributes
  179. porcelain.add(self.repo, paths=[".gitattributes"])
  180. # Create file that would use the filter
  181. secret_file = os.path.join(self.test_dir, "test.secret")
  182. with open(secret_file, "wb") as f:
  183. f.write(b"test content\n")
  184. # Adding file should raise error due to missing required filter
  185. with self.assertRaises(FilterError) as cm:
  186. porcelain.add(self.repo, paths=["test.secret"])
  187. self.assertIn(
  188. "Required filter 'required_filter' is not available", str(cm.exception)
  189. )
  190. def test_required_filter_clean_command_fails(self) -> None:
  191. """Test that required filter failure during clean raises an error."""
  192. # Create .gitattributes with required filter
  193. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  194. with open(gitattributes_path, "wb") as f:
  195. f.write(b"*.secret filter=failing_filter\n")
  196. # Configure filter as required with failing command
  197. config = self.repo.get_config()
  198. config.set(
  199. (b"filter", b"failing_filter"), b"clean", b"false"
  200. ) # false command always fails
  201. config.set((b"filter", b"failing_filter"), b"required", b"true")
  202. config.write_to_path()
  203. # Add .gitattributes
  204. porcelain.add(self.repo, paths=[".gitattributes"])
  205. # Create file that would use the filter
  206. secret_file = os.path.join(self.test_dir, "test.secret")
  207. with open(secret_file, "wb") as f:
  208. f.write(b"test content\n")
  209. # Adding file should raise error due to failing required filter
  210. with self.assertRaises(FilterError) as cm:
  211. porcelain.add(self.repo, paths=["test.secret"])
  212. self.assertIn("Required clean filter failed", str(cm.exception))
  213. def test_required_filter_success(self) -> None:
  214. """Test that required filter works when properly configured."""
  215. # Create .gitattributes with required filter
  216. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  217. with open(gitattributes_path, "wb") as f:
  218. f.write(b"*.secret filter=working_filter\n")
  219. # Configure filter as required with working command
  220. config = self.repo.get_config()
  221. config.set(
  222. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  223. ) # uppercase
  224. config.set((b"filter", b"working_filter"), b"required", b"true")
  225. config.write_to_path()
  226. # Add .gitattributes
  227. porcelain.add(self.repo, paths=[".gitattributes"])
  228. # Create file that would use the filter
  229. secret_file = os.path.join(self.test_dir, "test.secret")
  230. with open(secret_file, "wb") as f:
  231. f.write(b"hello world\n")
  232. # Adding file should work and apply filter
  233. porcelain.add(self.repo, paths=["test.secret"])
  234. # Check that content was filtered
  235. index = self.repo.open_index()
  236. entry = index[b"test.secret"]
  237. blob = self.repo.object_store[entry.sha]
  238. self.assertEqual(blob.data, b"HELLO WORLD\n")
  239. def test_optional_filter_failure_fallback(self) -> None:
  240. """Test that optional filter failure falls back to original data."""
  241. # Create .gitattributes with optional filter
  242. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  243. with open(gitattributes_path, "wb") as f:
  244. f.write(b"*.txt filter=optional_filter\n")
  245. # Configure filter as optional (required=false) with failing command
  246. config = self.repo.get_config()
  247. config.set(
  248. (b"filter", b"optional_filter"), b"clean", b"false"
  249. ) # false command always fails
  250. config.set((b"filter", b"optional_filter"), b"required", b"false")
  251. config.write_to_path()
  252. # Add .gitattributes
  253. porcelain.add(self.repo, paths=[".gitattributes"])
  254. # Create file that would use the filter
  255. test_file = os.path.join(self.test_dir, "test.txt")
  256. with open(test_file, "wb") as f:
  257. f.write(b"test content\n")
  258. # Adding file should work and fallback to original content
  259. porcelain.add(self.repo, paths=["test.txt"])
  260. # Check that original content was preserved
  261. index = self.repo.open_index()
  262. entry = index[b"test.txt"]
  263. blob = self.repo.object_store[entry.sha]
  264. self.assertEqual(blob.data, b"test content\n")
  265. class ProcessFilterDriverTests(TestCase):
  266. """Tests for ProcessFilterDriver with real process filter."""
  267. def setUp(self):
  268. super().setUp()
  269. # Create a temporary test filter process dynamically
  270. self.test_filter_path = self._create_test_filter()
  271. def tearDown(self):
  272. # Clean up the test filter
  273. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  274. os.unlink(self.test_filter_path)
  275. super().tearDown()
  276. def _create_test_filter(self):
  277. """Create a simple test filter process that works on all platforms."""
  278. import tempfile
  279. # Create filter script that uppercases on clean, lowercases on smudge
  280. filter_script = """import sys
  281. import os
  282. # Simple filter that doesn't use any external dependencies
  283. def read_exact(n):
  284. data = b""
  285. while len(data) < n:
  286. chunk = sys.stdin.buffer.read(n - len(data))
  287. if not chunk:
  288. break
  289. data += chunk
  290. return data
  291. def write_pkt(data):
  292. if data is None:
  293. sys.stdout.buffer.write(b"0000")
  294. else:
  295. length = len(data) + 4
  296. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  297. sys.stdout.buffer.write(data)
  298. sys.stdout.buffer.flush()
  299. def read_pkt():
  300. size_bytes = read_exact(4)
  301. if not size_bytes:
  302. return None
  303. size = int(size_bytes.decode(), 16)
  304. if size == 0:
  305. return None
  306. return read_exact(size - 4)
  307. # Handshake
  308. client_hello = read_pkt()
  309. version = read_pkt()
  310. flush = read_pkt()
  311. write_pkt(b"git-filter-server")
  312. write_pkt(b"version=2")
  313. write_pkt(None)
  314. # Read and echo capabilities
  315. caps = []
  316. while True:
  317. cap = read_pkt()
  318. if cap is None:
  319. break
  320. caps.append(cap)
  321. for cap in caps:
  322. write_pkt(cap)
  323. write_pkt(None)
  324. # Process commands
  325. while True:
  326. headers = {}
  327. while True:
  328. line = read_pkt()
  329. if line is None:
  330. break
  331. if b"=" in line:
  332. k, v = line.split(b"=", 1)
  333. headers[k.decode()] = v.decode()
  334. if not headers:
  335. break
  336. # Read data
  337. data_chunks = []
  338. while True:
  339. chunk = read_pkt()
  340. if chunk is None:
  341. break
  342. data_chunks.append(chunk)
  343. data = b"".join(data_chunks)
  344. # Process (uppercase for clean, lowercase for smudge)
  345. if headers.get("command") == "clean":
  346. result = data.upper()
  347. elif headers.get("command") == "smudge":
  348. result = data.lower()
  349. else:
  350. result = data
  351. # Send response
  352. write_pkt(b"status=success")
  353. write_pkt(None)
  354. # Send result
  355. chunk_size = 65516
  356. for i in range(0, len(result), chunk_size):
  357. write_pkt(result[i:i+chunk_size])
  358. write_pkt(None)
  359. """
  360. # Create temporary file
  361. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  362. try:
  363. os.write(fd, filter_script.encode())
  364. os.close(fd)
  365. # Make executable on Unix-like systems
  366. if os.name != "nt": # Not Windows
  367. os.chmod(path, 0o755)
  368. return path
  369. except:
  370. if os.path.exists(path):
  371. os.unlink(path)
  372. raise
  373. def test_process_filter_clean_operation(self):
  374. """Test clean operation using real process filter."""
  375. import sys
  376. driver = ProcessFilterDriver(
  377. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  378. )
  379. test_data = b"hello world"
  380. result = driver.clean(test_data)
  381. # Our test filter uppercases on clean
  382. self.assertEqual(result, b"HELLO WORLD")
  383. def test_process_filter_smudge_operation(self):
  384. """Test smudge operation using real process filter."""
  385. import sys
  386. driver = ProcessFilterDriver(
  387. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  388. )
  389. test_data = b"HELLO WORLD"
  390. result = driver.smudge(test_data, b"test.txt")
  391. # Our test filter lowercases on smudge
  392. self.assertEqual(result, b"hello world")
  393. def test_process_filter_large_data(self):
  394. """Test process filter with data larger than single pkt-line."""
  395. import sys
  396. driver = ProcessFilterDriver(
  397. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  398. )
  399. # Create data larger than max pkt-line payload (65516 bytes)
  400. test_data = b"a" * 70000
  401. result = driver.clean(test_data)
  402. # Should be uppercased
  403. self.assertEqual(result, b"A" * 70000)
  404. def test_fallback_to_individual_commands(self):
  405. """Test fallback when process filter fails."""
  406. driver = ProcessFilterDriver(
  407. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  408. process_cmd="/nonexistent/command", # This should fail
  409. required=False,
  410. )
  411. test_data = b"hello world\n"
  412. result = driver.clean(test_data)
  413. # Should fallback to tr command and uppercase
  414. self.assertEqual(result, b"HELLO WORLD\n")
  415. def test_process_reuse(self):
  416. """Test that process is reused across multiple operations."""
  417. import sys
  418. driver = ProcessFilterDriver(
  419. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  420. )
  421. # First operation
  422. result1 = driver.clean(b"test1")
  423. self.assertEqual(result1, b"TEST1")
  424. # Second operation should reuse the same process
  425. result2 = driver.clean(b"test2")
  426. self.assertEqual(result2, b"TEST2")
  427. # Process should still be alive
  428. self.assertIsNotNone(driver._process)
  429. self.assertIsNone(driver._process.poll()) # None means still running
  430. def test_error_handling_invalid_command(self):
  431. """Test error handling with invalid filter command."""
  432. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  433. with self.assertRaises(FilterError) as cm:
  434. driver.clean(b"test data")
  435. self.assertIn("Failed to start process filter", str(cm.exception))
  436. class FilterContextTests(TestCase):
  437. """Tests for FilterContext class."""
  438. def test_filter_context_caches_long_running_drivers(self):
  439. """Test that FilterContext caches only long-running drivers."""
  440. # Create real filter drivers
  441. class UppercaseFilter:
  442. def clean(self, data):
  443. return data.upper()
  444. def smudge(self, data, path=b""):
  445. return data.lower()
  446. def cleanup(self):
  447. pass
  448. def reuse(self, config, filter_name):
  449. # Pretend it's a long-running filter that should be cached
  450. return True
  451. class IdentityFilter:
  452. def clean(self, data):
  453. return data
  454. def smudge(self, data, path=b""):
  455. return data
  456. def cleanup(self):
  457. pass
  458. def reuse(self, config, filter_name):
  459. # Lightweight filter, don't cache
  460. return False
  461. # Create registry and context
  462. registry = FilterRegistry()
  463. context = FilterContext(registry)
  464. # Register drivers
  465. long_running = UppercaseFilter()
  466. stateless = IdentityFilter()
  467. registry.register_driver("uppercase", long_running)
  468. registry.register_driver("identity", stateless)
  469. # Get drivers through context
  470. driver1 = context.get_driver("uppercase")
  471. driver2 = context.get_driver("uppercase")
  472. # Long-running driver should be cached
  473. self.assertIs(driver1, driver2)
  474. self.assertIs(driver1, long_running)
  475. # Get stateless driver
  476. stateless1 = context.get_driver("identity")
  477. stateless2 = context.get_driver("identity")
  478. # Stateless driver comes from registry but isn't cached in context
  479. self.assertIs(stateless1, stateless)
  480. self.assertIs(stateless2, stateless)
  481. self.assertNotIn("identity", context._active_drivers)
  482. self.assertIn("uppercase", context._active_drivers)
  483. def test_filter_context_cleanup(self):
  484. """Test that FilterContext properly cleans up resources."""
  485. cleanup_called = []
  486. class TrackableFilter:
  487. def __init__(self, name):
  488. self.name = name
  489. def clean(self, data):
  490. return data
  491. def smudge(self, data, path=b""):
  492. return data
  493. def cleanup(self):
  494. cleanup_called.append(self.name)
  495. def is_long_running(self):
  496. return True
  497. # Create registry and context
  498. registry = FilterRegistry()
  499. context = FilterContext(registry)
  500. # Register and use drivers
  501. filter1 = TrackableFilter("filter1")
  502. filter2 = TrackableFilter("filter2")
  503. filter3 = TrackableFilter("filter3")
  504. registry.register_driver("filter1", filter1)
  505. registry.register_driver("filter2", filter2)
  506. registry.register_driver("filter3", filter3)
  507. # Get only some drivers to cache them
  508. context.get_driver("filter1")
  509. context.get_driver("filter2")
  510. # Don't get filter3
  511. # Close context
  512. context.close()
  513. # Verify cleanup was called for all drivers (context closes registry too)
  514. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  515. def test_filter_context_get_driver_returns_none_for_missing(self):
  516. """Test that get_driver returns None for non-existent drivers."""
  517. registry = FilterRegistry()
  518. context = FilterContext(registry)
  519. result = context.get_driver("nonexistent")
  520. self.assertIsNone(result)
  521. def test_filter_context_with_real_process_filter(self):
  522. """Test FilterContext with real ProcessFilterDriver instances."""
  523. import sys
  524. # Use existing test filter from ProcessFilterDriverTests
  525. test_dir = tempfile.mkdtemp()
  526. self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
  527. # Create a simple test filter that just passes data through
  528. filter_script = """import sys
  529. while True:
  530. line = sys.stdin.buffer.read()
  531. if not line:
  532. break
  533. sys.stdout.buffer.write(line)
  534. sys.stdout.buffer.flush()
  535. """
  536. filter_path = os.path.join(test_dir, "simple_filter.py")
  537. with open(filter_path, "w") as f:
  538. f.write(filter_script)
  539. # Create ProcessFilterDriver instances
  540. # One with process_cmd (long-running)
  541. process_driver = ProcessFilterDriver(
  542. process_cmd=None, # Don't use actual process to avoid complexity
  543. clean_cmd=f"{sys.executable} {filter_path}",
  544. smudge_cmd=f"{sys.executable} {filter_path}",
  545. )
  546. # Register in context
  547. registry = FilterRegistry()
  548. context = FilterContext(registry)
  549. registry.register_driver("process", process_driver)
  550. # Get driver - should not be cached since it's not long-running
  551. driver1 = context.get_driver("process")
  552. self.assertIsNotNone(driver1)
  553. self.assertFalse(driver1.is_long_running())
  554. self.assertNotIn("process", context._active_drivers)
  555. # Test with a long-running driver (has process_cmd)
  556. long_process_driver = ProcessFilterDriver()
  557. long_process_driver.process_cmd = "dummy" # Just to make it long-running
  558. registry.register_driver("long_process", long_process_driver)
  559. driver2 = context.get_driver("long_process")
  560. self.assertTrue(driver2.is_long_running())
  561. self.assertIn("long_process", context._active_drivers)
  562. context.close()
  563. def test_filter_context_closes_registry(self):
  564. """Test that closing FilterContext also closes the registry."""
  565. # Track if registry.close() is called
  566. registry_closed = []
  567. class TrackingRegistry(FilterRegistry):
  568. def close(self):
  569. registry_closed.append(True)
  570. super().close()
  571. registry = TrackingRegistry()
  572. context = FilterContext(registry)
  573. # Close context should also close registry
  574. context.close()
  575. self.assertTrue(registry_closed)
  576. class ProcessFilterProtocolTests(TestCase):
  577. """Tests for ProcessFilterDriver protocol compliance."""
  578. def setUp(self):
  579. super().setUp()
  580. # Create a spec-compliant test filter process dynamically
  581. self.test_filter_path = self._create_spec_compliant_filter()
  582. def tearDown(self):
  583. # Clean up the test filter
  584. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  585. os.unlink(self.test_filter_path)
  586. super().tearDown()
  587. def _create_spec_compliant_filter(self):
  588. """Create a spec-compliant test filter that works on all platforms."""
  589. import tempfile
  590. # This filter strictly follows Git spec - no newlines in packets
  591. filter_script = """import sys
  592. def read_exact(n):
  593. data = b""
  594. while len(data) < n:
  595. chunk = sys.stdin.buffer.read(n - len(data))
  596. if not chunk:
  597. break
  598. data += chunk
  599. return data
  600. def write_pkt(data):
  601. if data is None:
  602. sys.stdout.buffer.write(b"0000")
  603. else:
  604. length = len(data) + 4
  605. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  606. sys.stdout.buffer.write(data)
  607. sys.stdout.buffer.flush()
  608. def read_pkt():
  609. size_bytes = read_exact(4)
  610. if not size_bytes:
  611. return None
  612. size = int(size_bytes.decode(), 16)
  613. if size == 0:
  614. return None
  615. return read_exact(size - 4)
  616. # Handshake - exact format, no newlines
  617. client_hello = read_pkt()
  618. version = read_pkt()
  619. flush = read_pkt()
  620. if client_hello != b"git-filter-client":
  621. sys.exit(1)
  622. if version != b"version=2":
  623. sys.exit(1)
  624. write_pkt(b"git-filter-server") # No newline
  625. write_pkt(b"version=2") # No newline
  626. write_pkt(None)
  627. # Read and echo capabilities
  628. caps = []
  629. while True:
  630. cap = read_pkt()
  631. if cap is None:
  632. break
  633. caps.append(cap)
  634. for cap in caps:
  635. if cap in [b"capability=clean", b"capability=smudge"]:
  636. write_pkt(cap)
  637. write_pkt(None)
  638. # Process commands
  639. while True:
  640. headers = {}
  641. while True:
  642. line = read_pkt()
  643. if line is None:
  644. break
  645. if b"=" in line:
  646. k, v = line.split(b"=", 1)
  647. headers[k.decode()] = v.decode()
  648. if not headers:
  649. break
  650. # Read data
  651. data_chunks = []
  652. while True:
  653. chunk = read_pkt()
  654. if chunk is None:
  655. break
  656. data_chunks.append(chunk)
  657. data = b"".join(data_chunks)
  658. # Process
  659. if headers.get("command") == "clean":
  660. result = data.upper()
  661. elif headers.get("command") == "smudge":
  662. result = data.lower()
  663. else:
  664. result = data
  665. # Send response
  666. write_pkt(b"status=success")
  667. write_pkt(None)
  668. # Send result
  669. chunk_size = 65516
  670. for i in range(0, len(result), chunk_size):
  671. write_pkt(result[i:i+chunk_size])
  672. write_pkt(None)
  673. """
  674. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  675. try:
  676. os.write(fd, filter_script.encode())
  677. os.close(fd)
  678. if os.name != "nt": # Not Windows
  679. os.chmod(path, 0o755)
  680. return path
  681. except:
  682. if os.path.exists(path):
  683. os.unlink(path)
  684. raise
  685. def test_protocol_handshake_exact_format(self):
  686. """Test that handshake uses exact format without newlines."""
  687. import sys
  688. driver = ProcessFilterDriver(
  689. process_cmd=f"{sys.executable} {self.test_filter_path}",
  690. required=True, # Require success to test protocol compliance
  691. )
  692. # This should work with exact protocol format
  693. test_data = b"hello world"
  694. result = driver.clean(test_data)
  695. # Our test filter uppercases on clean
  696. self.assertEqual(result, b"HELLO WORLD")
  697. def test_capability_negotiation_exact_format(self):
  698. """Test that capabilities are sent and received in exact format."""
  699. import sys
  700. driver = ProcessFilterDriver(
  701. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  702. )
  703. # Force capability negotiation by using both clean and smudge
  704. clean_result = driver.clean(b"test")
  705. smudge_result = driver.smudge(b"TEST", b"test.txt")
  706. self.assertEqual(clean_result, b"TEST")
  707. self.assertEqual(smudge_result, b"test")
  708. def test_binary_data_handling(self):
  709. """Test handling of binary data through the protocol."""
  710. import sys
  711. driver = ProcessFilterDriver(
  712. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  713. )
  714. # Binary data with null bytes, high bytes, etc.
  715. binary_data = bytes(range(256))
  716. try:
  717. result = driver.clean(binary_data)
  718. # Should handle binary data without crashing
  719. self.assertIsInstance(result, bytes)
  720. # Our test filter uppercases, which may not work for all binary data
  721. # but should not crash
  722. except UnicodeDecodeError:
  723. # This might happen with binary data - acceptable
  724. pass
  725. def test_large_file_chunking(self):
  726. """Test proper chunking of large files."""
  727. import sys
  728. driver = ProcessFilterDriver(
  729. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  730. )
  731. # Create data larger than max pkt-line payload (65516 bytes)
  732. large_data = b"a" * 100000
  733. result = driver.clean(large_data)
  734. # Should be properly processed (uppercased)
  735. expected = b"A" * 100000
  736. self.assertEqual(result, expected)
  737. def test_empty_file_handling(self):
  738. """Test handling of empty files."""
  739. import sys
  740. driver = ProcessFilterDriver(
  741. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  742. )
  743. result = driver.clean(b"")
  744. self.assertEqual(result, b"")
  745. def test_special_characters_in_pathname(self):
  746. """Test paths with special characters are handled correctly."""
  747. import sys
  748. driver = ProcessFilterDriver(
  749. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  750. )
  751. # Test various special characters in paths
  752. special_paths = [
  753. b"file with spaces.txt",
  754. b"path/with/slashes.txt",
  755. b"file=with=equals.txt",
  756. b"file\nwith\nnewlines.txt",
  757. ]
  758. test_data = b"test data"
  759. for path in special_paths:
  760. result = driver.smudge(test_data, path)
  761. self.assertEqual(result, b"test data")
  762. def test_process_crash_recovery(self):
  763. """Test that process is properly restarted after crash."""
  764. import sys
  765. driver = ProcessFilterDriver(
  766. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  767. )
  768. # First operation
  769. result = driver.clean(b"test1")
  770. self.assertEqual(result, b"TEST1")
  771. # Kill the process
  772. if driver._process:
  773. driver._process.kill()
  774. driver._process.wait()
  775. driver.cleanup()
  776. # Should restart and work again
  777. result = driver.clean(b"test2")
  778. self.assertEqual(result, b"TEST2")
  779. def test_malformed_process_response_handling(self):
  780. """Test handling of malformed responses from process."""
  781. # Create a filter that sends malformed responses
  782. malformed_filter = """#!/usr/bin/env python3
  783. import sys
  784. import os
  785. sys.path.insert(0, os.path.dirname(__file__))
  786. from dulwich.protocol import Protocol
  787. protocol = Protocol(
  788. lambda n: sys.stdin.buffer.read(n),
  789. lambda d: sys.stdout.buffer.write(d) or len(d)
  790. )
  791. # Read handshake
  792. protocol.read_pkt_line()
  793. protocol.read_pkt_line()
  794. protocol.read_pkt_line()
  795. # Send invalid handshake
  796. protocol.write_pkt_line(b"invalid-welcome")
  797. protocol.write_pkt_line(b"version=2")
  798. protocol.write_pkt_line(None)
  799. """
  800. import tempfile
  801. fd, script_path = tempfile.mkstemp(suffix=".py")
  802. try:
  803. os.write(fd, malformed_filter.encode())
  804. os.close(fd)
  805. os.chmod(script_path, 0o755)
  806. driver = ProcessFilterDriver(
  807. process_cmd=f"python3 {script_path}",
  808. clean_cmd="cat", # Fallback
  809. required=False,
  810. )
  811. # Should fallback to clean_cmd when process fails
  812. result = driver.clean(b"test data")
  813. self.assertEqual(result, b"test data")
  814. finally:
  815. os.unlink(script_path)
  816. def test_concurrent_filter_operations(self):
  817. """Test that concurrent operations work correctly."""
  818. import sys
  819. driver = ProcessFilterDriver(
  820. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  821. )
  822. results = []
  823. errors = []
  824. def worker(data):
  825. try:
  826. result = driver.clean(data)
  827. results.append(result)
  828. except Exception as e:
  829. errors.append(e)
  830. # Start 5 concurrent operations
  831. threads = []
  832. test_data = [f"test{i}".encode() for i in range(5)]
  833. for data in test_data:
  834. t = threading.Thread(target=worker, args=(data,))
  835. threads.append(t)
  836. t.start()
  837. for t in threads:
  838. t.join()
  839. # Should have no errors
  840. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  841. self.assertEqual(len(results), 5)
  842. # All results should be uppercase versions
  843. expected = [data.upper() for data in test_data]
  844. self.assertEqual(sorted(results), sorted(expected))
  845. def test_process_resource_cleanup(self):
  846. """Test that process resources are properly cleaned up."""
  847. import sys
  848. driver = ProcessFilterDriver(
  849. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  850. )
  851. # Use the driver
  852. result = driver.clean(b"test")
  853. self.assertEqual(result, b"TEST")
  854. # Process should be running
  855. self.assertIsNotNone(driver._process)
  856. self.assertIsNone(driver._process.poll()) # None means still running
  857. # Remember the old process to check it was terminated
  858. old_process = driver._process
  859. # Manually clean up (simulates __del__)
  860. driver.cleanup()
  861. # Process reference should be cleared
  862. self.assertIsNone(driver._process)
  863. self.assertIsNone(driver._protocol)
  864. # Old process should be terminated
  865. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  866. def test_required_filter_error_propagation(self):
  867. """Test that errors are properly propagated when filter is required."""
  868. driver = ProcessFilterDriver(
  869. process_cmd="/definitely/nonexistent/command", required=True
  870. )
  871. with self.assertRaises(FilterError) as cm:
  872. driver.clean(b"test data")
  873. self.assertIn("Failed to start process filter", str(cm.exception))