# test_filters.py -- Tests for filters
# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for filters."""

import os
import shutil
import sys
import tempfile
import threading

from dulwich import porcelain
from dulwich.filters import (
    FilterContext,
    FilterError,
    FilterRegistry,
    ProcessFilterDriver,
)
from dulwich.repo import Repo

from . import TestCase


  34. class GitAttributesFilterIntegrationTests(TestCase):
  35. """Test gitattributes integration with filter drivers."""
  36. def setUp(self) -> None:
  37. super().setUp()
  38. self.test_dir = tempfile.mkdtemp()
  39. self.addCleanup(self._cleanup_test_dir)
  40. self.repo = Repo.init(self.test_dir)
  41. def _cleanup_test_dir(self) -> None:
  42. """Clean up test directory."""
  43. import shutil
  44. shutil.rmtree(self.test_dir)
  45. def test_gitattributes_text_filter(self) -> None:
  46. """Test that text attribute triggers line ending conversion."""
  47. # Configure autocrlf first
  48. config = self.repo.get_config()
  49. config.set((b"core",), b"autocrlf", b"true")
  50. config.write_to_path()
  51. # Create .gitattributes with text attribute
  52. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  53. with open(gitattributes_path, "wb") as f:
  54. f.write(b"*.txt text\n")
  55. f.write(b"*.bin -text\n")
  56. # Add .gitattributes
  57. porcelain.add(self.repo, paths=[".gitattributes"])
  58. porcelain.commit(self.repo, message=b"Add gitattributes")
  59. # Create text file with CRLF
  60. text_file = os.path.join(self.test_dir, "test.txt")
  61. with open(text_file, "wb") as f:
  62. f.write(b"line1\r\nline2\r\n")
  63. # Create binary file with CRLF
  64. bin_file = os.path.join(self.test_dir, "test.bin")
  65. with open(bin_file, "wb") as f:
  66. f.write(b"binary\r\ndata\r\n")
  67. # Add files
  68. porcelain.add(self.repo, paths=["test.txt", "test.bin"])
  69. # Check that text file was normalized
  70. index = self.repo.open_index()
  71. text_entry = index[b"test.txt"]
  72. text_blob = self.repo.object_store[text_entry.sha]
  73. self.assertEqual(text_blob.data, b"line1\nline2\n")
  74. # Check that binary file was not normalized
  75. bin_entry = index[b"test.bin"]
  76. bin_blob = self.repo.object_store[bin_entry.sha]
  77. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  78. def test_gitattributes_custom_filter(self) -> None:
  79. """Test custom filter specified in gitattributes."""
  80. # Create a Python script that acts as our filter
  81. import sys
  82. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  83. with open(filter_script, "w") as f:
  84. f.write("""#!/usr/bin/env python3
  85. import sys
  86. data = sys.stdin.buffer.read()
  87. # Replace all digits with X
  88. result = bytearray()
  89. for b in data:
  90. if chr(b).isdigit():
  91. result.append(ord('X'))
  92. else:
  93. result.append(b)
  94. sys.stdout.buffer.write(result)
  95. """)
  96. os.chmod(filter_script, 0o755)
  97. # Create .gitattributes with custom filter
  98. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  99. with open(gitattributes_path, "wb") as f:
  100. f.write(b"*.secret filter=redact\n")
  101. # Configure custom filter (use Python script for testing)
  102. config = self.repo.get_config()
  103. # This filter replaces all digits with X
  104. config.set(
  105. (b"filter", b"redact"),
  106. b"clean",
  107. f"{sys.executable} {filter_script}".encode(),
  108. )
  109. config.write_to_path()
  110. # Add .gitattributes
  111. porcelain.add(self.repo, paths=[".gitattributes"])
  112. # Create file with sensitive content
  113. secret_file = os.path.join(self.test_dir, "password.secret")
  114. with open(secret_file, "wb") as f:
  115. f.write(b"password123\ntoken456\n")
  116. # Add file
  117. porcelain.add(self.repo, paths=["password.secret"])
  118. # Check that content was filtered
  119. index = self.repo.open_index()
  120. entry = index[b"password.secret"]
  121. blob = self.repo.object_store[entry.sha]
  122. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  123. def test_gitattributes_from_tree(self) -> None:
  124. """Test that gitattributes from tree are used when no working tree exists."""
  125. # Create .gitattributes with text attribute
  126. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  127. with open(gitattributes_path, "wb") as f:
  128. f.write(b"*.txt text\n")
  129. # Add and commit .gitattributes
  130. porcelain.add(self.repo, paths=[".gitattributes"])
  131. porcelain.commit(self.repo, message=b"Add gitattributes")
  132. # Remove .gitattributes from working tree
  133. os.remove(gitattributes_path)
  134. # Get gitattributes - should still work from tree
  135. gitattributes = self.repo.get_gitattributes()
  136. attrs = gitattributes.match_path(b"test.txt")
  137. self.assertEqual(attrs.get(b"text"), True)
  138. def test_gitattributes_info_attributes(self) -> None:
  139. """Test that .git/info/attributes is read."""
  140. # Create info/attributes
  141. info_dir = os.path.join(self.repo.controldir(), "info")
  142. if not os.path.exists(info_dir):
  143. os.makedirs(info_dir)
  144. info_attrs_path = os.path.join(info_dir, "attributes")
  145. with open(info_attrs_path, "wb") as f:
  146. f.write(b"*.log text\n")
  147. # Get gitattributes
  148. gitattributes = self.repo.get_gitattributes()
  149. attrs = gitattributes.match_path(b"debug.log")
  150. self.assertEqual(attrs.get(b"text"), True)
  151. def test_filter_precedence(self) -> None:
  152. """Test that filter attribute takes precedence over text attribute."""
  153. # Create a Python script that converts to uppercase
  154. import sys
  155. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  156. with open(filter_script, "w") as f:
  157. f.write("""#!/usr/bin/env python3
  158. import sys
  159. data = sys.stdin.buffer.read()
  160. # Convert bytes to string, uppercase, then back to bytes
  161. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  162. sys.stdout.buffer.write(result)
  163. """)
  164. os.chmod(filter_script, 0o755)
  165. # Create .gitattributes with both text and filter
  166. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  167. with open(gitattributes_path, "wb") as f:
  168. f.write(b"*.txt text filter=custom\n")
  169. # Configure autocrlf and custom filter
  170. config = self.repo.get_config()
  171. config.set((b"core",), b"autocrlf", b"true")
  172. # This filter converts to uppercase
  173. config.set(
  174. (b"filter", b"custom"),
  175. b"clean",
  176. f"{sys.executable} {filter_script}".encode(),
  177. )
  178. config.write_to_path()
  179. # Add .gitattributes
  180. porcelain.add(self.repo, paths=[".gitattributes"])
  181. # Create text file with lowercase and CRLF
  182. text_file = os.path.join(self.test_dir, "test.txt")
  183. with open(text_file, "wb") as f:
  184. f.write(b"hello\r\nworld\r\n")
  185. # Add file
  186. porcelain.add(self.repo, paths=["test.txt"])
  187. # Check that custom filter was applied (not just line ending conversion)
  188. index = self.repo.open_index()
  189. entry = index[b"test.txt"]
  190. blob = self.repo.object_store[entry.sha]
  191. # Should be uppercase with LF endings
  192. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  193. def test_blob_normalizer_integration(self) -> None:
  194. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  195. normalizer = self.repo.get_blob_normalizer()
  196. # Check it's the right type
  197. from dulwich.filters import FilterBlobNormalizer
  198. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  199. # Check it has access to gitattributes
  200. self.assertIsNotNone(normalizer.gitattributes)
  201. self.assertIsNotNone(normalizer.filter_registry)
  202. def test_required_filter_missing(self) -> None:
  203. """Test that missing required filter raises an error."""
  204. # Create .gitattributes with required filter
  205. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  206. with open(gitattributes_path, "wb") as f:
  207. f.write(b"*.secret filter=required_filter\n")
  208. # Configure filter as required but without commands
  209. config = self.repo.get_config()
  210. config.set((b"filter", b"required_filter"), b"required", b"true")
  211. config.write_to_path()
  212. # Add .gitattributes
  213. porcelain.add(self.repo, paths=[".gitattributes"])
  214. # Create file that would use the filter
  215. secret_file = os.path.join(self.test_dir, "test.secret")
  216. with open(secret_file, "wb") as f:
  217. f.write(b"test content\n")
  218. # Adding file should raise error due to missing required filter
  219. with self.assertRaises(FilterError) as cm:
  220. porcelain.add(self.repo, paths=["test.secret"])
  221. self.assertIn(
  222. "Required filter 'required_filter' is not available", str(cm.exception)
  223. )
  224. def test_required_filter_clean_command_fails(self) -> None:
  225. """Test that required filter failure during clean raises an error."""
  226. # Create .gitattributes with required filter
  227. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  228. with open(gitattributes_path, "wb") as f:
  229. f.write(b"*.secret filter=failing_filter\n")
  230. # Configure filter as required with failing command
  231. config = self.repo.get_config()
  232. config.set(
  233. (b"filter", b"failing_filter"), b"clean", b"false"
  234. ) # false command always fails
  235. config.set((b"filter", b"failing_filter"), b"required", b"true")
  236. config.write_to_path()
  237. # Add .gitattributes
  238. porcelain.add(self.repo, paths=[".gitattributes"])
  239. # Create file that would use the filter
  240. secret_file = os.path.join(self.test_dir, "test.secret")
  241. with open(secret_file, "wb") as f:
  242. f.write(b"test content\n")
  243. # Adding file should raise error due to failing required filter
  244. with self.assertRaises(FilterError) as cm:
  245. porcelain.add(self.repo, paths=["test.secret"])
  246. self.assertIn("Required clean filter failed", str(cm.exception))
  247. def test_required_filter_success(self) -> None:
  248. """Test that required filter works when properly configured."""
  249. # Create .gitattributes with required filter
  250. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  251. with open(gitattributes_path, "wb") as f:
  252. f.write(b"*.secret filter=working_filter\n")
  253. # Configure filter as required with working command
  254. config = self.repo.get_config()
  255. config.set(
  256. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  257. ) # uppercase
  258. config.set((b"filter", b"working_filter"), b"required", b"true")
  259. config.write_to_path()
  260. # Add .gitattributes
  261. porcelain.add(self.repo, paths=[".gitattributes"])
  262. # Create file that would use the filter
  263. secret_file = os.path.join(self.test_dir, "test.secret")
  264. with open(secret_file, "wb") as f:
  265. f.write(b"hello world\n")
  266. # Adding file should work and apply filter
  267. porcelain.add(self.repo, paths=["test.secret"])
  268. # Check that content was filtered
  269. index = self.repo.open_index()
  270. entry = index[b"test.secret"]
  271. blob = self.repo.object_store[entry.sha]
  272. self.assertEqual(blob.data, b"HELLO WORLD\n")
  273. def test_optional_filter_failure_fallback(self) -> None:
  274. """Test that optional filter failure falls back to original data."""
  275. # Create .gitattributes with optional filter
  276. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  277. with open(gitattributes_path, "wb") as f:
  278. f.write(b"*.txt filter=optional_filter\n")
  279. # Configure filter as optional (required=false) with failing command
  280. config = self.repo.get_config()
  281. config.set(
  282. (b"filter", b"optional_filter"), b"clean", b"false"
  283. ) # false command always fails
  284. config.set((b"filter", b"optional_filter"), b"required", b"false")
  285. config.write_to_path()
  286. # Add .gitattributes
  287. porcelain.add(self.repo, paths=[".gitattributes"])
  288. # Create file that would use the filter
  289. test_file = os.path.join(self.test_dir, "test.txt")
  290. with open(test_file, "wb") as f:
  291. f.write(b"test content\n")
  292. # Adding file should work and fallback to original content
  293. porcelain.add(self.repo, paths=["test.txt"])
  294. # Check that original content was preserved
  295. index = self.repo.open_index()
  296. entry = index[b"test.txt"]
  297. blob = self.repo.object_store[entry.sha]
  298. self.assertEqual(blob.data, b"test content\n")
  299. class ProcessFilterDriverTests(TestCase):
  300. """Tests for ProcessFilterDriver with real process filter."""
  301. def setUp(self):
  302. super().setUp()
  303. # Create a temporary test filter process dynamically
  304. self.test_filter_path = self._create_test_filter()
  305. def tearDown(self):
  306. # Clean up the test filter
  307. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  308. os.unlink(self.test_filter_path)
  309. super().tearDown()
  310. def _create_test_filter(self):
  311. """Create a simple test filter process that works on all platforms."""
  312. import tempfile
  313. # Create filter script that uppercases on clean, lowercases on smudge
  314. filter_script = """import sys
  315. import os
  316. # Simple filter that doesn't use any external dependencies
  317. def read_exact(n):
  318. data = b""
  319. while len(data) < n:
  320. chunk = sys.stdin.buffer.read(n - len(data))
  321. if not chunk:
  322. break
  323. data += chunk
  324. return data
  325. def write_pkt(data):
  326. if data is None:
  327. sys.stdout.buffer.write(b"0000")
  328. else:
  329. length = len(data) + 4
  330. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  331. sys.stdout.buffer.write(data)
  332. sys.stdout.buffer.flush()
  333. def read_pkt():
  334. size_bytes = read_exact(4)
  335. if not size_bytes:
  336. return None
  337. size = int(size_bytes.decode(), 16)
  338. if size == 0:
  339. return None
  340. return read_exact(size - 4)
  341. # Handshake
  342. client_hello = read_pkt()
  343. version = read_pkt()
  344. flush = read_pkt()
  345. write_pkt(b"git-filter-server")
  346. write_pkt(b"version=2")
  347. write_pkt(None)
  348. # Read and echo capabilities
  349. caps = []
  350. while True:
  351. cap = read_pkt()
  352. if cap is None:
  353. break
  354. caps.append(cap)
  355. for cap in caps:
  356. write_pkt(cap)
  357. write_pkt(None)
  358. # Process commands
  359. while True:
  360. headers = {}
  361. while True:
  362. line = read_pkt()
  363. if line is None:
  364. break
  365. if b"=" in line:
  366. k, v = line.split(b"=", 1)
  367. headers[k.decode()] = v.decode()
  368. if not headers:
  369. break
  370. # Read data
  371. data_chunks = []
  372. while True:
  373. chunk = read_pkt()
  374. if chunk is None:
  375. break
  376. data_chunks.append(chunk)
  377. data = b"".join(data_chunks)
  378. # Process (uppercase for clean, lowercase for smudge)
  379. if headers.get("command") == "clean":
  380. result = data.upper()
  381. elif headers.get("command") == "smudge":
  382. result = data.lower()
  383. else:
  384. result = data
  385. # Send response
  386. write_pkt(b"status=success")
  387. write_pkt(None)
  388. # Send result
  389. chunk_size = 65516
  390. for i in range(0, len(result), chunk_size):
  391. write_pkt(result[i:i+chunk_size])
  392. write_pkt(None)
  393. """
  394. # Create temporary file
  395. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  396. try:
  397. os.write(fd, filter_script.encode())
  398. os.close(fd)
  399. # Make executable on Unix-like systems
  400. if os.name != "nt": # Not Windows
  401. os.chmod(path, 0o755)
  402. return path
  403. except:
  404. if os.path.exists(path):
  405. os.unlink(path)
  406. raise
  407. def test_process_filter_clean_operation(self):
  408. """Test clean operation using real process filter."""
  409. import sys
  410. driver = ProcessFilterDriver(
  411. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  412. )
  413. test_data = b"hello world"
  414. result = driver.clean(test_data)
  415. # Our test filter uppercases on clean
  416. self.assertEqual(result, b"HELLO WORLD")
  417. def test_process_filter_smudge_operation(self):
  418. """Test smudge operation using real process filter."""
  419. import sys
  420. driver = ProcessFilterDriver(
  421. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  422. )
  423. test_data = b"HELLO WORLD"
  424. result = driver.smudge(test_data, b"test.txt")
  425. # Our test filter lowercases on smudge
  426. self.assertEqual(result, b"hello world")
  427. def test_process_filter_large_data(self):
  428. """Test process filter with data larger than single pkt-line."""
  429. import sys
  430. driver = ProcessFilterDriver(
  431. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  432. )
  433. # Create data larger than max pkt-line payload (65516 bytes)
  434. test_data = b"a" * 70000
  435. result = driver.clean(test_data)
  436. # Should be uppercased
  437. self.assertEqual(result, b"A" * 70000)
  438. def test_fallback_to_individual_commands(self):
  439. """Test fallback when process filter fails."""
  440. driver = ProcessFilterDriver(
  441. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  442. process_cmd="/nonexistent/command", # This should fail
  443. required=False,
  444. )
  445. test_data = b"hello world\n"
  446. result = driver.clean(test_data)
  447. # Should fallback to tr command and uppercase
  448. self.assertEqual(result, b"HELLO WORLD\n")
  449. def test_process_reuse(self):
  450. """Test that process is reused across multiple operations."""
  451. import sys
  452. driver = ProcessFilterDriver(
  453. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  454. )
  455. # First operation
  456. result1 = driver.clean(b"test1")
  457. self.assertEqual(result1, b"TEST1")
  458. # Second operation should reuse the same process
  459. result2 = driver.clean(b"test2")
  460. self.assertEqual(result2, b"TEST2")
  461. # Process should still be alive
  462. self.assertIsNotNone(driver._process)
  463. self.assertIsNone(driver._process.poll()) # None means still running
  464. def test_error_handling_invalid_command(self):
  465. """Test error handling with invalid filter command."""
  466. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  467. with self.assertRaises(FilterError) as cm:
  468. driver.clean(b"test data")
  469. self.assertIn("Failed to start process filter", str(cm.exception))
  470. class FilterContextTests(TestCase):
  471. """Tests for FilterContext class."""
  472. def test_filter_context_caches_long_running_drivers(self):
  473. """Test that FilterContext caches only long-running drivers."""
  474. # Create real filter drivers
  475. class UppercaseFilter:
  476. def clean(self, data):
  477. return data.upper()
  478. def smudge(self, data, path=b""):
  479. return data.lower()
  480. def cleanup(self):
  481. pass
  482. def reuse(self, config, filter_name):
  483. # Pretend it's a long-running filter that should be cached
  484. return True
  485. class IdentityFilter:
  486. def clean(self, data):
  487. return data
  488. def smudge(self, data, path=b""):
  489. return data
  490. def cleanup(self):
  491. pass
  492. def reuse(self, config, filter_name):
  493. # Lightweight filter, don't cache
  494. return False
  495. # Create registry and context
  496. # Need to provide a config for caching to work
  497. from dulwich.config import ConfigDict
  498. config = ConfigDict()
  499. # Add some dummy config to make it truthy (use proper format)
  500. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  501. registry = FilterRegistry(config=config)
  502. context = FilterContext(registry)
  503. # Register drivers
  504. long_running = UppercaseFilter()
  505. stateless = IdentityFilter()
  506. registry.register_driver("uppercase", long_running)
  507. registry.register_driver("identity", stateless)
  508. # Get drivers through context
  509. driver1 = context.get_driver("uppercase")
  510. driver2 = context.get_driver("uppercase")
  511. # Long-running driver should be cached
  512. self.assertIs(driver1, driver2)
  513. self.assertIs(driver1, long_running)
  514. # Get stateless driver
  515. stateless1 = context.get_driver("identity")
  516. stateless2 = context.get_driver("identity")
  517. # Stateless driver comes from registry but isn't cached in context
  518. self.assertIs(stateless1, stateless)
  519. self.assertIs(stateless2, stateless)
  520. self.assertNotIn("identity", context._active_drivers)
  521. self.assertIn("uppercase", context._active_drivers)
  522. def test_filter_context_cleanup(self):
  523. """Test that FilterContext properly cleans up resources."""
  524. cleanup_called = []
  525. class TrackableFilter:
  526. def __init__(self, name):
  527. self.name = name
  528. def clean(self, data):
  529. return data
  530. def smudge(self, data, path=b""):
  531. return data
  532. def cleanup(self):
  533. cleanup_called.append(self.name)
  534. def is_long_running(self):
  535. return True
  536. # Create registry and context
  537. registry = FilterRegistry()
  538. context = FilterContext(registry)
  539. # Register and use drivers
  540. filter1 = TrackableFilter("filter1")
  541. filter2 = TrackableFilter("filter2")
  542. filter3 = TrackableFilter("filter3")
  543. registry.register_driver("filter1", filter1)
  544. registry.register_driver("filter2", filter2)
  545. registry.register_driver("filter3", filter3)
  546. # Get only some drivers to cache them
  547. context.get_driver("filter1")
  548. context.get_driver("filter2")
  549. # Don't get filter3
  550. # Close context
  551. context.close()
  552. # Verify cleanup was called for all drivers (context closes registry too)
  553. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  554. def test_filter_context_get_driver_returns_none_for_missing(self):
  555. """Test that get_driver returns None for non-existent drivers."""
  556. registry = FilterRegistry()
  557. context = FilterContext(registry)
  558. result = context.get_driver("nonexistent")
  559. self.assertIsNone(result)
  560. def test_filter_context_with_real_process_filter(self):
  561. """Test FilterContext with real ProcessFilterDriver instances."""
  562. import sys
  563. # Use existing test filter from ProcessFilterDriverTests
  564. test_dir = tempfile.mkdtemp()
  565. self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
  566. # Create a simple test filter that just passes data through
  567. filter_script = """import sys
  568. while True:
  569. line = sys.stdin.buffer.read()
  570. if not line:
  571. break
  572. sys.stdout.buffer.write(line)
  573. sys.stdout.buffer.flush()
  574. """
  575. filter_path = os.path.join(test_dir, "simple_filter.py")
  576. with open(filter_path, "w") as f:
  577. f.write(filter_script)
  578. # Create ProcessFilterDriver instances
  579. # One with process_cmd (long-running)
  580. process_driver = ProcessFilterDriver(
  581. process_cmd=None, # Don't use actual process to avoid complexity
  582. clean_cmd=f"{sys.executable} {filter_path}",
  583. smudge_cmd=f"{sys.executable} {filter_path}",
  584. )
  585. # Register in context
  586. from dulwich.config import ConfigDict
  587. config = ConfigDict()
  588. # Add some dummy config to make it truthy (use proper format)
  589. config.set(
  590. (b"filter", b"process"),
  591. b"clean",
  592. f"{sys.executable} {filter_path}".encode(),
  593. )
  594. config.set(
  595. (b"filter", b"process"),
  596. b"smudge",
  597. f"{sys.executable} {filter_path}".encode(),
  598. )
  599. registry = FilterRegistry(config=config)
  600. context = FilterContext(registry)
  601. registry.register_driver("process", process_driver)
  602. # Get driver - should not be cached since it's not long-running
  603. driver1 = context.get_driver("process")
  604. self.assertIsNotNone(driver1)
  605. # Check that it's not a long-running process (no process_cmd)
  606. self.assertIsNone(driver1.process_cmd)
  607. self.assertNotIn("process", context._active_drivers)
  608. # Test with a long-running driver that should be cached
  609. # Create a mock driver that always wants to be reused
  610. class CacheableProcessDriver:
  611. def __init__(self):
  612. self.process_cmd = "dummy"
  613. self.clean_cmd = None
  614. self.smudge_cmd = None
  615. self.required = False
  616. def clean(self, data):
  617. return data
  618. def smudge(self, data, path=b""):
  619. return data
  620. def cleanup(self):
  621. pass
  622. def reuse(self, config, filter_name):
  623. # This driver always wants to be cached (simulates a long-running process)
  624. return True
  625. cacheable_driver = CacheableProcessDriver()
  626. registry.register_driver("long_process", cacheable_driver)
  627. driver2 = context.get_driver("long_process")
  628. # Check that it has a process_cmd (long-running)
  629. self.assertIsNotNone(driver2.process_cmd)
  630. self.assertIn("long_process", context._active_drivers)
  631. context.close()
  632. def test_filter_context_closes_registry(self):
  633. """Test that closing FilterContext also closes the registry."""
  634. # Track if registry.close() is called
  635. registry_closed = []
  636. class TrackingRegistry(FilterRegistry):
  637. def close(self):
  638. registry_closed.append(True)
  639. super().close()
  640. registry = TrackingRegistry()
  641. context = FilterContext(registry)
  642. # Close context should also close registry
  643. context.close()
  644. self.assertTrue(registry_closed)
  645. class ProcessFilterProtocolTests(TestCase):
  646. """Tests for ProcessFilterDriver protocol compliance."""
  647. def setUp(self):
  648. super().setUp()
  649. # Create a spec-compliant test filter process dynamically
  650. self.test_filter_path = self._create_spec_compliant_filter()
  651. def tearDown(self):
  652. # Clean up the test filter
  653. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  654. os.unlink(self.test_filter_path)
  655. super().tearDown()
  def _create_spec_compliant_filter(self):
      """Create a spec-compliant test filter that works on all platforms.

      The filter is a small Python script written to a temporary file. It
      talks pkt-line framed packets over stdin/stdout: it validates the
      client handshake (exiting with status 1 on a mismatch), echoes back the
      ``clean``/``smudge`` capabilities it is offered, and then answers each
      request by upper-casing the payload for ``clean``, lower-casing it for
      ``smudge`` and returning it unchanged for any other command.

      Returns:
          Path of the created script. The caller must delete it.

      Raises:
          OSError: If the file cannot be written or made executable. The
              temporary file is removed before the error propagates.
      """
      import tempfile
      import textwrap

      # This filter strictly follows Git spec - no newlines in packets.
      # dedent() lets the script be indented with this method while the
      # written file still starts every top-level statement at column 0.
      filter_script = textwrap.dedent(
          """\
          import sys

          def read_exact(n):
              data = b""
              while len(data) < n:
                  chunk = sys.stdin.buffer.read(n - len(data))
                  if not chunk:
                      break
                  data += chunk
              return data

          def write_pkt(data):
              if data is None:
                  sys.stdout.buffer.write(b"0000")
              else:
                  length = len(data) + 4
                  sys.stdout.buffer.write(("{:04x}".format(length)).encode())
                  sys.stdout.buffer.write(data)
              sys.stdout.buffer.flush()

          def read_pkt():
              size_bytes = read_exact(4)
              if not size_bytes:
                  return None
              size = int(size_bytes.decode(), 16)
              if size == 0:
                  return None
              return read_exact(size - 4)

          # Handshake - exact format, no newlines
          client_hello = read_pkt()
          version = read_pkt()
          flush = read_pkt()

          if client_hello != b"git-filter-client":
              sys.exit(1)
          if version != b"version=2":
              sys.exit(1)

          write_pkt(b"git-filter-server") # No newline
          write_pkt(b"version=2") # No newline
          write_pkt(None)

          # Read and echo capabilities
          caps = []
          while True:
              cap = read_pkt()
              if cap is None:
                  break
              caps.append(cap)

          for cap in caps:
              if cap in [b"capability=clean", b"capability=smudge"]:
                  write_pkt(cap)
          write_pkt(None)

          # Process commands
          while True:
              headers = {}
              while True:
                  line = read_pkt()
                  if line is None:
                      break
                  if b"=" in line:
                      k, v = line.split(b"=", 1)
                      headers[k.decode()] = v.decode()

              if not headers:
                  break

              # Read data
              data_chunks = []
              while True:
                  chunk = read_pkt()
                  if chunk is None:
                      break
                  data_chunks.append(chunk)
              data = b"".join(data_chunks)

              # Process
              if headers.get("command") == "clean":
                  result = data.upper()
              elif headers.get("command") == "smudge":
                  result = data.lower()
              else:
                  result = data

              # Send response
              write_pkt(b"status=success")
              write_pkt(None)

              # Send result
              chunk_size = 65516
              for i in range(0, len(result), chunk_size):
                  write_pkt(result[i:i+chunk_size])
              write_pkt(None)
          """
      )

      fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
      try:
          # The file object takes ownership of fd, so the descriptor is
          # closed when the with-block exits, including when the write fails
          # and before the error handler below tries to unlink the file.
          with os.fdopen(fd, "wb") as script_file:
              script_file.write(filter_script.encode())
          if os.name != "nt":  # Not Windows
              os.chmod(path, 0o755)
      except BaseException:
          # Explicit BaseException (rather than a bare except) so the stray
          # temp file is removed for any failure; the error is re-raised.
          try:
              os.unlink(path)
          except FileNotFoundError:
              pass
          raise
      return path
  def test_protocol_handshake_exact_format(self):
      """Test that handshake uses exact format without newlines.

      The helper filter exits unless the client greeting and version packets
      match byte-for-byte, so a successful ``clean`` through a required
      driver shows the handshake was sent in the exact format.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}",
          required=True,  # Require success to test protocol compliance
      )
      # The driver leaves its filter process running between calls; stop it
      # when the test ends, pass or fail, instead of leaving it behind.
      self.addCleanup(driver.cleanup)

      # This should work with exact protocol format
      test_data = b"hello world"
      result = driver.clean(test_data)

      # Our test filter uppercases on clean
      self.assertEqual(result, b"HELLO WORLD")
  def test_capability_negotiation_exact_format(self):
      """Test that capabilities are sent and received in exact format.

      Both ``clean`` and ``smudge`` are exercised on the same driver; the
      helper filter upper-cases on clean and lower-cases on smudge.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      # Force capability negotiation by using both clean and smudge
      clean_result = driver.clean(b"test")
      smudge_result = driver.smudge(b"TEST", b"test.txt")

      self.assertEqual(clean_result, b"TEST")
      self.assertEqual(smudge_result, b"test")
  def test_binary_data_handling(self):
      """Test handling of binary data through the protocol.

      Sends every possible byte value once. The test only requires that the
      call returns ``bytes``; a ``UnicodeDecodeError`` is tolerated.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      # Binary data with null bytes, high bytes, etc.
      binary_data = bytes(range(256))

      # Only the filter call is guarded, so the tolerated exception cannot
      # mask a problem in the assertion that follows.
      try:
          result = driver.clean(binary_data)
      except UnicodeDecodeError:
          # This might happen with binary data - acceptable
          return

      # Should handle binary data without crashing. Our test filter
      # uppercases, which may not work for all binary data.
      self.assertIsInstance(result, bytes)
  def test_large_file_chunking(self):
      """Test proper chunking of large files.

      The payload is bigger than a single pkt-line can carry, so it has to be
      split into several packets on the way to the filter and back.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      # Create data larger than max pkt-line payload (65516 bytes)
      large_data = b"a" * 100000
      result = driver.clean(large_data)

      # Should be properly processed (uppercased)
      expected = b"A" * 100000
      self.assertEqual(result, expected)
  def test_empty_file_handling(self):
      """Test handling of empty files: empty input must yield empty output."""
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      result = driver.clean(b"")
      self.assertEqual(result, b"")
  def test_special_characters_in_pathname(self):
      """Test paths with special characters are handled correctly.

      The payload is already lower-case, so the helper filter's smudge
      (which lower-cases) must return it unchanged for every path.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      # Test various special characters in paths
      special_paths = [
          b"file with spaces.txt",
          b"path/with/slashes.txt",
          b"file=with=equals.txt",
          b"file\nwith\nnewlines.txt",
      ]

      test_data = b"test data"

      for path in special_paths:
          # subTest labels a failure with the offending path.
          with self.subTest(path=path):
              result = driver.smudge(test_data, path)
              self.assertEqual(result, b"test data")
  def test_process_crash_recovery(self):
      """Test that process is properly restarted after crash.

      The filter process is killed between two ``clean`` calls; the second
      call must still return the filtered result.
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
      )
      # The second clean() below starts a fresh filter process; make sure
      # that one is stopped too once the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      # First operation
      result = driver.clean(b"test1")
      self.assertEqual(result, b"TEST1")

      # Kill the process
      if driver._process:
          driver._process.kill()
          driver._process.wait()
          driver.cleanup()

      # Should restart and work again
      result = driver.clean(b"test2")
      self.assertEqual(result, b"TEST2")
  def test_malformed_process_response_handling(self):
      """Test handling of malformed responses from process.

      A throwaway filter script answers the handshake with an invalid
      greeting. Because the driver is not required and has a ``clean_cmd``,
      the data is expected to come back unchanged via that fallback.
      """
      import sys
      import tempfile
      import textwrap

      # Create a filter that sends malformed responses
      malformed_filter = textwrap.dedent(
          """\
          #!/usr/bin/env python3
          import sys
          import os
          sys.path.insert(0, os.path.dirname(__file__))
          from dulwich.protocol import Protocol

          protocol = Protocol(
              lambda n: sys.stdin.buffer.read(n),
              lambda d: sys.stdout.buffer.write(d) or len(d)
          )

          # Read handshake
          protocol.read_pkt_line()
          protocol.read_pkt_line()
          protocol.read_pkt_line()

          # Send invalid handshake
          protocol.write_pkt_line(b"invalid-welcome")
          protocol.write_pkt_line(b"version=2")
          protocol.write_pkt_line(None)
          """
      )

      fd, script_path = tempfile.mkstemp(suffix=".py")
      try:
          # The file object owns fd, so it is closed even if the write fails.
          with os.fdopen(fd, "wb") as script_file:
              script_file.write(malformed_filter.encode())
          os.chmod(script_path, 0o755)

          # Run the script with the interpreter executing the tests, like
          # the other tests in this class. With a hard-coded "python3" a
          # missing interpreter would also hit the fallback, and the test
          # would pass without ever sending the malformed handshake.
          driver = ProcessFilterDriver(
              process_cmd=f"{sys.executable} {script_path}",
              clean_cmd="cat",  # Fallback
              required=False,
          )
          # Release whatever process resources the driver still holds.
          self.addCleanup(driver.cleanup)

          # Should fallback to clean_cmd when process fails
          result = driver.clean(b"test data")
          self.assertEqual(result, b"test data")
      finally:
          os.unlink(script_path)
  def test_concurrent_filter_operations(self):
      """Test that concurrent operations work correctly.

      Five threads call ``clean`` on one shared driver. Every call must
      succeed, and the collected results must be the upper-cased inputs
      (compared order-independently, since completion order is not fixed).
      """
      import sys

      driver = ProcessFilterDriver(
          process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
      )
      # Stop the long-running filter process when the test ends, pass or fail.
      self.addCleanup(driver.cleanup)

      results = []
      errors = []

      def worker(data):
          # Exceptions raised in a worker thread would not fail the test by
          # themselves, so they are collected and asserted on below.
          try:
              result = driver.clean(data)
              results.append(result)
          except Exception as e:
              errors.append(e)

      # Start 5 concurrent operations
      test_data = [f"test{i}".encode() for i in range(5)]
      threads = [
          threading.Thread(target=worker, args=(data,)) for data in test_data
      ]
      for t in threads:
          t.start()

      for t in threads:
          t.join()

      # Should have no errors
      self.assertEqual(len(errors), 0, f"Errors: {errors}")
      self.assertEqual(len(results), 5)

      # All results should be uppercase versions
      expected = [data.upper() for data in test_data]
      self.assertEqual(sorted(results), sorted(expected))
  def test_process_resource_cleanup(self):
      """Verify that cleanup() stops the filter process and drops references."""
      import sys

      filter_cmd = f"{sys.executable} {self.test_filter_path}"
      driver = ProcessFilterDriver(process_cmd=filter_cmd, required=False)

      # Exercise the driver once so that a filter process gets started.
      self.assertEqual(driver.clean(b"test"), b"TEST")

      # The process must exist and be alive: poll() yields None while the
      # child has not exited.
      self.assertIsNotNone(driver._process)
      self.assertIsNone(driver._process.poll())

      # Keep our own handle, because the driver drops its reference below.
      spawned = driver._process

      # Explicit cleanup (simulates __del__)
      driver.cleanup()

      # The driver no longer references the process or the protocol ...
      self.assertIsNone(driver._process)
      self.assertIsNone(driver._protocol)

      # ... and the process itself has exited (poll() returns an exit code).
      self.assertIsNotNone(spawned.poll())
  def test_required_filter_error_propagation(self):
      """A required filter whose command cannot be started raises FilterError."""
      missing_cmd = "/definitely/nonexistent/command"
      driver = ProcessFilterDriver(process_cmd=missing_cmd, required=True)

      with self.assertRaises(FilterError) as raised:
          driver.clean(b"test data")

      # The error text must identify the failure as a start-up problem.
      message = str(raised.exception)
      self.assertIn("Failed to start process filter", message)