test_filters.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import threading
  27. from collections.abc import Iterator
  28. from contextlib import contextmanager
  29. from dulwich.filters import (
  30. FilterContext,
  31. FilterError,
  32. FilterRegistry,
  33. ProcessFilterDriver,
  34. )
  35. from dulwich.repo import Repo
  36. from . import TestCase
  37. class GitAttributesFilterIntegrationTests(TestCase):
  38. """Test gitattributes integration with filter drivers."""
  39. def setUp(self) -> None:
  40. super().setUp()
  41. self.test_dir = tempfile.mkdtemp()
  42. self.addCleanup(self._cleanup_test_dir)
  43. self.repo = Repo.init(self.test_dir)
  44. def _cleanup_test_dir(self) -> None:
  45. """Clean up test directory."""
  46. import shutil
  47. shutil.rmtree(self.test_dir)
  48. def test_gitattributes_text_filter(self) -> None:
  49. """Test that text attribute triggers line ending conversion."""
  50. # Configure autocrlf first
  51. config = self.repo.get_config()
  52. config.set((b"core",), b"autocrlf", b"true")
  53. config.write_to_path()
  54. # Create .gitattributes with text attribute
  55. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  56. with open(gitattributes_path, "wb") as f:
  57. f.write(b"*.txt text\n")
  58. f.write(b"*.bin -text\n")
  59. # Add .gitattributes
  60. worktree = self.repo.get_worktree()
  61. worktree.stage([".gitattributes"])
  62. worktree.commit(
  63. message=b"Add gitattributes",
  64. committer=b"Test <test@example.com>",
  65. author=b"Test <test@example.com>",
  66. commit_timestamp=1000000000,
  67. author_timestamp=1000000000,
  68. commit_timezone=0,
  69. author_timezone=0,
  70. )
  71. # Create text file with CRLF
  72. text_file = os.path.join(self.test_dir, "test.txt")
  73. with open(text_file, "wb") as f:
  74. f.write(b"line1\r\nline2\r\n")
  75. # Create binary file with CRLF
  76. bin_file = os.path.join(self.test_dir, "test.bin")
  77. with open(bin_file, "wb") as f:
  78. f.write(b"binary\r\ndata\r\n")
  79. # Add files
  80. worktree.stage(["test.txt", "test.bin"])
  81. # Check that text file was normalized
  82. index = self.repo.open_index()
  83. text_entry = index[b"test.txt"]
  84. text_blob = self.repo.object_store[text_entry.sha]
  85. self.assertEqual(text_blob.data, b"line1\nline2\n")
  86. # Check that binary file was not normalized
  87. bin_entry = index[b"test.bin"]
  88. bin_blob = self.repo.object_store[bin_entry.sha]
  89. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  90. def test_gitattributes_custom_filter(self) -> None:
  91. """Test custom filter specified in gitattributes."""
  92. # Create a Python script that acts as our filter
  93. import sys
  94. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  95. with open(filter_script, "w") as f:
  96. f.write(
  97. """#!/usr/bin/env python3
  98. import sys
  99. data = sys.stdin.buffer.read()
  100. # Replace all digits with X
  101. result = bytearray()
  102. for b in data:
  103. if chr(b).isdigit():
  104. result.append(ord('X'))
  105. else:
  106. result.append(b)
  107. sys.stdout.buffer.write(result)
  108. """
  109. )
  110. os.chmod(filter_script, 0o755)
  111. # Create .gitattributes with custom filter
  112. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  113. with open(gitattributes_path, "wb") as f:
  114. f.write(b"*.secret filter=redact\n")
  115. # Configure custom filter (use Python script for testing)
  116. config = self.repo.get_config()
  117. # This filter replaces all digits with X
  118. config.set(
  119. (b"filter", b"redact"),
  120. b"clean",
  121. f"{sys.executable} {filter_script}".encode(),
  122. )
  123. config.write_to_path()
  124. # Add .gitattributes
  125. worktree = self.repo.get_worktree()
  126. worktree.stage([".gitattributes"])
  127. # Create file with sensitive content
  128. secret_file = os.path.join(self.test_dir, "password.secret")
  129. with open(secret_file, "wb") as f:
  130. f.write(b"password123\ntoken456\n")
  131. # Add file
  132. worktree.stage(["password.secret"])
  133. # Check that content was filtered
  134. index = self.repo.open_index()
  135. entry = index[b"password.secret"]
  136. blob = self.repo.object_store[entry.sha]
  137. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  138. def test_gitattributes_from_tree(self) -> None:
  139. """Test that gitattributes from tree are used when no working tree exists."""
  140. # Create .gitattributes with text attribute
  141. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  142. with open(gitattributes_path, "wb") as f:
  143. f.write(b"*.txt text\n")
  144. # Add and commit .gitattributes
  145. worktree = self.repo.get_worktree()
  146. worktree.stage([".gitattributes"])
  147. worktree.commit(
  148. message=b"Add gitattributes",
  149. committer=b"Test <test@example.com>",
  150. author=b"Test <test@example.com>",
  151. commit_timestamp=1000000000,
  152. author_timestamp=1000000000,
  153. commit_timezone=0,
  154. author_timezone=0,
  155. )
  156. # Remove .gitattributes from working tree
  157. os.remove(gitattributes_path)
  158. # Get gitattributes - should still work from tree
  159. gitattributes = self.repo.get_gitattributes()
  160. attrs = gitattributes.match_path(b"test.txt")
  161. self.assertEqual(attrs.get(b"text"), True)
  162. def test_gitattributes_info_attributes(self) -> None:
  163. """Test that .git/info/attributes is read."""
  164. # Create info/attributes
  165. info_dir = os.path.join(self.repo.controldir(), "info")
  166. if not os.path.exists(info_dir):
  167. os.makedirs(info_dir)
  168. info_attrs_path = os.path.join(info_dir, "attributes")
  169. with open(info_attrs_path, "wb") as f:
  170. f.write(b"*.log text\n")
  171. # Get gitattributes
  172. gitattributes = self.repo.get_gitattributes()
  173. attrs = gitattributes.match_path(b"debug.log")
  174. self.assertEqual(attrs.get(b"text"), True)
  175. def test_filter_precedence(self) -> None:
  176. """Test that filter attribute takes precedence over text attribute."""
  177. # Create a Python script that converts to uppercase
  178. import sys
  179. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  180. with open(filter_script, "w") as f:
  181. f.write(
  182. """#!/usr/bin/env python3
  183. import sys
  184. data = sys.stdin.buffer.read()
  185. # Convert bytes to string, uppercase, then back to bytes
  186. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  187. sys.stdout.buffer.write(result)
  188. """
  189. )
  190. os.chmod(filter_script, 0o755)
  191. # Create .gitattributes with both text and filter
  192. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  193. with open(gitattributes_path, "wb") as f:
  194. f.write(b"*.txt text filter=custom\n")
  195. # Configure autocrlf and custom filter
  196. config = self.repo.get_config()
  197. config.set((b"core",), b"autocrlf", b"true")
  198. # This filter converts to uppercase
  199. config.set(
  200. (b"filter", b"custom"),
  201. b"clean",
  202. f"{sys.executable} {filter_script}".encode(),
  203. )
  204. config.write_to_path()
  205. # Add .gitattributes
  206. worktree = self.repo.get_worktree()
  207. worktree.stage([".gitattributes"])
  208. # Create text file with lowercase and CRLF
  209. text_file = os.path.join(self.test_dir, "test.txt")
  210. with open(text_file, "wb") as f:
  211. f.write(b"hello\r\nworld\r\n")
  212. # Add file
  213. worktree.stage(["test.txt"])
  214. # Check that custom filter was applied (not just line ending conversion)
  215. index = self.repo.open_index()
  216. entry = index[b"test.txt"]
  217. blob = self.repo.object_store[entry.sha]
  218. # Should be uppercase with LF endings
  219. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  220. def test_blob_normalizer_integration(self) -> None:
  221. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  222. normalizer = self.repo.get_blob_normalizer()
  223. # Check it's the right type
  224. from dulwich.filters import FilterBlobNormalizer
  225. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  226. # Check it has access to gitattributes
  227. self.assertIsNotNone(normalizer.gitattributes)
  228. self.assertIsNotNone(normalizer.filter_registry)
  229. def test_required_filter_missing(self) -> None:
  230. """Test that missing required filter raises an error."""
  231. # Create .gitattributes with required filter
  232. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  233. with open(gitattributes_path, "wb") as f:
  234. f.write(b"*.secret filter=required_filter\n")
  235. # Configure filter as required but without commands
  236. config = self.repo.get_config()
  237. config.set((b"filter", b"required_filter"), b"required", b"true")
  238. config.write_to_path()
  239. # Add .gitattributes
  240. worktree = self.repo.get_worktree()
  241. worktree.stage([".gitattributes"])
  242. # Create file that would use the filter
  243. secret_file = os.path.join(self.test_dir, "test.secret")
  244. with open(secret_file, "wb") as f:
  245. f.write(b"test content\n")
  246. # Adding file should raise error due to missing required filter
  247. with self.assertRaises(FilterError) as cm:
  248. worktree.stage(["test.secret"])
  249. self.assertIn(
  250. "Required filter 'required_filter' is not available", str(cm.exception)
  251. )
  252. def test_required_filter_clean_command_fails(self) -> None:
  253. """Test that required filter failure during clean raises an error."""
  254. # Create .gitattributes with required filter
  255. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  256. with open(gitattributes_path, "wb") as f:
  257. f.write(b"*.secret filter=failing_filter\n")
  258. # Configure filter as required with failing command
  259. config = self.repo.get_config()
  260. config.set(
  261. (b"filter", b"failing_filter"), b"clean", b"false"
  262. ) # false command always fails
  263. config.set((b"filter", b"failing_filter"), b"required", b"true")
  264. config.write_to_path()
  265. # Add .gitattributes
  266. worktree = self.repo.get_worktree()
  267. worktree.stage([".gitattributes"])
  268. # Create file that would use the filter
  269. secret_file = os.path.join(self.test_dir, "test.secret")
  270. with open(secret_file, "wb") as f:
  271. f.write(b"test content\n")
  272. # Adding file should raise error due to failing required filter
  273. with self.assertRaises(FilterError) as cm:
  274. worktree.stage(["test.secret"])
  275. self.assertIn("Required clean filter failed", str(cm.exception))
  276. def test_required_filter_success(self) -> None:
  277. """Test that required filter works when properly configured."""
  278. # Create .gitattributes with required filter
  279. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  280. with open(gitattributes_path, "wb") as f:
  281. f.write(b"*.secret filter=working_filter\n")
  282. # Configure filter as required with working command
  283. config = self.repo.get_config()
  284. config.set(
  285. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  286. ) # uppercase
  287. config.set((b"filter", b"working_filter"), b"required", b"true")
  288. config.write_to_path()
  289. # Add .gitattributes
  290. worktree = self.repo.get_worktree()
  291. worktree.stage([".gitattributes"])
  292. # Create file that would use the filter
  293. secret_file = os.path.join(self.test_dir, "test.secret")
  294. with open(secret_file, "wb") as f:
  295. f.write(b"hello world\n")
  296. # Adding file should work and apply filter
  297. worktree.stage(["test.secret"])
  298. # Check that content was filtered
  299. index = self.repo.open_index()
  300. entry = index[b"test.secret"]
  301. blob = self.repo.object_store[entry.sha]
  302. self.assertEqual(blob.data, b"HELLO WORLD\n")
  303. def test_optional_filter_failure_fallback(self) -> None:
  304. """Test that optional filter failure falls back to original data."""
  305. # Create .gitattributes with optional filter
  306. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  307. with open(gitattributes_path, "wb") as f:
  308. f.write(b"*.txt filter=optional_filter\n")
  309. # Configure filter as optional (required=false) with failing command
  310. config = self.repo.get_config()
  311. config.set(
  312. (b"filter", b"optional_filter"), b"clean", b"false"
  313. ) # false command always fails
  314. config.set((b"filter", b"optional_filter"), b"required", b"false")
  315. config.write_to_path()
  316. # Add .gitattributes
  317. worktree = self.repo.get_worktree()
  318. worktree.stage([".gitattributes"])
  319. # Create file that would use the filter
  320. test_file = os.path.join(self.test_dir, "test.txt")
  321. with open(test_file, "wb") as f:
  322. f.write(b"test content\n")
  323. # Adding file should work and fallback to original content
  324. worktree.stage(["test.txt"])
  325. # Check that original content was preserved
  326. index = self.repo.open_index()
  327. entry = index[b"test.txt"]
  328. blob = self.repo.object_store[entry.sha]
  329. self.assertEqual(blob.data, b"test content\n")
  330. class ProcessFilterDriverTests(TestCase):
  331. """Tests for ProcessFilterDriver with real process filter."""
  332. def setUp(self):
  333. super().setUp()
  334. # Create a temporary test filter process dynamically
  335. self.test_filter_path = self._create_test_filter()
  336. def tearDown(self):
  337. # Clean up the test filter
  338. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  339. os.unlink(self.test_filter_path)
  340. super().tearDown()
  341. def _create_test_filter(self):
  342. """Create a simple test filter process that works on all platforms."""
  343. import tempfile
  344. # Create filter script that uppercases on clean, lowercases on smudge
  345. filter_script = """import sys
  346. import os
  347. # Simple filter that doesn't use any external dependencies
  348. def read_exact(n):
  349. data = b""
  350. while len(data) < n:
  351. chunk = sys.stdin.buffer.read(n - len(data))
  352. if not chunk:
  353. break
  354. data += chunk
  355. return data
  356. def write_pkt(data):
  357. if data is None:
  358. sys.stdout.buffer.write(b"0000")
  359. else:
  360. length = len(data) + 4
  361. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  362. sys.stdout.buffer.write(data)
  363. sys.stdout.buffer.flush()
  364. def read_pkt():
  365. size_bytes = read_exact(4)
  366. if not size_bytes:
  367. return None
  368. size = int(size_bytes.decode(), 16)
  369. if size == 0:
  370. return None
  371. return read_exact(size - 4)
  372. # Handshake
  373. client_hello = read_pkt()
  374. version = read_pkt()
  375. flush = read_pkt()
  376. write_pkt(b"git-filter-server")
  377. write_pkt(b"version=2")
  378. write_pkt(None)
  379. # Read and echo capabilities
  380. caps = []
  381. while True:
  382. cap = read_pkt()
  383. if cap is None:
  384. break
  385. caps.append(cap)
  386. for cap in caps:
  387. write_pkt(cap)
  388. write_pkt(None)
  389. # Process commands
  390. while True:
  391. headers = {}
  392. while True:
  393. line = read_pkt()
  394. if line is None:
  395. break
  396. if b"=" in line:
  397. k, v = line.split(b"=", 1)
  398. headers[k.decode()] = v.decode()
  399. if not headers:
  400. break
  401. # Read data
  402. data_chunks = []
  403. while True:
  404. chunk = read_pkt()
  405. if chunk is None:
  406. break
  407. data_chunks.append(chunk)
  408. data = b"".join(data_chunks)
  409. # Process (uppercase for clean, lowercase for smudge)
  410. if headers.get("command") == "clean":
  411. result = data.upper()
  412. elif headers.get("command") == "smudge":
  413. result = data.lower()
  414. else:
  415. result = data
  416. # Send response
  417. write_pkt(b"status=success")
  418. write_pkt(None)
  419. # Send result
  420. chunk_size = 65516
  421. for i in range(0, len(result), chunk_size):
  422. write_pkt(result[i:i+chunk_size])
  423. write_pkt(None)
  424. # Send final headers (empty list to keep status=success)
  425. write_pkt(None)
  426. """
  427. # Create temporary file
  428. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  429. try:
  430. os.write(fd, filter_script.encode())
  431. os.close(fd)
  432. # Make executable on Unix-like systems
  433. if os.name != "nt": # Not Windows
  434. os.chmod(path, 0o755)
  435. return path
  436. except:
  437. if os.path.exists(path):
  438. os.unlink(path)
  439. raise
  440. def test_process_filter_clean_operation(self):
  441. """Test clean operation using real process filter."""
  442. import sys
  443. driver = ProcessFilterDriver(
  444. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  445. )
  446. test_data = b"hello world"
  447. result = driver.clean(test_data)
  448. # Our test filter uppercases on clean
  449. self.assertEqual(result, b"HELLO WORLD")
  450. def test_process_filter_smudge_operation(self):
  451. """Test smudge operation using real process filter."""
  452. import sys
  453. driver = ProcessFilterDriver(
  454. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  455. )
  456. test_data = b"HELLO WORLD"
  457. result = driver.smudge(test_data, b"test.txt")
  458. # Our test filter lowercases on smudge
  459. self.assertEqual(result, b"hello world")
  460. def test_process_filter_large_data(self):
  461. """Test process filter with data larger than single pkt-line."""
  462. import sys
  463. driver = ProcessFilterDriver(
  464. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  465. )
  466. # Create data larger than max pkt-line payload (65516 bytes)
  467. test_data = b"a" * 70000
  468. result = driver.clean(test_data)
  469. # Should be uppercased
  470. self.assertEqual(result, b"A" * 70000)
  471. def test_fallback_to_individual_commands(self):
  472. """Test fallback when process filter fails."""
  473. driver = ProcessFilterDriver(
  474. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  475. process_cmd="/nonexistent/command", # This should fail
  476. required=False,
  477. )
  478. test_data = b"hello world\n"
  479. result = driver.clean(test_data)
  480. # Should fallback to tr command and uppercase
  481. self.assertEqual(result, b"HELLO WORLD\n")
  482. def test_process_reuse(self):
  483. """Test that process is reused across multiple operations."""
  484. import sys
  485. driver = ProcessFilterDriver(
  486. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  487. )
  488. # First operation
  489. result1 = driver.clean(b"test1")
  490. self.assertEqual(result1, b"TEST1")
  491. # Second operation should reuse the same process
  492. result2 = driver.clean(b"test2")
  493. self.assertEqual(result2, b"TEST2")
  494. # Process should still be alive
  495. self.assertIsNotNone(driver._process)
  496. self.assertIsNone(driver._process.poll()) # None means still running
  497. def test_error_handling_invalid_command(self):
  498. """Test error handling with invalid filter command."""
  499. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  500. with self.assertRaises(FilterError) as cm:
  501. driver.clean(b"test data")
  502. self.assertIn("Failed to start process filter", str(cm.exception))
  503. class FilterContextTests(TestCase):
  504. """Tests for FilterContext class."""
  505. def test_filter_context_caches_long_running_drivers(self):
  506. """Test that FilterContext caches only long-running drivers."""
  507. # Create real filter drivers
  508. class UppercaseFilter:
  509. def clean(self, data):
  510. return data.upper()
  511. def smudge(self, data, path=b""):
  512. return data.lower()
  513. def cleanup(self):
  514. pass
  515. def reuse(self, config, filter_name):
  516. # Pretend it's a long-running filter that should be cached
  517. return True
  518. class IdentityFilter:
  519. def clean(self, data):
  520. return data
  521. def smudge(self, data, path=b""):
  522. return data
  523. def cleanup(self):
  524. pass
  525. def reuse(self, config, filter_name):
  526. # Lightweight filter, don't cache
  527. return False
  528. # Create registry and context
  529. # Need to provide a config for caching to work
  530. from dulwich.config import ConfigDict
  531. config = ConfigDict()
  532. # Add some dummy config to make it truthy (use proper format)
  533. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  534. registry = FilterRegistry(config=config)
  535. context = FilterContext(registry)
  536. # Register drivers
  537. long_running = UppercaseFilter()
  538. stateless = IdentityFilter()
  539. registry.register_driver("uppercase", long_running)
  540. registry.register_driver("identity", stateless)
  541. # Get drivers through context
  542. driver1 = context.get_driver("uppercase")
  543. driver2 = context.get_driver("uppercase")
  544. # Long-running driver should be cached
  545. self.assertIs(driver1, driver2)
  546. self.assertIs(driver1, long_running)
  547. # Get stateless driver
  548. stateless1 = context.get_driver("identity")
  549. stateless2 = context.get_driver("identity")
  550. # Stateless driver comes from registry but isn't cached in context
  551. self.assertIs(stateless1, stateless)
  552. self.assertIs(stateless2, stateless)
  553. self.assertNotIn("identity", context._active_drivers)
  554. self.assertIn("uppercase", context._active_drivers)
  555. def test_filter_context_cleanup(self):
  556. """Test that FilterContext properly cleans up resources."""
  557. cleanup_called = []
  558. class TrackableFilter:
  559. def __init__(self, name):
  560. self.name = name
  561. def clean(self, data):
  562. return data
  563. def smudge(self, data, path=b""):
  564. return data
  565. def cleanup(self):
  566. cleanup_called.append(self.name)
  567. def is_long_running(self):
  568. return True
  569. # Create registry and context
  570. registry = FilterRegistry()
  571. context = FilterContext(registry)
  572. # Register and use drivers
  573. filter1 = TrackableFilter("filter1")
  574. filter2 = TrackableFilter("filter2")
  575. filter3 = TrackableFilter("filter3")
  576. registry.register_driver("filter1", filter1)
  577. registry.register_driver("filter2", filter2)
  578. registry.register_driver("filter3", filter3)
  579. # Get only some drivers to cache them
  580. context.get_driver("filter1")
  581. context.get_driver("filter2")
  582. # Don't get filter3
  583. # Close context
  584. context.close()
  585. # Verify cleanup was called for all drivers (context closes registry too)
  586. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  587. def test_filter_context_get_driver_returns_none_for_missing(self):
  588. """Test that get_driver returns None for non-existent drivers."""
  589. registry = FilterRegistry()
  590. context = FilterContext(registry)
  591. result = context.get_driver("nonexistent")
  592. self.assertIsNone(result)
  def test_filter_context_with_real_process_filter(self):
      """Test FilterContext with real ProcessFilterDriver instances.

      Checks two things against ``context._active_drivers``:

      * a ProcessFilterDriver created without ``process_cmd`` is not kept
        there after ``get_driver()``;
      * a driver whose ``reuse()`` returns True is kept there.
      """
      test_dir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, test_dir)

      # Create a simple test filter that just passes data through
      filter_path = os.path.join(test_dir, "simple_filter.py")
      with open(filter_path, "w") as f:
          f.write(_PASSTHROUGH_FILTER_SCRIPT)
      filter_cmd = f"{sys.executable} {filter_path}"

      # One-shot driver: process_cmd is deliberately None so that no
      # long-running filter process is involved.
      process_driver = ProcessFilterDriver(
          process_cmd=None,
          clean_cmd=filter_cmd,
          smudge_cmd=filter_cmd,
      )

      # Register in context
      from dulwich.config import ConfigDict

      config = ConfigDict()
      # Add some dummy config to make it truthy (use proper format)
      config.set((b"filter", b"process"), b"clean", filter_cmd.encode())
      config.set((b"filter", b"process"), b"smudge", filter_cmd.encode())

      registry = FilterRegistry(config=config)
      context = FilterContext(registry)
      # Registered as a cleanup so the context is closed even when one of
      # the assertions below fails. Cleanups run in reverse order, so this
      # happens before test_dir is removed.
      self.addCleanup(context.close)
      registry.register_driver("process", process_driver)

      # Get driver - should not be cached since it's not long-running
      driver1 = context.get_driver("process")
      self.assertIsNotNone(driver1)
      # Check that it's not a long-running process (no process_cmd)
      self.assertIsNone(driver1.process_cmd)
      self.assertNotIn("process", context._active_drivers)

      # Test with a long-running driver that should be cached.
      # Stand-in driver that always asks to be reused.
      class CacheableProcessDriver:
          def __init__(self):
              self.process_cmd = "dummy"
              self.clean_cmd = None
              self.smudge_cmd = None
              self.required = False

          def clean(self, data):
              return data

          def smudge(self, data, path=b""):
              return data

          def cleanup(self):
              pass

          def reuse(self, config, filter_name):
              # This driver always wants to be cached (simulates a long-running process)
              return True

      cacheable_driver = CacheableProcessDriver()
      registry.register_driver("long_process", cacheable_driver)
      driver2 = context.get_driver("long_process")

      # Check that it has a process_cmd (long-running)
      self.assertIsNotNone(driver2.process_cmd)
      self.assertIn("long_process", context._active_drivers)
  def test_filter_context_closes_registry(self):
      """Closing a FilterContext must also close the registry it wraps."""
      close_calls = []

      class TrackingRegistry(FilterRegistry):
          """FilterRegistry that records every call to close()."""

          def close(self):
              close_calls.append(True)
              super().close()

      context = FilterContext(TrackingRegistry())
      context.close()

      # A non-empty list means close() reached the registry
      self.assertTrue(close_calls)
  670. class ProcessFilterProtocolTests(TestCase):
  671. """Tests for ProcessFilterDriver protocol compliance."""
  672. def setUp(self):
  673. super().setUp()
  674. # Create a spec-compliant test filter process dynamically
  675. self.test_filter_path = self._create_spec_compliant_filter()
  676. def tearDown(self):
  677. # Clean up the test filter
  678. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  679. os.unlink(self.test_filter_path)
  680. super().tearDown()
  681. def _create_spec_compliant_filter(self):
  682. """Create a spec-compliant test filter that works on all platforms."""
  683. import tempfile
  684. # This filter strictly follows Git spec - no newlines in packets
  685. filter_script = """import sys
  686. def read_exact(n):
  687. data = b""
  688. while len(data) < n:
  689. chunk = sys.stdin.buffer.read(n - len(data))
  690. if not chunk:
  691. break
  692. data += chunk
  693. return data
  694. def write_pkt(data):
  695. if data is None:
  696. sys.stdout.buffer.write(b"0000")
  697. else:
  698. length = len(data) + 4
  699. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  700. sys.stdout.buffer.write(data)
  701. sys.stdout.buffer.flush()
  702. def read_pkt():
  703. size_bytes = read_exact(4)
  704. if not size_bytes:
  705. return None
  706. size = int(size_bytes.decode(), 16)
  707. if size == 0:
  708. return None
  709. return read_exact(size - 4)
  710. # Handshake - exact format, no newlines
  711. client_hello = read_pkt()
  712. version = read_pkt()
  713. flush = read_pkt()
  714. if client_hello != b"git-filter-client":
  715. sys.exit(1)
  716. if version != b"version=2":
  717. sys.exit(1)
  718. write_pkt(b"git-filter-server") # No newline
  719. write_pkt(b"version=2") # No newline
  720. write_pkt(None)
  721. # Read and echo capabilities
  722. caps = []
  723. while True:
  724. cap = read_pkt()
  725. if cap is None:
  726. break
  727. caps.append(cap)
  728. for cap in caps:
  729. if cap in [b"capability=clean", b"capability=smudge"]:
  730. write_pkt(cap)
  731. write_pkt(None)
  732. # Process commands
  733. while True:
  734. headers = {}
  735. while True:
  736. line = read_pkt()
  737. if line is None:
  738. break
  739. if b"=" in line:
  740. k, v = line.split(b"=", 1)
  741. headers[k.decode()] = v.decode()
  742. if not headers:
  743. break
  744. # Read data
  745. data_chunks = []
  746. while True:
  747. chunk = read_pkt()
  748. if chunk is None:
  749. break
  750. data_chunks.append(chunk)
  751. data = b"".join(data_chunks)
  752. # Process
  753. if headers.get("command") == "clean":
  754. result = data.upper()
  755. elif headers.get("command") == "smudge":
  756. result = data.lower()
  757. else:
  758. result = data
  759. # Send response
  760. write_pkt(b"status=success")
  761. write_pkt(None)
  762. # Send result
  763. chunk_size = 65516
  764. for i in range(0, len(result), chunk_size):
  765. write_pkt(result[i:i+chunk_size])
  766. write_pkt(None)
  767. # Send final headers (empty list to keep status=success)
  768. write_pkt(None)
  769. """
  770. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
  771. try:
  772. os.write(fd, filter_script.encode())
  773. os.close(fd)
  774. if os.name != "nt": # Not Windows
  775. os.chmod(path, 0o755)
  776. return path
  777. except:
  778. if os.path.exists(path):
  779. os.unlink(path)
  780. raise
  781. def test_protocol_handshake_exact_format(self):
  782. """Test that handshake uses exact format without newlines."""
  783. import sys
  784. driver = ProcessFilterDriver(
  785. process_cmd=f"{sys.executable} {self.test_filter_path}",
  786. required=True, # Require success to test protocol compliance
  787. )
  788. # This should work with exact protocol format
  789. test_data = b"hello world"
  790. result = driver.clean(test_data)
  791. # Our test filter uppercases on clean
  792. self.assertEqual(result, b"HELLO WORLD")
  793. def test_capability_negotiation_exact_format(self):
  794. """Test that capabilities are sent and received in exact format."""
  795. import sys
  796. driver = ProcessFilterDriver(
  797. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  798. )
  799. # Force capability negotiation by using both clean and smudge
  800. clean_result = driver.clean(b"test")
  801. smudge_result = driver.smudge(b"TEST", b"test.txt")
  802. self.assertEqual(clean_result, b"TEST")
  803. self.assertEqual(smudge_result, b"test")
  804. def test_binary_data_handling(self):
  805. """Test handling of binary data through the protocol."""
  806. import sys
  807. driver = ProcessFilterDriver(
  808. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  809. )
  810. # Binary data with null bytes, high bytes, etc.
  811. binary_data = bytes(range(256))
  812. result = driver.clean(binary_data)
  813. # Should handle binary data without crashing
  814. self.assertIsInstance(result, bytes)
  815. # Our test filter uppercases bytes directly, which works for binary data
  816. # The fix ensures headers are kept as bytes, so binary content doesn't cause decode errors
  817. def test_binary_data_with_invalid_utf8_sequences(self):
  818. """Test handling of binary data with invalid UTF-8 sequences.
  819. Regression test for https://github.com/jelmer/dulwich/issues/2023
  820. where binary files (like .ogg, .jpg) caused UTF-8 decode errors.
  821. """
  822. import sys
  823. driver = ProcessFilterDriver(
  824. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  825. )
  826. # Create binary data with the specific byte that caused the issue (0xe5 at position 14)
  827. # plus other invalid UTF-8 sequences
  828. binary_data = b"some header \xe5\xff\xfe binary data"
  829. result = driver.clean(binary_data)
  830. # Should handle binary data without UTF-8 decode errors
  831. self.assertIsInstance(result, bytes)
  832. # The filter should process it successfully
  833. self.assertEqual(result, binary_data.upper())
  834. def test_large_file_chunking(self):
  835. """Test proper chunking of large files."""
  836. import sys
  837. driver = ProcessFilterDriver(
  838. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  839. )
  840. # Create data larger than max pkt-line payload (65516 bytes)
  841. large_data = b"a" * 100000
  842. result = driver.clean(large_data)
  843. # Should be properly processed (uppercased)
  844. expected = b"A" * 100000
  845. self.assertEqual(result, expected)
  846. def test_empty_file_handling(self):
  847. """Test handling of empty files."""
  848. import sys
  849. driver = ProcessFilterDriver(
  850. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  851. )
  852. result = driver.clean(b"")
  853. self.assertEqual(result, b"")
  854. def test_special_characters_in_pathname(self):
  855. """Test paths with special characters are handled correctly."""
  856. import sys
  857. # Test various special characters in paths
  858. special_paths = [
  859. b"file with spaces.txt",
  860. b"path/with/slashes.txt",
  861. b"file=with=equals.txt",
  862. b"file\nwith\nnewlines.txt",
  863. b"filew&with&ampersand.txt",
  864. ]
  865. test_data = b"test data"
  866. with create_passthrough_filter() as passthrough_filter_path:
  867. for process_cmd, smudge_cmd in [
  868. (f"{sys.executable} {self.test_filter_path}", None),
  869. (None, f"{sys.executable} {passthrough_filter_path} %f"),
  870. ]:
  871. driver = ProcessFilterDriver(
  872. process_cmd=process_cmd,
  873. smudge_cmd=smudge_cmd,
  874. required=True,
  875. )
  876. for path in special_paths:
  877. with self.subTest(
  878. process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
  879. ):
  880. result = driver.smudge(test_data, path)
  881. self.assertEqual(result, b"test data")
  882. def test_process_crash_recovery(self):
  883. """Test that process is properly restarted after crash."""
  884. import sys
  885. driver = ProcessFilterDriver(
  886. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  887. )
  888. # First operation
  889. result = driver.clean(b"test1")
  890. self.assertEqual(result, b"TEST1")
  891. # Kill the process
  892. if driver._process:
  893. driver._process.kill()
  894. driver._process.wait()
  895. driver.cleanup()
  896. # Should restart and work again
  897. result = driver.clean(b"test2")
  898. self.assertEqual(result, b"TEST2")
  899. def test_malformed_process_response_handling(self):
  900. """Test handling of malformed responses from process."""
  901. # Create a filter that sends malformed responses
  902. malformed_filter = """#!/usr/bin/env python3
  903. import sys
  904. import os
  905. sys.path.insert(0, os.path.dirname(__file__))
  906. from dulwich.protocol import Protocol
  907. protocol = Protocol(
  908. lambda n: sys.stdin.buffer.read(n),
  909. lambda d: sys.stdout.buffer.write(d) or len(d)
  910. )
  911. # Read handshake
  912. protocol.read_pkt_line()
  913. protocol.read_pkt_line()
  914. protocol.read_pkt_line()
  915. # Send invalid handshake
  916. protocol.write_pkt_line(b"invalid-welcome")
  917. protocol.write_pkt_line(b"version=2")
  918. protocol.write_pkt_line(None)
  919. """
  920. import tempfile
  921. fd, script_path = tempfile.mkstemp(suffix=".py")
  922. try:
  923. os.write(fd, malformed_filter.encode())
  924. os.close(fd)
  925. os.chmod(script_path, 0o755)
  926. driver = ProcessFilterDriver(
  927. process_cmd=f"python3 {script_path}",
  928. clean_cmd="cat", # Fallback
  929. required=False,
  930. )
  931. # Should fallback to clean_cmd when process fails
  932. result = driver.clean(b"test data")
  933. self.assertEqual(result, b"test data")
  934. finally:
  935. os.unlink(script_path)
  936. def test_concurrent_filter_operations(self):
  937. """Test that concurrent operations work correctly."""
  938. import sys
  939. driver = ProcessFilterDriver(
  940. process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
  941. )
  942. results = []
  943. errors = []
  944. def worker(data):
  945. try:
  946. result = driver.clean(data)
  947. results.append(result)
  948. except Exception as e:
  949. errors.append(e)
  950. # Start 5 concurrent operations
  951. threads = []
  952. test_data = [f"test{i}".encode() for i in range(5)]
  953. for data in test_data:
  954. t = threading.Thread(target=worker, args=(data,))
  955. threads.append(t)
  956. t.start()
  957. for t in threads:
  958. t.join()
  959. # Should have no errors
  960. self.assertEqual(len(errors), 0, f"Errors: {errors}")
  961. self.assertEqual(len(results), 5)
  962. # All results should be uppercase versions
  963. expected = [data.upper() for data in test_data]
  964. self.assertEqual(sorted(results), sorted(expected))
  965. def test_process_resource_cleanup(self):
  966. """Test that process resources are properly cleaned up."""
  967. import sys
  968. driver = ProcessFilterDriver(
  969. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  970. )
  971. # Use the driver
  972. result = driver.clean(b"test")
  973. self.assertEqual(result, b"TEST")
  974. # Process should be running
  975. self.assertIsNotNone(driver._process)
  976. self.assertIsNone(driver._process.poll()) # None means still running
  977. # Remember the old process to check it was terminated
  978. old_process = driver._process
  979. # Manually clean up (simulates __del__)
  980. driver.cleanup()
  981. # Process reference should be cleared
  982. self.assertIsNone(driver._process)
  983. self.assertIsNone(driver._protocol)
  984. # Old process should be terminated
  985. self.assertIsNotNone(old_process.poll()) # Not None means terminated
  986. def test_required_filter_error_propagation(self):
  987. """Test that errors are properly propagated when filter is required."""
  988. driver = ProcessFilterDriver(
  989. process_cmd="/definitely/nonexistent/command", required=True
  990. )
  991. with self.assertRaises(FilterError) as cm:
  992. driver.clean(b"test data")
  993. self.assertIn("Failed to start process filter", str(cm.exception))
  994. def test_two_phase_response_protocol(self):
  995. """Test filter protocol with two-phase response (initial + final headers).
  996. This test verifies that the filter correctly handles the Git LFS protocol
  997. where filters send:
  998. 1. Initial headers with status
  999. 2. Content data
  1000. 3. Final headers with status
  1001. This is the format used by git-lfs and documented in the Git filter protocol.
  1002. """
  1003. import sys
  1004. import tempfile
  1005. # Create a filter that follows the two-phase protocol
  1006. filter_script = """import sys
  1007. def read_exact(n):
  1008. data = b""
  1009. while len(data) < n:
  1010. chunk = sys.stdin.buffer.read(n - len(data))
  1011. if not chunk:
  1012. break
  1013. data += chunk
  1014. return data
  1015. def write_pkt(data):
  1016. if data is None:
  1017. sys.stdout.buffer.write(b"0000")
  1018. else:
  1019. length = len(data) + 4
  1020. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1021. sys.stdout.buffer.write(data)
  1022. sys.stdout.buffer.flush()
  1023. def read_pkt():
  1024. size_bytes = read_exact(4)
  1025. if not size_bytes:
  1026. return None
  1027. size = int(size_bytes.decode(), 16)
  1028. if size == 0:
  1029. return None
  1030. return read_exact(size - 4)
  1031. # Handshake
  1032. client_hello = read_pkt()
  1033. version = read_pkt()
  1034. flush = read_pkt()
  1035. write_pkt(b"git-filter-server")
  1036. write_pkt(b"version=2")
  1037. write_pkt(None)
  1038. # Read and echo capabilities
  1039. caps = []
  1040. while True:
  1041. cap = read_pkt()
  1042. if cap is None:
  1043. break
  1044. caps.append(cap)
  1045. for cap in caps:
  1046. write_pkt(cap)
  1047. write_pkt(None)
  1048. # Process commands
  1049. while True:
  1050. headers = {}
  1051. while True:
  1052. line = read_pkt()
  1053. if line is None:
  1054. break
  1055. if b"=" in line:
  1056. k, v = line.split(b"=", 1)
  1057. headers[k.decode()] = v.decode()
  1058. if not headers:
  1059. break
  1060. # Read data
  1061. data_chunks = []
  1062. while True:
  1063. chunk = read_pkt()
  1064. if chunk is None:
  1065. break
  1066. data_chunks.append(chunk)
  1067. data = b"".join(data_chunks)
  1068. # Process
  1069. if headers.get("command") == "clean":
  1070. result = data.upper()
  1071. elif headers.get("command") == "smudge":
  1072. result = data.lower()
  1073. else:
  1074. result = data
  1075. # TWO-PHASE RESPONSE: Send initial headers
  1076. write_pkt(b"status=success")
  1077. write_pkt(None)
  1078. # Send result data
  1079. chunk_size = 65516
  1080. for i in range(0, len(result), chunk_size):
  1081. write_pkt(result[i:i+chunk_size])
  1082. write_pkt(None)
  1083. # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
  1084. write_pkt(None)
  1085. """
  1086. fd, filter_path = tempfile.mkstemp(
  1087. suffix=".py", prefix="test_filter_two_phase_"
  1088. )
  1089. try:
  1090. os.write(fd, filter_script.encode())
  1091. os.close(fd)
  1092. if os.name != "nt":
  1093. os.chmod(filter_path, 0o755)
  1094. driver = ProcessFilterDriver(
  1095. process_cmd=f"{sys.executable} {filter_path}", required=True
  1096. )
  1097. # Test clean operation
  1098. test_data = b"hello world"
  1099. result = driver.clean(test_data)
  1100. self.assertEqual(result, b"HELLO WORLD")
  1101. # Test smudge operation
  1102. result = driver.smudge(b"HELLO WORLD", b"test.txt")
  1103. self.assertEqual(result, b"hello world")
  1104. driver.cleanup()
  1105. finally:
  1106. if os.path.exists(filter_path):
  1107. os.unlink(filter_path)
  1108. def test_two_phase_response_with_status_messages(self):
  1109. """Test filter that sends status messages in final headers.
  1110. Some filters (like git-lfs) may send progress or status messages
  1111. in the final headers. This test verifies that we can handle those.
  1112. """
  1113. import sys
  1114. import tempfile
  1115. # Create a filter that sends extra status info in final headers
  1116. filter_script = """import sys
  1117. def read_exact(n):
  1118. data = b""
  1119. while len(data) < n:
  1120. chunk = sys.stdin.buffer.read(n - len(data))
  1121. if not chunk:
  1122. break
  1123. data += chunk
  1124. return data
  1125. def write_pkt(data):
  1126. if data is None:
  1127. sys.stdout.buffer.write(b"0000")
  1128. else:
  1129. length = len(data) + 4
  1130. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1131. sys.stdout.buffer.write(data)
  1132. sys.stdout.buffer.flush()
  1133. def read_pkt():
  1134. size_bytes = read_exact(4)
  1135. if not size_bytes:
  1136. return None
  1137. size = int(size_bytes.decode(), 16)
  1138. if size == 0:
  1139. return None
  1140. return read_exact(size - 4)
  1141. # Handshake
  1142. client_hello = read_pkt()
  1143. version = read_pkt()
  1144. flush = read_pkt()
  1145. write_pkt(b"git-filter-server")
  1146. write_pkt(b"version=2")
  1147. write_pkt(None)
  1148. # Read and echo capabilities
  1149. caps = []
  1150. while True:
  1151. cap = read_pkt()
  1152. if cap is None:
  1153. break
  1154. caps.append(cap)
  1155. for cap in caps:
  1156. write_pkt(cap)
  1157. write_pkt(None)
  1158. # Process commands
  1159. while True:
  1160. headers = {}
  1161. while True:
  1162. line = read_pkt()
  1163. if line is None:
  1164. break
  1165. if b"=" in line:
  1166. k, v = line.split(b"=", 1)
  1167. headers[k.decode()] = v.decode()
  1168. if not headers:
  1169. break
  1170. # Read data
  1171. data_chunks = []
  1172. while True:
  1173. chunk = read_pkt()
  1174. if chunk is None:
  1175. break
  1176. data_chunks.append(chunk)
  1177. data = b"".join(data_chunks)
  1178. # Process
  1179. result = data.upper()
  1180. # Send initial headers
  1181. write_pkt(b"status=success")
  1182. write_pkt(None)
  1183. # Send result data
  1184. chunk_size = 65516
  1185. for i in range(0, len(result), chunk_size):
  1186. write_pkt(result[i:i+chunk_size])
  1187. write_pkt(None)
  1188. # Send final headers with progress messages (like git-lfs does)
  1189. write_pkt(b"status=success")
  1190. write_pkt(None)
  1191. """
  1192. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_status_")
  1193. try:
  1194. os.write(fd, filter_script.encode())
  1195. os.close(fd)
  1196. if os.name != "nt":
  1197. os.chmod(filter_path, 0o755)
  1198. driver = ProcessFilterDriver(
  1199. process_cmd=f"{sys.executable} {filter_path}", required=True
  1200. )
  1201. # Test clean operation with status messages
  1202. test_data = b"test data with status"
  1203. result = driver.clean(test_data)
  1204. self.assertEqual(result, b"TEST DATA WITH STATUS")
  1205. driver.cleanup()
  1206. finally:
  1207. if os.path.exists(filter_path):
  1208. os.unlink(filter_path)
  1209. def test_two_phase_response_with_final_error(self):
  1210. """Test filter that reports error in final headers.
  1211. The Git protocol allows filters to report success initially,
  1212. then report an error in the final headers. This test ensures
  1213. we handle that correctly.
  1214. """
  1215. import sys
  1216. import tempfile
  1217. # Create a filter that sends error in final headers
  1218. filter_script = """import sys
  1219. def read_exact(n):
  1220. data = b""
  1221. while len(data) < n:
  1222. chunk = sys.stdin.buffer.read(n - len(data))
  1223. if not chunk:
  1224. break
  1225. data += chunk
  1226. return data
  1227. def write_pkt(data):
  1228. if data is None:
  1229. sys.stdout.buffer.write(b"0000")
  1230. else:
  1231. length = len(data) + 4
  1232. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  1233. sys.stdout.buffer.write(data)
  1234. sys.stdout.buffer.flush()
  1235. def read_pkt():
  1236. size_bytes = read_exact(4)
  1237. if not size_bytes:
  1238. return None
  1239. size = int(size_bytes.decode(), 16)
  1240. if size == 0:
  1241. return None
  1242. return read_exact(size - 4)
  1243. # Handshake
  1244. client_hello = read_pkt()
  1245. version = read_pkt()
  1246. flush = read_pkt()
  1247. write_pkt(b"git-filter-server")
  1248. write_pkt(b"version=2")
  1249. write_pkt(None)
  1250. # Read and echo capabilities
  1251. caps = []
  1252. while True:
  1253. cap = read_pkt()
  1254. if cap is None:
  1255. break
  1256. caps.append(cap)
  1257. for cap in caps:
  1258. write_pkt(cap)
  1259. write_pkt(None)
  1260. # Process commands
  1261. while True:
  1262. headers = {}
  1263. while True:
  1264. line = read_pkt()
  1265. if line is None:
  1266. break
  1267. if b"=" in line:
  1268. k, v = line.split(b"=", 1)
  1269. headers[k.decode()] = v.decode()
  1270. if not headers:
  1271. break
  1272. # Read data
  1273. data_chunks = []
  1274. while True:
  1275. chunk = read_pkt()
  1276. if chunk is None:
  1277. break
  1278. data_chunks.append(chunk)
  1279. data = b"".join(data_chunks)
  1280. # Send initial headers with success
  1281. write_pkt(b"status=success")
  1282. write_pkt(None)
  1283. # Send partial result
  1284. write_pkt(b"PARTIAL")
  1285. write_pkt(None)
  1286. # Send final headers with error (simulating processing failure)
  1287. write_pkt(b"status=error")
  1288. write_pkt(None)
  1289. """
  1290. fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_error_")
  1291. try:
  1292. os.write(fd, filter_script.encode())
  1293. os.close(fd)
  1294. if os.name != "nt":
  1295. os.chmod(filter_path, 0o755)
  1296. driver = ProcessFilterDriver(
  1297. process_cmd=f"{sys.executable} {filter_path}", required=True
  1298. )
  1299. # Should raise FilterError due to final status being error
  1300. with self.assertRaises(FilterError) as cm:
  1301. driver.clean(b"test data")
  1302. self.assertIn("final status: error", str(cm.exception))
  1303. driver.cleanup()
  1304. finally:
  1305. if os.path.exists(filter_path):
  1306. os.unlink(filter_path)
  # Source of a minimal filter program: it copies everything it reads from
  # stdin to stdout unchanged, until stdin is exhausted.
  _PASSTHROUGH_FILTER_SCRIPT = """import sys
while True:
    line = sys.stdin.buffer.read()
    if not line:
        break
    sys.stdout.buffer.write(line)
    sys.stdout.buffer.flush()
"""
  @contextmanager
  def create_passthrough_filter() -> Iterator[str]:
      """Yield the path of a temporary pass-through filter script.

      The script body is ``_PASSTHROUGH_FILTER_SCRIPT``. The file is made
      executable on non-Windows platforms and is deleted when the context
      exits; a file that has already disappeared is not an error.
      """
      script_bytes = _PASSTHROUGH_FILTER_SCRIPT.encode()
      with tempfile.NamedTemporaryFile(
          suffix=".py", delete=False, prefix="test_filter_passthrough_"
      ) as script_file:
          script_file.write(script_bytes)
          script_path = script_file.name

      try:
          on_windows = os.name == "nt"
          if not on_windows:
              os.chmod(script_path, 0o755)
          yield script_path
      finally:
          try:
              os.unlink(script_path)
          except FileNotFoundError:
              pass