test_filters.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661
  1. # test_filters.py -- Tests for filters
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filters."""
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import threading
  27. from collections.abc import Iterator
  28. from contextlib import contextmanager
  29. from dulwich.filters import (
  30. FilterContext,
  31. FilterError,
  32. FilterRegistry,
  33. ProcessFilterDriver,
  34. )
  35. from dulwich.repo import Repo
  36. from . import TestCase
  37. class GitAttributesFilterIntegrationTests(TestCase):
  38. """Test gitattributes integration with filter drivers."""
  39. def setUp(self) -> None:
  40. super().setUp()
  41. self.test_dir = tempfile.mkdtemp()
  42. self.addCleanup(self._cleanup_test_dir)
  43. self.repo = Repo.init(self.test_dir)
  44. def _cleanup_test_dir(self) -> None:
  45. """Clean up test directory."""
  46. import shutil
  47. shutil.rmtree(self.test_dir)
  48. def test_gitattributes_text_filter(self) -> None:
  49. """Test that text attribute triggers line ending conversion."""
  50. # Configure autocrlf first
  51. config = self.repo.get_config()
  52. config.set((b"core",), b"autocrlf", b"true")
  53. config.write_to_path()
  54. # Create .gitattributes with text attribute
  55. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  56. with open(gitattributes_path, "wb") as f:
  57. f.write(b"*.txt text\n")
  58. f.write(b"*.bin -text\n")
  59. # Add .gitattributes
  60. worktree = self.repo.get_worktree()
  61. worktree.stage([".gitattributes"])
  62. worktree.commit(
  63. message=b"Add gitattributes",
  64. committer=b"Test <test@example.com>",
  65. author=b"Test <test@example.com>",
  66. commit_timestamp=1000000000,
  67. author_timestamp=1000000000,
  68. commit_timezone=0,
  69. author_timezone=0,
  70. )
  71. # Create text file with CRLF
  72. text_file = os.path.join(self.test_dir, "test.txt")
  73. with open(text_file, "wb") as f:
  74. f.write(b"line1\r\nline2\r\n")
  75. # Create binary file with CRLF
  76. bin_file = os.path.join(self.test_dir, "test.bin")
  77. with open(bin_file, "wb") as f:
  78. f.write(b"binary\r\ndata\r\n")
  79. # Add files
  80. worktree.stage(["test.txt", "test.bin"])
  81. # Check that text file was normalized
  82. index = self.repo.open_index()
  83. text_entry = index[b"test.txt"]
  84. text_blob = self.repo.object_store[text_entry.sha]
  85. self.assertEqual(text_blob.data, b"line1\nline2\n")
  86. # Check that binary file was not normalized
  87. bin_entry = index[b"test.bin"]
  88. bin_blob = self.repo.object_store[bin_entry.sha]
  89. self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
  90. def test_gitattributes_custom_filter(self) -> None:
  91. """Test custom filter specified in gitattributes."""
  92. # Create a Python script that acts as our filter
  93. import sys
  94. filter_script = os.path.join(self.test_dir, "redact_filter.py")
  95. with open(filter_script, "w") as f:
  96. f.write(
  97. """#!/usr/bin/env python3
  98. import sys
  99. data = sys.stdin.buffer.read()
  100. # Replace all digits with X
  101. result = bytearray()
  102. for b in data:
  103. if chr(b).isdigit():
  104. result.append(ord('X'))
  105. else:
  106. result.append(b)
  107. sys.stdout.buffer.write(result)
  108. """
  109. )
  110. os.chmod(filter_script, 0o755)
  111. # Create .gitattributes with custom filter
  112. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  113. with open(gitattributes_path, "wb") as f:
  114. f.write(b"*.secret filter=redact\n")
  115. # Configure custom filter (use Python script for testing)
  116. config = self.repo.get_config()
  117. # This filter replaces all digits with X
  118. config.set(
  119. (b"filter", b"redact"),
  120. b"clean",
  121. f"{sys.executable} {filter_script}".encode(),
  122. )
  123. config.write_to_path()
  124. # Add .gitattributes
  125. worktree = self.repo.get_worktree()
  126. worktree.stage([".gitattributes"])
  127. # Create file with sensitive content
  128. secret_file = os.path.join(self.test_dir, "password.secret")
  129. with open(secret_file, "wb") as f:
  130. f.write(b"password123\ntoken456\n")
  131. # Add file
  132. worktree.stage(["password.secret"])
  133. # Check that content was filtered
  134. index = self.repo.open_index()
  135. entry = index[b"password.secret"]
  136. blob = self.repo.object_store[entry.sha]
  137. self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
  138. def test_gitattributes_from_tree(self) -> None:
  139. """Test that gitattributes from tree are used when no working tree exists."""
  140. # Create .gitattributes with text attribute
  141. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  142. with open(gitattributes_path, "wb") as f:
  143. f.write(b"*.txt text\n")
  144. # Add and commit .gitattributes
  145. worktree = self.repo.get_worktree()
  146. worktree.stage([".gitattributes"])
  147. worktree.commit(
  148. message=b"Add gitattributes",
  149. committer=b"Test <test@example.com>",
  150. author=b"Test <test@example.com>",
  151. commit_timestamp=1000000000,
  152. author_timestamp=1000000000,
  153. commit_timezone=0,
  154. author_timezone=0,
  155. )
  156. # Remove .gitattributes from working tree
  157. os.remove(gitattributes_path)
  158. # Get gitattributes - should still work from tree
  159. gitattributes = self.repo.get_gitattributes()
  160. attrs = gitattributes.match_path(b"test.txt")
  161. self.assertEqual(attrs.get(b"text"), True)
  162. def test_gitattributes_info_attributes(self) -> None:
  163. """Test that .git/info/attributes is read."""
  164. # Create info/attributes
  165. info_dir = os.path.join(self.repo.controldir(), "info")
  166. if not os.path.exists(info_dir):
  167. os.makedirs(info_dir)
  168. info_attrs_path = os.path.join(info_dir, "attributes")
  169. with open(info_attrs_path, "wb") as f:
  170. f.write(b"*.log text\n")
  171. # Get gitattributes
  172. gitattributes = self.repo.get_gitattributes()
  173. attrs = gitattributes.match_path(b"debug.log")
  174. self.assertEqual(attrs.get(b"text"), True)
  175. def test_filter_precedence(self) -> None:
  176. """Test that filter attribute takes precedence over text attribute."""
  177. # Create a Python script that converts to uppercase
  178. import sys
  179. filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
  180. with open(filter_script, "w") as f:
  181. f.write(
  182. """#!/usr/bin/env python3
  183. import sys
  184. data = sys.stdin.buffer.read()
  185. # Convert bytes to string, uppercase, then back to bytes
  186. result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
  187. sys.stdout.buffer.write(result)
  188. """
  189. )
  190. os.chmod(filter_script, 0o755)
  191. # Create .gitattributes with both text and filter
  192. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  193. with open(gitattributes_path, "wb") as f:
  194. f.write(b"*.txt text filter=custom\n")
  195. # Configure autocrlf and custom filter
  196. config = self.repo.get_config()
  197. config.set((b"core",), b"autocrlf", b"true")
  198. # This filter converts to uppercase
  199. config.set(
  200. (b"filter", b"custom"),
  201. b"clean",
  202. f"{sys.executable} {filter_script}".encode(),
  203. )
  204. config.write_to_path()
  205. # Add .gitattributes
  206. worktree = self.repo.get_worktree()
  207. worktree.stage([".gitattributes"])
  208. # Create text file with lowercase and CRLF
  209. text_file = os.path.join(self.test_dir, "test.txt")
  210. with open(text_file, "wb") as f:
  211. f.write(b"hello\r\nworld\r\n")
  212. # Add file
  213. worktree.stage(["test.txt"])
  214. # Check that custom filter was applied (not just line ending conversion)
  215. index = self.repo.open_index()
  216. entry = index[b"test.txt"]
  217. blob = self.repo.object_store[entry.sha]
  218. # Should be uppercase with LF endings
  219. self.assertEqual(blob.data, b"HELLO\nWORLD\n")
  220. def test_blob_normalizer_integration(self) -> None:
  221. """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
  222. normalizer = self.repo.get_blob_normalizer()
  223. # Check it's the right type
  224. from dulwich.filters import FilterBlobNormalizer
  225. self.assertIsInstance(normalizer, FilterBlobNormalizer)
  226. # Check it has access to gitattributes
  227. self.assertIsNotNone(normalizer.gitattributes)
  228. self.assertIsNotNone(normalizer.filter_registry)
  229. def test_required_filter_missing(self) -> None:
  230. """Test that missing required filter raises an error."""
  231. # Create .gitattributes with required filter
  232. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  233. with open(gitattributes_path, "wb") as f:
  234. f.write(b"*.secret filter=required_filter\n")
  235. # Configure filter as required but without commands
  236. config = self.repo.get_config()
  237. config.set((b"filter", b"required_filter"), b"required", b"true")
  238. config.write_to_path()
  239. # Add .gitattributes
  240. worktree = self.repo.get_worktree()
  241. worktree.stage([".gitattributes"])
  242. # Create file that would use the filter
  243. secret_file = os.path.join(self.test_dir, "test.secret")
  244. with open(secret_file, "wb") as f:
  245. f.write(b"test content\n")
  246. # Adding file should raise error due to missing required filter
  247. with self.assertRaises(FilterError) as cm:
  248. worktree.stage(["test.secret"])
  249. self.assertIn(
  250. "Required filter 'required_filter' is not available", str(cm.exception)
  251. )
  252. def test_required_filter_clean_command_fails(self) -> None:
  253. """Test that required filter failure during clean raises an error."""
  254. # Create .gitattributes with required filter
  255. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  256. with open(gitattributes_path, "wb") as f:
  257. f.write(b"*.secret filter=failing_filter\n")
  258. # Configure filter as required with failing command
  259. config = self.repo.get_config()
  260. config.set(
  261. (b"filter", b"failing_filter"), b"clean", b"false"
  262. ) # false command always fails
  263. config.set((b"filter", b"failing_filter"), b"required", b"true")
  264. config.write_to_path()
  265. # Add .gitattributes
  266. worktree = self.repo.get_worktree()
  267. worktree.stage([".gitattributes"])
  268. # Create file that would use the filter
  269. secret_file = os.path.join(self.test_dir, "test.secret")
  270. with open(secret_file, "wb") as f:
  271. f.write(b"test content\n")
  272. # Adding file should raise error due to failing required filter
  273. with self.assertRaises(FilterError) as cm:
  274. worktree.stage(["test.secret"])
  275. self.assertIn("Required clean filter failed", str(cm.exception))
  276. def test_required_filter_success(self) -> None:
  277. """Test that required filter works when properly configured."""
  278. # Create .gitattributes with required filter
  279. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  280. with open(gitattributes_path, "wb") as f:
  281. f.write(b"*.secret filter=working_filter\n")
  282. # Configure filter as required with working command
  283. config = self.repo.get_config()
  284. config.set(
  285. (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
  286. ) # uppercase
  287. config.set((b"filter", b"working_filter"), b"required", b"true")
  288. config.write_to_path()
  289. # Add .gitattributes
  290. worktree = self.repo.get_worktree()
  291. worktree.stage([".gitattributes"])
  292. # Create file that would use the filter
  293. secret_file = os.path.join(self.test_dir, "test.secret")
  294. with open(secret_file, "wb") as f:
  295. f.write(b"hello world\n")
  296. # Adding file should work and apply filter
  297. worktree.stage(["test.secret"])
  298. # Check that content was filtered
  299. index = self.repo.open_index()
  300. entry = index[b"test.secret"]
  301. blob = self.repo.object_store[entry.sha]
  302. self.assertEqual(blob.data, b"HELLO WORLD\n")
  303. def test_optional_filter_failure_fallback(self) -> None:
  304. """Test that optional filter failure falls back to original data."""
  305. # Create .gitattributes with optional filter
  306. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  307. with open(gitattributes_path, "wb") as f:
  308. f.write(b"*.txt filter=optional_filter\n")
  309. # Configure filter as optional (required=false) with failing command
  310. config = self.repo.get_config()
  311. config.set(
  312. (b"filter", b"optional_filter"), b"clean", b"false"
  313. ) # false command always fails
  314. config.set((b"filter", b"optional_filter"), b"required", b"false")
  315. config.write_to_path()
  316. # Add .gitattributes
  317. worktree = self.repo.get_worktree()
  318. worktree.stage([".gitattributes"])
  319. # Create file that would use the filter
  320. test_file = os.path.join(self.test_dir, "test.txt")
  321. with open(test_file, "wb") as f:
  322. f.write(b"test content\n")
  323. # Adding file should work and fallback to original content
  324. with self.assertLogs(level="WARNING"):
  325. worktree.stage(["test.txt"])
  326. # Check that original content was preserved
  327. index = self.repo.open_index()
  328. entry = index[b"test.txt"]
  329. blob = self.repo.object_store[entry.sha]
  330. self.assertEqual(blob.data, b"test content\n")
  331. class ProcessFilterDriverTests(TestCase):
  332. """Tests for ProcessFilterDriver with real process filter."""
  333. def setUp(self):
  334. super().setUp()
  335. # Create a temporary test filter process dynamically
  336. self.test_filter_path = self._create_test_filter()
  337. def tearDown(self):
  338. # Clean up the test filter
  339. if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
  340. os.unlink(self.test_filter_path)
  341. super().tearDown()
  342. def _create_test_filter(self):
  343. """Create a simple test filter process that works on all platforms."""
  344. import tempfile
  345. # Create filter script that uppercases on clean, lowercases on smudge
  346. filter_script = """import sys
  347. import os
  348. # Simple filter that doesn't use any external dependencies
  349. def read_exact(n):
  350. data = b""
  351. while len(data) < n:
  352. chunk = sys.stdin.buffer.read(n - len(data))
  353. if not chunk:
  354. break
  355. data += chunk
  356. return data
  357. def write_pkt(data):
  358. if data is None:
  359. sys.stdout.buffer.write(b"0000")
  360. else:
  361. length = len(data) + 4
  362. sys.stdout.buffer.write(("{:04x}".format(length)).encode())
  363. sys.stdout.buffer.write(data)
  364. sys.stdout.buffer.flush()
  365. def read_pkt():
  366. size_bytes = read_exact(4)
  367. if not size_bytes:
  368. return None
  369. size = int(size_bytes.decode(), 16)
  370. if size == 0:
  371. return None
  372. return read_exact(size - 4)
  373. # Handshake
  374. client_hello = read_pkt()
  375. version = read_pkt()
  376. flush = read_pkt()
  377. write_pkt(b"git-filter-server")
  378. write_pkt(b"version=2")
  379. write_pkt(None)
  380. # Read and echo capabilities
  381. caps = []
  382. while True:
  383. cap = read_pkt()
  384. if cap is None:
  385. break
  386. caps.append(cap)
  387. for cap in caps:
  388. write_pkt(cap)
  389. write_pkt(None)
  390. # Process commands
  391. while True:
  392. headers = {}
  393. while True:
  394. line = read_pkt()
  395. if line is None:
  396. break
  397. if b"=" in line:
  398. k, v = line.split(b"=", 1)
  399. headers[k.decode()] = v.decode()
  400. if not headers:
  401. break
  402. # Read data
  403. data_chunks = []
  404. while True:
  405. chunk = read_pkt()
  406. if chunk is None:
  407. break
  408. data_chunks.append(chunk)
  409. data = b"".join(data_chunks)
  410. # Process (uppercase for clean, lowercase for smudge)
  411. if headers.get("command") == "clean":
  412. result = data.upper()
  413. elif headers.get("command") == "smudge":
  414. result = data.lower()
  415. else:
  416. result = data
  417. # Send response
  418. write_pkt(b"status=success")
  419. write_pkt(None)
  420. # Send result
  421. chunk_size = 65516
  422. for i in range(0, len(result), chunk_size):
  423. write_pkt(result[i:i+chunk_size])
  424. write_pkt(None)
  425. # Send final headers (empty list to keep status=success)
  426. write_pkt(None)
  427. """
  428. # Create temporary file
  429. fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
  430. try:
  431. os.write(fd, filter_script.encode())
  432. os.close(fd)
  433. # Make executable on Unix-like systems
  434. if os.name != "nt": # Not Windows
  435. os.chmod(path, 0o755)
  436. return path
  437. except:
  438. if os.path.exists(path):
  439. os.unlink(path)
  440. raise
  441. def test_process_filter_clean_operation(self):
  442. """Test clean operation using real process filter."""
  443. import sys
  444. driver = ProcessFilterDriver(
  445. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  446. )
  447. test_data = b"hello world"
  448. result = driver.clean(test_data)
  449. # Our test filter uppercases on clean
  450. self.assertEqual(result, b"HELLO WORLD")
  451. def test_process_filter_smudge_operation(self):
  452. """Test smudge operation using real process filter."""
  453. import sys
  454. driver = ProcessFilterDriver(
  455. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  456. )
  457. test_data = b"HELLO WORLD"
  458. result = driver.smudge(test_data, b"test.txt")
  459. # Our test filter lowercases on smudge
  460. self.assertEqual(result, b"hello world")
  461. def test_process_filter_large_data(self):
  462. """Test process filter with data larger than single pkt-line."""
  463. import sys
  464. driver = ProcessFilterDriver(
  465. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  466. )
  467. # Create data larger than max pkt-line payload (65516 bytes)
  468. test_data = b"a" * 70000
  469. result = driver.clean(test_data)
  470. # Should be uppercased
  471. self.assertEqual(result, b"A" * 70000)
  472. def test_fallback_to_individual_commands(self):
  473. """Test fallback when process filter fails."""
  474. driver = ProcessFilterDriver(
  475. clean_cmd="tr '[:lower:]' '[:upper:]'", # Shell command to uppercase
  476. process_cmd="/nonexistent/command", # This should fail
  477. required=False,
  478. )
  479. test_data = b"hello world\n"
  480. with self.assertLogs(level="WARNING"):
  481. result = driver.clean(test_data)
  482. # Should fallback to tr command and uppercase
  483. self.assertEqual(result, b"HELLO WORLD\n")
  484. def test_process_reuse(self):
  485. """Test that process is reused across multiple operations."""
  486. import sys
  487. driver = ProcessFilterDriver(
  488. process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
  489. )
  490. # First operation
  491. result1 = driver.clean(b"test1")
  492. self.assertEqual(result1, b"TEST1")
  493. # Second operation should reuse the same process
  494. result2 = driver.clean(b"test2")
  495. self.assertEqual(result2, b"TEST2")
  496. # Process should still be alive
  497. self.assertIsNotNone(driver._process)
  498. self.assertIsNone(driver._process.poll()) # None means still running
  499. def test_error_handling_invalid_command(self):
  500. """Test error handling with invalid filter command."""
  501. driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
  502. with self.assertRaises(FilterError) as cm:
  503. driver.clean(b"test data")
  504. self.assertIn("Failed to start process filter", str(cm.exception))
  505. class FilterContextTests(TestCase):
  506. """Tests for FilterContext class."""
  507. def test_filter_context_caches_long_running_drivers(self):
  508. """Test that FilterContext caches only long-running drivers."""
  509. # Create real filter drivers
  510. class UppercaseFilter:
  511. def clean(self, data):
  512. return data.upper()
  513. def smudge(self, data, path=b""):
  514. return data.lower()
  515. def cleanup(self):
  516. pass
  517. def reuse(self, config, filter_name):
  518. # Pretend it's a long-running filter that should be cached
  519. return True
  520. class IdentityFilter:
  521. def clean(self, data):
  522. return data
  523. def smudge(self, data, path=b""):
  524. return data
  525. def cleanup(self):
  526. pass
  527. def reuse(self, config, filter_name):
  528. # Lightweight filter, don't cache
  529. return False
  530. # Create registry and context
  531. # Need to provide a config for caching to work
  532. from dulwich.config import ConfigDict
  533. config = ConfigDict()
  534. # Add some dummy config to make it truthy (use proper format)
  535. config.set((b"filter", b"uppercase"), b"clean", b"dummy")
  536. registry = FilterRegistry(config=config)
  537. context = FilterContext(registry)
  538. # Register drivers
  539. long_running = UppercaseFilter()
  540. stateless = IdentityFilter()
  541. registry.register_driver("uppercase", long_running)
  542. registry.register_driver("identity", stateless)
  543. # Get drivers through context
  544. driver1 = context.get_driver("uppercase")
  545. driver2 = context.get_driver("uppercase")
  546. # Long-running driver should be cached
  547. self.assertIs(driver1, driver2)
  548. self.assertIs(driver1, long_running)
  549. # Get stateless driver
  550. stateless1 = context.get_driver("identity")
  551. stateless2 = context.get_driver("identity")
  552. # Stateless driver comes from registry but isn't cached in context
  553. self.assertIs(stateless1, stateless)
  554. self.assertIs(stateless2, stateless)
  555. self.assertNotIn("identity", context._active_drivers)
  556. self.assertIn("uppercase", context._active_drivers)
  557. def test_filter_context_cleanup(self):
  558. """Test that FilterContext properly cleans up resources."""
  559. cleanup_called = []
  560. class TrackableFilter:
  561. def __init__(self, name):
  562. self.name = name
  563. def clean(self, data):
  564. return data
  565. def smudge(self, data, path=b""):
  566. return data
  567. def cleanup(self):
  568. cleanup_called.append(self.name)
  569. def is_long_running(self):
  570. return True
  571. # Create registry and context
  572. registry = FilterRegistry()
  573. context = FilterContext(registry)
  574. # Register and use drivers
  575. filter1 = TrackableFilter("filter1")
  576. filter2 = TrackableFilter("filter2")
  577. filter3 = TrackableFilter("filter3")
  578. registry.register_driver("filter1", filter1)
  579. registry.register_driver("filter2", filter2)
  580. registry.register_driver("filter3", filter3)
  581. # Get only some drivers to cache them
  582. context.get_driver("filter1")
  583. context.get_driver("filter2")
  584. # Don't get filter3
  585. # Close context
  586. context.close()
  587. # Verify cleanup was called for all drivers (context closes registry too)
  588. self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
  589. def test_filter_context_get_driver_returns_none_for_missing(self):
  590. """Test that get_driver returns None for non-existent drivers."""
  591. registry = FilterRegistry()
  592. context = FilterContext(registry)
  593. result = context.get_driver("nonexistent")
  594. self.assertIsNone(result)
  def test_filter_context_with_real_process_filter(self):
      """Test FilterContext with real ProcessFilterDriver instances.

      Checks that a driver without a ``process_cmd`` is handed out but not
      kept in ``context._active_drivers``, whereas a driver whose ``reuse()``
      returns True is kept there.
      """
      test_dir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, test_dir)

      # Create a simple test filter that just passes data through
      filter_path = os.path.join(test_dir, "simple_filter.py")
      with open(filter_path, "w") as f:
          f.write(_PASSTHROUGH_FILTER_SCRIPT)
      filter_cmd = f"{sys.executable} {filter_path}"

      # One-shot driver: only clean/smudge commands are configured and
      # process_cmd is left unset, so no long-running process is involved.
      process_driver = ProcessFilterDriver(
          process_cmd=None,
          clean_cmd=filter_cmd,
          smudge_cmd=filter_cmd,
      )

      # Register in context
      from dulwich.config import ConfigDict

      config = ConfigDict()
      # Add some dummy config to make it truthy (use proper format)
      config.set((b"filter", b"process"), b"clean", filter_cmd.encode())
      config.set((b"filter", b"process"), b"smudge", filter_cmd.encode())

      registry = FilterRegistry(config=config)
      context = FilterContext(registry)
      # Registered as a cleanup (rather than called at the end of the test)
      # so the context is closed even when an assertion below fails.  Cleanups
      # run last-in-first-out, i.e. before test_dir is removed.
      self.addCleanup(context.close)
      registry.register_driver("process", process_driver)

      # Get driver - should not be cached since it's not long-running
      driver1 = context.get_driver("process")
      self.assertIsNotNone(driver1)
      # Check that it's not a long-running process (no process_cmd)
      self.assertIsNone(driver1.process_cmd)
      self.assertNotIn("process", context._active_drivers)

      # Test with a driver that asks to be cached
      class CacheableProcessDriver:
          """Stand-in driver that always asks to be reused."""

          def __init__(self):
              self.process_cmd = "dummy"
              self.clean_cmd = None
              self.smudge_cmd = None
              self.required = False

          def clean(self, data):
              return data

          def smudge(self, data, path=b""):
              return data

          def cleanup(self):
              pass

          def reuse(self, config, filter_name):
              # This driver always wants to be cached (simulates a
              # long-running process)
              return True

      cacheable_driver = CacheableProcessDriver()
      registry.register_driver("long_process", cacheable_driver)
      driver2 = context.get_driver("long_process")
      # Check that it has a process_cmd (long-running)
      self.assertIsNotNone(driver2.process_cmd)
      self.assertIn("long_process", context._active_drivers)
  def test_filter_context_closes_registry(self):
      """Verify that FilterContext.close() also calls close() on its registry."""
      close_calls = []

      class RecordingRegistry(FilterRegistry):
          """Registry that records each close() call before delegating."""

          def close(self):
              close_calls.append(True)
              super().close()

      context = FilterContext(RecordingRegistry())

      # Closing the context is expected to reach the registry's close()
      context.close()

      # A non-empty list means close() ran on the registry at least once
      self.assertTrue(close_calls)
  class ProcessFilterProtocolTests(TestCase):
      """Tests for ProcessFilterDriver protocol compliance.

      Every test drives a small stand-alone Python filter program over the
      pkt-line based long-running filter protocol.  The programs are assembled
      from the source fragments below, so the pkt-line plumbing is written once
      and each test only spells out the part it varies.  Temporary script files
      and started drivers are released through ``addCleanup`` so nothing is left
      behind when an assertion fails.
      """

      # pkt-line primitives shared by every generated filter program.
      # This fragment strictly follows the Git spec - no newlines in packets.
      _PKT_HELPERS = """\
          import sys

          def read_exact(n):
              data = b""
              while len(data) < n:
                  chunk = sys.stdin.buffer.read(n - len(data))
                  if not chunk:
                      break
                  data += chunk
              return data

          def write_pkt(data):
              if data is None:
                  sys.stdout.buffer.write(b"0000")
              else:
                  length = len(data) + 4
                  sys.stdout.buffer.write(("{:04x}".format(length)).encode())
                  sys.stdout.buffer.write(data)
              sys.stdout.buffer.flush()

          def read_pkt():
              size_bytes = read_exact(4)
              if not size_bytes:
                  return None
              size = int(size_bytes.decode(), 16)
              if size == 0:
                  return None
              return read_exact(size - 4)

          """

      # Handshake that validates the client's greeting and only echoes the
      # clean/smudge capabilities.
      _STRICT_HANDSHAKE = """\
          # Handshake - exact format, no newlines
          client_hello = read_pkt()
          version = read_pkt()
          flush = read_pkt()

          if client_hello != b"git-filter-client":
              sys.exit(1)
          if version != b"version=2":
              sys.exit(1)

          write_pkt(b"git-filter-server")  # No newline
          write_pkt(b"version=2")  # No newline
          write_pkt(None)

          # Read and echo capabilities
          caps = []
          while True:
              cap = read_pkt()
              if cap is None:
                  break
              caps.append(cap)

          for cap in caps:
              if cap in [b"capability=clean", b"capability=smudge"]:
                  write_pkt(cap)
          write_pkt(None)

          """

      # Handshake that accepts any greeting and echoes every capability.
      _LENIENT_HANDSHAKE = """\
          # Handshake
          client_hello = read_pkt()
          version = read_pkt()
          flush = read_pkt()

          write_pkt(b"git-filter-server")
          write_pkt(b"version=2")
          write_pkt(None)

          # Read and echo capabilities
          caps = []
          while True:
              cap = read_pkt()
              if cap is None:
                  break
              caps.append(cap)

          for cap in caps:
              write_pkt(cap)
          write_pkt(None)

          """

      # Handshake reply whose welcome line is not "git-filter-server".
      _INVALID_WELCOME = """\
          # Read handshake
          read_pkt()
          read_pkt()
          read_pkt()

          # Send invalid handshake
          write_pkt(b"invalid-welcome")
          write_pkt(b"version=2")
          write_pkt(None)
          """

      # Start of the command loop: reads one request into ``headers`` and
      # ``data``.  A response fragment is appended as the rest of the loop body.
      _READ_REQUEST = """\
          # Process commands
          while True:
              headers = {}
              while True:
                  line = read_pkt()
                  if line is None:
                      break
                  if b"=" in line:
                      k, v = line.split(b"=", 1)
                      headers[k.decode()] = v.decode()

              if not headers:
                  break

              # Read data
              data_chunks = []
              while True:
                  chunk = read_pkt()
                  if chunk is None:
                      break
                  data_chunks.append(chunk)
              data = b"".join(data_chunks)

          """

      # Response: upper-case on clean, lower-case on smudge, empty final headers.
      _RESPOND_TRANSFORM = """\
          # Process
          if headers.get("command") == "clean":
              result = data.upper()
          elif headers.get("command") == "smudge":
              result = data.lower()
          else:
              result = data

          # TWO-PHASE RESPONSE: Send initial headers
          write_pkt(b"status=success")
          write_pkt(None)

          # Send result data
          chunk_size = 65516
          for i in range(0, len(result), chunk_size):
              write_pkt(result[i:i+chunk_size])
          write_pkt(None)

          # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
          write_pkt(None)
          """

      # Response: always upper-case, final headers repeat status=success.
      _RESPOND_FINAL_SUCCESS = """\
          # Process
          result = data.upper()

          # Send initial headers
          write_pkt(b"status=success")
          write_pkt(None)

          # Send result data
          chunk_size = 65516
          for i in range(0, len(result), chunk_size):
              write_pkt(result[i:i+chunk_size])
          write_pkt(None)

          # Send final headers with progress messages (like git-lfs does)
          write_pkt(b"status=success")
          write_pkt(None)
          """

      # Response: initial success, partial content, then an error final status.
      _RESPOND_FINAL_ERROR = """\
          # Send initial headers with success
          write_pkt(b"status=success")
          write_pkt(None)

          # Send partial result
          write_pkt(b"PARTIAL")
          write_pkt(None)

          # Send final headers with error (simulating processing failure)
          write_pkt(b"status=error")
          write_pkt(None)
          """

      @classmethod
      def _build_filter_script(cls, body, respond=None):
          """Assemble the source of a filter program from fragments.

          Args:
            body: Fragment executed right after the pkt-line helpers
              (normally a handshake).
            respond: Optional fragment answering a single request.  When given,
              the request-reading loop is appended and this fragment becomes
              the tail of the loop body; it may use ``headers`` and ``data``.

          Returns:
            The complete program source as a string.
          """
          import textwrap

          script = textwrap.dedent(cls._PKT_HELPERS) + textwrap.dedent(body)
          if respond is not None:
              script += textwrap.dedent(cls._READ_REQUEST)
              # The response runs inside the ``while True`` loop, one level in.
              script += textwrap.indent(textwrap.dedent(respond), "    ")
          return script

      @staticmethod
      def _remove_file(path):
          """Delete ``path``; a file that is already gone is not an error."""
          try:
              os.unlink(path)
          except FileNotFoundError:
              pass

      @classmethod
      def _write_script(cls, script, prefix):
          """Write ``script`` to a new temporary ``.py`` file and return its path.

          The descriptor returned by ``mkstemp`` is closed on every path, and the
          file is removed again if writing it fails.
          """
          fd, path = tempfile.mkstemp(suffix=".py", prefix=prefix)
          try:
              with os.fdopen(fd, "wb") as script_file:
                  script_file.write(script.encode())
              if os.name != "nt":  # Not Windows
                  os.chmod(path, 0o755)
          except BaseException:
              # Explicit catch-all (then re-raise) purely to avoid leaving a
              # stray temp file behind.
              cls._remove_file(path)
              raise
          return path

      def _install_filter(self, script, prefix):
          """Write a filter script that is deleted at test cleanup; return its path."""
          path = self._write_script(script, prefix)
          self.addCleanup(self._remove_file, path)
          return path

      @staticmethod
      def _command(script_path):
          """Return the command line running ``script_path`` with this interpreter."""
          return f"{sys.executable} {script_path}"

      def _make_driver(self, **kwargs):
          """Create a ProcessFilterDriver whose cleanup() runs at test cleanup.

          Cleanups run last-in-first-out, so the driver is cleaned up before the
          script files registered earlier are deleted.
          """
          driver = ProcessFilterDriver(**kwargs)
          self.addCleanup(driver.cleanup)
          return driver

      def setUp(self):
          super().setUp()
          # Create a spec-compliant test filter process dynamically
          self.test_filter_path = self._create_spec_compliant_filter()
          self.addCleanup(self._remove_file, self.test_filter_path)

      def _create_spec_compliant_filter(self):
          """Create a spec-compliant test filter that works on all platforms.

          Returns:
            Path of the script file; the caller is responsible for deleting it.
          """
          script = self._build_filter_script(
              self._STRICT_HANDSHAKE, self._RESPOND_TRANSFORM
          )
          return self._write_script(script, "test_filter_spec_")

      def test_protocol_handshake_exact_format(self):
          """Test that handshake uses exact format without newlines."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path),
              required=True,  # Require success to test protocol compliance
          )

          # This should work with exact protocol format
          result = driver.clean(b"hello world")

          # Our test filter uppercases on clean
          self.assertEqual(result, b"HELLO WORLD")

      def test_capability_negotiation_exact_format(self):
          """Test that capabilities are sent and received in exact format."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=True
          )

          # Force capability negotiation by using both clean and smudge
          clean_result = driver.clean(b"test")
          smudge_result = driver.smudge(b"TEST", b"test.txt")

          self.assertEqual(clean_result, b"TEST")
          self.assertEqual(smudge_result, b"test")

      def test_binary_data_handling(self):
          """Test handling of binary data through the protocol."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=False
          )

          # Binary data with null bytes, high bytes, etc.
          binary_data = bytes(range(256))
          result = driver.clean(binary_data)

          # Should handle binary data without crashing
          self.assertIsInstance(result, bytes)

      def test_binary_data_with_invalid_utf8_sequences(self):
          """Test handling of binary data with invalid UTF-8 sequences.

          Regression test for https://github.com/jelmer/dulwich/issues/2023
          where binary files (like .ogg, .jpg) caused UTF-8 decode errors.
          """
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=False
          )

          # Binary data containing bytes that are not valid UTF-8
          binary_data = b"some header \xe5\xff\xfe binary data"
          result = driver.clean(binary_data)

          # Should handle binary data without UTF-8 decode errors
          self.assertIsInstance(result, bytes)
          # The filter should process it successfully
          self.assertEqual(result, binary_data.upper())

      def test_large_file_chunking(self):
          """Test proper chunking of large files."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=True
          )

          # Create data larger than max pkt-line payload (65516 bytes)
          large_data = b"a" * 100000
          result = driver.clean(large_data)

          # Should be properly processed (uppercased)
          self.assertEqual(result, b"A" * 100000)

      def test_empty_file_handling(self):
          """Test handling of empty files."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=True
          )

          result = driver.clean(b"")
          self.assertEqual(result, b"")

      def test_special_characters_in_pathname(self):
          """Test paths with special characters are handled correctly."""
          # Test various special characters in paths
          special_paths = [
              b"file with spaces.txt",
              b"path/with/slashes.txt",
              b"file=with=equals.txt",
              b"file\nwith\nnewlines.txt",
              b"filew&with&ampersand.txt",
          ]
          test_data = b"test data"

          with create_passthrough_filter() as passthrough_filter_path:
              # Exercise both the long-running process and the one-shot
              # smudge command (which receives the path through %f).
              for process_cmd, smudge_cmd in [
                  (self._command(self.test_filter_path), None),
                  (None, f"{sys.executable} {passthrough_filter_path} %f"),
              ]:
                  driver = self._make_driver(
                      process_cmd=process_cmd,
                      smudge_cmd=smudge_cmd,
                      required=True,
                  )
                  for path in special_paths:
                      with self.subTest(
                          process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
                      ):
                          result = driver.smudge(test_data, path)
                          self.assertEqual(result, b"test data")

      def test_process_crash_recovery(self):
          """Test that process is properly restarted after crash."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=False
          )

          # First operation
          result = driver.clean(b"test1")
          self.assertEqual(result, b"TEST1")

          # Kill the process
          if driver._process:
              driver._process.kill()
              driver._process.wait()
              driver.cleanup()

          # Should restart and work again
          result = driver.clean(b"test2")
          self.assertEqual(result, b"TEST2")

      def test_malformed_process_response_handling(self):
          """Test handling of malformed responses from process."""
          # Create a filter that sends malformed responses.  It is built from
          # the shared pkt-line helpers so it needs nothing but the standard
          # library and really gets as far as sending the bad welcome line.
          script_path = self._install_filter(
              self._build_filter_script(self._INVALID_WELCOME),
              "test_filter_malformed_",
          )

          with create_passthrough_filter() as fallback_path:
              driver = self._make_driver(
                  process_cmd=self._command(script_path),
                  clean_cmd=self._command(fallback_path),  # Fallback
                  required=False,
              )

              # Should fallback to clean_cmd when process fails
              with self.assertLogs(level="WARNING"):
                  result = driver.clean(b"test data")
              self.assertEqual(result, b"test data")

      def test_concurrent_filter_operations(self):
          """Test that concurrent operations work correctly."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=True
          )

          results = []
          errors = []

          def worker(data):
              try:
                  results.append(driver.clean(data))
              except Exception as e:
                  # Collected here so the main thread can report them.
                  errors.append(e)

          # Start 5 concurrent operations
          test_data = [f"test{i}".encode() for i in range(5)]
          threads = [
              threading.Thread(target=worker, args=(data,)) for data in test_data
          ]
          for t in threads:
              t.start()
          for t in threads:
              t.join()

          # Should have no errors
          self.assertEqual(len(errors), 0, f"Errors: {errors}")
          self.assertEqual(len(results), 5)

          # All results should be uppercase versions
          expected = [data.upper() for data in test_data]
          self.assertEqual(sorted(results), sorted(expected))

      def test_process_resource_cleanup(self):
          """Test that process resources are properly cleaned up."""
          driver = self._make_driver(
              process_cmd=self._command(self.test_filter_path), required=False
          )

          # Use the driver
          result = driver.clean(b"test")
          self.assertEqual(result, b"TEST")

          # Process should be running
          self.assertIsNotNone(driver._process)
          self.assertIsNone(driver._process.poll())  # None means still running

          # Remember the old process to check it was terminated
          old_process = driver._process

          # Manually clean up (simulates __del__)
          driver.cleanup()

          # Process reference should be cleared
          self.assertIsNone(driver._process)
          self.assertIsNone(driver._protocol)

          # Old process should be terminated
          self.assertIsNotNone(old_process.poll())  # Not None means terminated

      def test_required_filter_error_propagation(self):
          """Test that errors are properly propagated when filter is required."""
          driver = ProcessFilterDriver(
              process_cmd="/definitely/nonexistent/command", required=True
          )

          with self.assertRaises(FilterError) as cm:
              driver.clean(b"test data")

          self.assertIn("Failed to start process filter", str(cm.exception))

      def test_two_phase_response_protocol(self):
          """Test filter protocol with two-phase response (initial + final headers).

          This test verifies that the filter correctly handles the Git LFS protocol
          where filters send:
          1. Initial headers with status
          2. Content data
          3. Final headers with status

          This is the format used by git-lfs and documented in the Git filter protocol.
          """
          # Create a filter that follows the two-phase protocol
          filter_path = self._install_filter(
              self._build_filter_script(
                  self._LENIENT_HANDSHAKE, self._RESPOND_TRANSFORM
              ),
              "test_filter_two_phase_",
          )
          driver = self._make_driver(
              process_cmd=self._command(filter_path), required=True
          )

          # Test clean operation
          result = driver.clean(b"hello world")
          self.assertEqual(result, b"HELLO WORLD")

          # Test smudge operation
          result = driver.smudge(b"HELLO WORLD", b"test.txt")
          self.assertEqual(result, b"hello world")

      def test_two_phase_response_with_status_messages(self):
          """Test filter that sends status messages in final headers.

          Some filters (like git-lfs) may send progress or status messages
          in the final headers. This test verifies that we can handle those.
          """
          # Create a filter that sends extra status info in final headers
          filter_path = self._install_filter(
              self._build_filter_script(
                  self._LENIENT_HANDSHAKE, self._RESPOND_FINAL_SUCCESS
              ),
              "test_filter_status_",
          )
          driver = self._make_driver(
              process_cmd=self._command(filter_path), required=True
          )

          # Test clean operation with status messages
          result = driver.clean(b"test data with status")
          self.assertEqual(result, b"TEST DATA WITH STATUS")

      def test_two_phase_response_with_final_error(self):
          """Test filter that reports error in final headers.

          The Git protocol allows filters to report success initially,
          then report an error in the final headers. This test ensures
          we handle that correctly.
          """
          # Create a filter that sends error in final headers
          filter_path = self._install_filter(
              self._build_filter_script(
                  self._LENIENT_HANDSHAKE, self._RESPOND_FINAL_ERROR
              ),
              "test_filter_error_",
          )
          driver = self._make_driver(
              process_cmd=self._command(filter_path), required=True
          )

          # Should raise FilterError due to final status being error
          with self.assertRaises(FilterError) as cm:
              driver.clean(b"test data")

          self.assertIn("final status: error", str(cm.exception))
  # Source of a minimal stand-alone filter program: it copies whatever it reads
  # on stdin to stdout unchanged.  Tests write this text to a temporary file and
  # run it with the current interpreter as a clean/smudge command.
  _PASSTHROUGH_FILTER_SCRIPT = """import sys
while True:
    line = sys.stdin.buffer.read()
    if not line:
        break
    sys.stdout.buffer.write(line)
    sys.stdout.buffer.flush()
"""
  @contextmanager
  def create_passthrough_filter() -> Iterator[str]:
      """Yield the path of a temporary pass-through filter script.

      The script file is written up front, marked executable on non-Windows
      platforms, and deleted again when the ``with`` block is left.
      """
      script_file = tempfile.NamedTemporaryFile(
          suffix=".py", delete=False, prefix="test_filter_passthrough_"
      )
      # delete=False keeps the file on disk after it is closed here.
      with script_file:
          script_file.write(_PASSTHROUGH_FILTER_SCRIPT.encode())
          path = script_file.name

      on_windows = os.name == "nt"
      try:
          if not on_windows:
              os.chmod(path, 0o755)
          yield path
      finally:
          try:
              os.unlink(path)
          except FileNotFoundError:
              # Already removed; nothing left to clean up.
              pass
  1330. finally:
  1331. try:
  1332. os.unlink(path)
  1333. except FileNotFoundError:
  1334. pass