2
0

test_client.py 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593
  1. # test_client.py -- Tests for the git protocol, client side
  2. # Copyright (C) 2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import base64
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import warnings
  27. from io import BytesIO
  28. from typing import NoReturn
  29. from unittest.mock import patch
  30. from urllib.parse import quote as urlquote
  31. from urllib.parse import urlparse
  32. import dulwich
  33. from dulwich import client
  34. from dulwich.bundle import create_bundle_from_repo, write_bundle
  35. from dulwich.client import (
  36. BundleClient,
  37. FetchPackResult,
  38. GitProtocolError,
  39. HangupException,
  40. HttpGitClient,
  41. InvalidWants,
  42. LocalGitClient,
  43. PLinkSSHVendor,
  44. ReportStatusParser,
  45. SendPackError,
  46. SSHGitClient,
  47. StrangeHostname,
  48. SubprocessSSHVendor,
  49. TCPGitClient,
  50. TraditionalGitClient,
  51. _extract_symrefs_and_agent,
  52. _remote_error_from_stderr,
  53. _win32_url_to_path,
  54. check_wants,
  55. default_urllib3_manager,
  56. get_credentials_from_store,
  57. get_transport_and_path,
  58. get_transport_and_path_from_url,
  59. parse_rsync_url,
  60. )
  61. from dulwich.config import ConfigDict
  62. from dulwich.objects import Blob, Commit, Tree
  63. from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects
  64. from dulwich.protocol import DEFAULT_GIT_PROTOCOL_VERSION_FETCH, TCP_GIT_PORT, Protocol
  65. from dulwich.repo import MemoryRepo, Repo
  66. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  67. from . import TestCase, skipIf
  68. class DummyClient(TraditionalGitClient):
  69. def __init__(self, can_read, read, write) -> None:
  70. self.can_read = can_read
  71. self.read = read
  72. self.write = write
  73. TraditionalGitClient.__init__(self)
  74. def _connect(self, service, path, protocol_version=None):
  75. return Protocol(self.read, self.write), self.can_read, None
  76. class DummyPopen:
  77. def __init__(self, *args, **kwards) -> None:
  78. self.stdin = BytesIO(b"stdin")
  79. self.stdout = BytesIO(b"stdout")
  80. self.stderr = BytesIO(b"stderr")
  81. self.returncode = 0
  82. self.args = args
  83. self.kwargs = kwards
  84. def communicate(self, *args, **kwards):
  85. return ("Running", "")
  86. def wait(self, *args, **kwards) -> bool:
  87. return False
  88. # TODO(durin42): add unit-level tests of GitClient
  89. class GitClientTests(TestCase):
  90. def setUp(self) -> None:
  91. super().setUp()
  92. self.rout = BytesIO()
  93. self.rin = BytesIO()
  94. self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write)
  95. def test_caps(self) -> None:
  96. agent_cap = "agent=dulwich/{}.{}.{}".format(*dulwich.__version__).encode(
  97. "ascii"
  98. )
  99. self.assertEqual(
  100. {
  101. b"multi_ack",
  102. b"side-band-64k",
  103. b"ofs-delta",
  104. b"thin-pack",
  105. b"multi_ack_detailed",
  106. b"shallow",
  107. agent_cap,
  108. },
  109. set(self.client._fetch_capabilities),
  110. )
  111. self.assertEqual(
  112. {
  113. b"delete-refs",
  114. b"ofs-delta",
  115. b"report-status",
  116. b"side-band-64k",
  117. agent_cap,
  118. },
  119. set(self.client._send_capabilities),
  120. )
  121. def test_archive_ack(self) -> None:
  122. self.rin.write(b"0009NACK\n0000")
  123. self.rin.seek(0)
  124. self.client.archive(b"bla", b"HEAD", None, None)
  125. self.assertEqual(self.rout.getvalue(), b"0011argument HEAD0000")
  126. def test_fetch_empty(self) -> None:
  127. self.rin.write(b"0000")
  128. self.rin.seek(0)
  129. def check_heads(heads, **kwargs):
  130. self.assertEqual(heads, {})
  131. return []
  132. ret = self.client.fetch_pack(b"/", check_heads, None, None)
  133. self.assertEqual({}, ret.refs)
  134. self.assertEqual({}, ret.symrefs)
  135. def test_fetch_pack_ignores_magic_ref(self) -> None:
  136. self.rin.write(
  137. b"00000000000000000000000000000000000000000000 capabilities^{}"
  138. b"\x00 multi_ack "
  139. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  140. b"include-tag\n"
  141. b"0000"
  142. )
  143. self.rin.seek(0)
  144. def check_heads(heads, **kwargs):
  145. self.assertEqual({}, heads)
  146. return []
  147. ret = self.client.fetch_pack(b"bla", check_heads, None, None, None)
  148. self.assertEqual({}, ret.refs)
  149. self.assertEqual({}, ret.symrefs)
  150. self.assertEqual(self.rout.getvalue(), b"0000")
  151. def test_fetch_pack_none(self) -> None:
  152. self.rin.write(
  153. b"008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack "
  154. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  155. b"include-tag\n"
  156. b"0000"
  157. )
  158. self.rin.seek(0)
  159. ret = self.client.fetch_pack(
  160. b"bla", lambda heads, depth=None: [], None, None, None
  161. )
  162. self.assertEqual(
  163. {b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs
  164. )
  165. self.assertEqual({}, ret.symrefs)
  166. self.assertEqual(self.rout.getvalue(), b"0000")
  167. def test_handle_upload_pack_head_deepen_since(self) -> None:
  168. # Test that deepen-since command is properly sent
  169. from dulwich.client import _handle_upload_pack_head
  170. self.rin.write(b"0008NAK\n0000")
  171. self.rin.seek(0)
  172. class DummyGraphWalker:
  173. def __iter__(self):
  174. return self
  175. def __next__(self):
  176. return None
  177. proto = Protocol(self.rin.read, self.rout.write)
  178. capabilities = [b"shallow", b"deepen-since"]
  179. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  180. graph_walker = DummyGraphWalker()
  181. _handle_upload_pack_head(
  182. proto=proto,
  183. capabilities=capabilities,
  184. graph_walker=graph_walker,
  185. wants=wants,
  186. can_read=None,
  187. depth=None,
  188. protocol_version=0,
  189. shallow_since="2023-01-01T00:00:00Z",
  190. )
  191. # Verify the deepen-since command was sent
  192. output = self.rout.getvalue()
  193. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  194. def test_handle_upload_pack_head_deepen_not(self) -> None:
  195. # Test that deepen-not command is properly sent
  196. from dulwich.client import _handle_upload_pack_head
  197. self.rin.write(b"0008NAK\n0000")
  198. self.rin.seek(0)
  199. class DummyGraphWalker:
  200. def __iter__(self):
  201. return self
  202. def __next__(self):
  203. return None
  204. proto = Protocol(self.rin.read, self.rout.write)
  205. capabilities = [b"shallow", b"deepen-not"]
  206. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  207. graph_walker = DummyGraphWalker()
  208. _handle_upload_pack_head(
  209. proto=proto,
  210. capabilities=capabilities,
  211. graph_walker=graph_walker,
  212. wants=wants,
  213. can_read=None,
  214. depth=None,
  215. protocol_version=0,
  216. shallow_exclude=["refs/heads/excluded"],
  217. )
  218. # Verify the deepen-not command was sent
  219. output = self.rout.getvalue()
  220. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  221. def test_handle_upload_pack_head_deepen_not_multiple(self) -> None:
  222. # Test that multiple deepen-not commands are properly sent
  223. from dulwich.client import _handle_upload_pack_head
  224. self.rin.write(b"0008NAK\n0000")
  225. self.rin.seek(0)
  226. class DummyGraphWalker:
  227. def __iter__(self):
  228. return self
  229. def __next__(self):
  230. return None
  231. proto = Protocol(self.rin.read, self.rout.write)
  232. capabilities = [b"shallow", b"deepen-not"]
  233. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  234. graph_walker = DummyGraphWalker()
  235. _handle_upload_pack_head(
  236. proto=proto,
  237. capabilities=capabilities,
  238. graph_walker=graph_walker,
  239. wants=wants,
  240. can_read=None,
  241. depth=None,
  242. protocol_version=0,
  243. shallow_exclude=["refs/heads/excluded1", "refs/heads/excluded2"],
  244. )
  245. # Verify both deepen-not commands were sent
  246. output = self.rout.getvalue()
  247. self.assertIn(b"deepen-not refs/heads/excluded1\n", output)
  248. self.assertIn(b"deepen-not refs/heads/excluded2\n", output)
  249. def test_handle_upload_pack_head_deepen_since_and_not(self) -> None:
  250. # Test that deepen-since and deepen-not can be used together
  251. from dulwich.client import _handle_upload_pack_head
  252. self.rin.write(b"0008NAK\n0000")
  253. self.rin.seek(0)
  254. class DummyGraphWalker:
  255. def __iter__(self):
  256. return self
  257. def __next__(self):
  258. return None
  259. proto = Protocol(self.rin.read, self.rout.write)
  260. capabilities = [b"shallow", b"deepen-since", b"deepen-not"]
  261. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  262. graph_walker = DummyGraphWalker()
  263. _handle_upload_pack_head(
  264. proto=proto,
  265. capabilities=capabilities,
  266. graph_walker=graph_walker,
  267. wants=wants,
  268. can_read=None,
  269. depth=None,
  270. protocol_version=0,
  271. shallow_since="2023-01-01T00:00:00Z",
  272. shallow_exclude=["refs/heads/excluded"],
  273. )
  274. # Verify both deepen-since and deepen-not commands were sent
  275. output = self.rout.getvalue()
  276. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  277. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  278. def test_send_pack_no_sideband64k_with_update_ref_error(self) -> None:
  279. # No side-bank-64k reported by server shouldn't try to parse
  280. # side band data
  281. pkts = [
  282. b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}"
  283. b"\x00 report-status delete-refs ofs-delta\n",
  284. b"",
  285. b"unpack ok",
  286. b"ng refs/foo/bar pre-receive hook declined",
  287. b"",
  288. ]
  289. for pkt in pkts:
  290. if pkt == b"":
  291. self.rin.write(b"0000")
  292. else:
  293. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  294. self.rin.seek(0)
  295. tree = Tree()
  296. commit = Commit()
  297. commit.tree = tree
  298. commit.parents = []
  299. commit.author = commit.committer = b"test user"
  300. commit.commit_time = commit.author_time = 1174773719
  301. commit.commit_timezone = commit.author_timezone = 0
  302. commit.encoding = b"UTF-8"
  303. commit.message = b"test message"
  304. def update_refs(refs):
  305. return {
  306. b"refs/foo/bar": commit.id,
  307. }
  308. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  309. return pack_objects_to_data(
  310. [
  311. (commit, None),
  312. (tree, b""),
  313. ]
  314. )
  315. result = self.client.send_pack("blah", update_refs, generate_pack_data)
  316. self.assertEqual(
  317. {b"refs/foo/bar": "pre-receive hook declined"}, result.ref_status
  318. )
  319. self.assertEqual({b"refs/foo/bar": commit.id}, result.refs)
  320. def test_send_pack_none(self) -> None:
  321. # Set ref to current value
  322. self.rin.write(
  323. b"0078310ca9477129b8586fa2afc779c1f57cf64bba6c "
  324. b"refs/heads/master\x00 report-status delete-refs "
  325. b"side-band-64k quiet ofs-delta\n"
  326. b"0000"
  327. )
  328. self.rin.seek(0)
  329. def update_refs(refs):
  330. return {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"}
  331. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  332. return 0, []
  333. self.client.send_pack(b"/", update_refs, generate_pack_data)
  334. self.assertEqual(self.rout.getvalue(), b"0000")
  335. def test_send_pack_keep_and_delete(self) -> None:
  336. self.rin.write(
  337. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  338. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  339. b"003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n"
  340. b"0000000eunpack ok\n"
  341. b"0019ok refs/heads/master\n"
  342. b"0000"
  343. )
  344. self.rin.seek(0)
  345. def update_refs(refs):
  346. return {b"refs/heads/master": b"0" * 40}
  347. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  348. return 0, []
  349. self.client.send_pack(b"/", update_refs, generate_pack_data)
  350. self.assertEqual(
  351. self.rout.getvalue(),
  352. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  353. b"0000000000000000000000000000000000000000 "
  354. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  355. )
  356. def test_send_pack_delete_only(self) -> None:
  357. self.rin.write(
  358. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  359. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  360. b"0000000eunpack ok\n"
  361. b"0019ok refs/heads/master\n"
  362. b"0000"
  363. )
  364. self.rin.seek(0)
  365. def update_refs(refs):
  366. return {b"refs/heads/master": b"0" * 40}
  367. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  368. return 0, []
  369. self.client.send_pack(b"/", update_refs, generate_pack_data)
  370. self.assertEqual(
  371. self.rout.getvalue(),
  372. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  373. b"0000000000000000000000000000000000000000 "
  374. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  375. )
  376. def test_send_pack_new_ref_only(self) -> None:
  377. self.rin.write(
  378. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  379. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  380. b"0000000eunpack ok\n"
  381. b"0019ok refs/heads/blah12\n"
  382. b"0000"
  383. )
  384. self.rin.seek(0)
  385. def update_refs(refs):
  386. return {
  387. b"refs/heads/blah12": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  388. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  389. }
  390. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  391. return 0, []
  392. f = BytesIO()
  393. write_pack_objects(f.write, [])
  394. self.client.send_pack("/", update_refs, generate_pack_data)
  395. self.assertEqual(
  396. self.rout.getvalue(),
  397. b"008b0000000000000000000000000000000000000000 "
  398. b"310ca9477129b8586fa2afc779c1f57cf64bba6c "
  399. b"refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  400. + f.getvalue(),
  401. )
  402. def test_send_pack_new_ref(self) -> None:
  403. self.rin.write(
  404. b"0064310ca9477129b8586fa2afc779c1f57cf64bba6c "
  405. b"refs/heads/master\x00 report-status delete-refs ofs-delta\n"
  406. b"0000000eunpack ok\n"
  407. b"0019ok refs/heads/blah12\n"
  408. b"0000"
  409. )
  410. self.rin.seek(0)
  411. tree = Tree()
  412. commit = Commit()
  413. commit.tree = tree
  414. commit.parents = []
  415. commit.author = commit.committer = b"test user"
  416. commit.commit_time = commit.author_time = 1174773719
  417. commit.commit_timezone = commit.author_timezone = 0
  418. commit.encoding = b"UTF-8"
  419. commit.message = b"test message"
  420. def update_refs(refs):
  421. return {
  422. b"refs/heads/blah12": commit.id,
  423. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  424. }
  425. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  426. return pack_objects_to_data(
  427. [
  428. (commit, None),
  429. (tree, b""),
  430. ]
  431. )
  432. f = BytesIO()
  433. count, records = generate_pack_data(None, None)
  434. write_pack_data(f.write, records, num_records=count)
  435. self.client.send_pack(b"/", update_refs, generate_pack_data)
  436. self.assertEqual(
  437. self.rout.getvalue(),
  438. b"008b0000000000000000000000000000000000000000 "
  439. + commit.id
  440. + b" refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  441. + f.getvalue(),
  442. )
  443. def test_send_pack_no_deleteref_delete_only(self) -> None:
  444. pkts = [
  445. b"310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master"
  446. b"\x00 report-status ofs-delta\n",
  447. b"",
  448. b"",
  449. ]
  450. for pkt in pkts:
  451. if pkt == b"":
  452. self.rin.write(b"0000")
  453. else:
  454. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  455. self.rin.seek(0)
  456. def update_refs(refs):
  457. return {b"refs/heads/master": b"0" * 40}
  458. def generate_pack_data(have, want, ofs_delta=False, progress=None):
  459. return 0, []
  460. result = self.client.send_pack(b"/", update_refs, generate_pack_data)
  461. self.assertEqual(
  462. result.ref_status,
  463. {b"refs/heads/master": "remote does not support deleting refs"},
  464. )
  465. self.assertEqual(
  466. result.refs,
  467. {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"},
  468. )
  469. self.assertEqual(self.rout.getvalue(), b"0000")
  470. class TestGetTransportAndPath(TestCase):
  471. def test_tcp(self) -> None:
  472. c, path = get_transport_and_path("git://foo.com/bar/baz")
  473. self.assertIsInstance(c, TCPGitClient)
  474. self.assertEqual("foo.com", c._host)
  475. self.assertEqual(TCP_GIT_PORT, c._port)
  476. self.assertEqual("/bar/baz", path)
  477. def test_tcp_port(self) -> None:
  478. c, path = get_transport_and_path("git://foo.com:1234/bar/baz")
  479. self.assertIsInstance(c, TCPGitClient)
  480. self.assertEqual("foo.com", c._host)
  481. self.assertEqual(1234, c._port)
  482. self.assertEqual("/bar/baz", path)
  483. def test_tcp_ipv6(self) -> None:
  484. c, path = get_transport_and_path("git://[::1]/bar/baz")
  485. self.assertIsInstance(c, TCPGitClient)
  486. self.assertEqual("::1", c._host)
  487. self.assertEqual(TCP_GIT_PORT, c._port)
  488. self.assertEqual("/bar/baz", path)
  489. def test_tcp_ipv6_port(self) -> None:
  490. c, path = get_transport_and_path("git://[2001:db8::1]:1234/bar/baz")
  491. self.assertIsInstance(c, TCPGitClient)
  492. self.assertEqual("2001:db8::1", c._host)
  493. self.assertEqual(1234, c._port)
  494. self.assertEqual("/bar/baz", path)
  495. def test_git_ssh_explicit(self) -> None:
  496. c, path = get_transport_and_path("git+ssh://foo.com/bar/baz")
  497. self.assertIsInstance(c, SSHGitClient)
  498. self.assertEqual("foo.com", c.host)
  499. self.assertEqual(None, c.port)
  500. self.assertEqual(None, c.username)
  501. self.assertEqual("/bar/baz", path)
  502. def test_ssh_explicit(self) -> None:
  503. c, path = get_transport_and_path("ssh://foo.com/bar/baz")
  504. self.assertIsInstance(c, SSHGitClient)
  505. self.assertEqual("foo.com", c.host)
  506. self.assertEqual(None, c.port)
  507. self.assertEqual(None, c.username)
  508. self.assertEqual("/bar/baz", path)
  509. def test_ssh_port_explicit(self) -> None:
  510. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  511. self.assertIsInstance(c, SSHGitClient)
  512. self.assertEqual("foo.com", c.host)
  513. self.assertEqual(1234, c.port)
  514. self.assertEqual("/bar/baz", path)
  515. def test_username_and_port_explicit_unknown_scheme(self) -> None:
  516. c, path = get_transport_and_path("unknown://git@server:7999/dply/stuff.git")
  517. self.assertIsInstance(c, SSHGitClient)
  518. self.assertEqual("unknown", c.host)
  519. self.assertEqual("//git@server:7999/dply/stuff.git", path)
  520. def test_username_and_port_explicit(self) -> None:
  521. c, path = get_transport_and_path("ssh://git@server:7999/dply/stuff.git")
  522. self.assertIsInstance(c, SSHGitClient)
  523. self.assertEqual("git", c.username)
  524. self.assertEqual("server", c.host)
  525. self.assertEqual(7999, c.port)
  526. self.assertEqual("/dply/stuff.git", path)
  527. def test_ssh_abspath_doubleslash(self) -> None:
  528. c, path = get_transport_and_path("git+ssh://foo.com//bar/baz")
  529. self.assertIsInstance(c, SSHGitClient)
  530. self.assertEqual("foo.com", c.host)
  531. self.assertEqual(None, c.port)
  532. self.assertEqual(None, c.username)
  533. self.assertEqual("//bar/baz", path)
  534. def test_ssh_port(self) -> None:
  535. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  536. self.assertIsInstance(c, SSHGitClient)
  537. self.assertEqual("foo.com", c.host)
  538. self.assertEqual(1234, c.port)
  539. self.assertEqual("/bar/baz", path)
  540. def test_ssh_implicit(self) -> None:
  541. c, path = get_transport_and_path("foo:/bar/baz")
  542. self.assertIsInstance(c, SSHGitClient)
  543. self.assertEqual("foo", c.host)
  544. self.assertEqual(None, c.port)
  545. self.assertEqual(None, c.username)
  546. self.assertEqual("/bar/baz", path)
  547. def test_ssh_host(self) -> None:
  548. c, path = get_transport_and_path("foo.com:/bar/baz")
  549. self.assertIsInstance(c, SSHGitClient)
  550. self.assertEqual("foo.com", c.host)
  551. self.assertEqual(None, c.port)
  552. self.assertEqual(None, c.username)
  553. self.assertEqual("/bar/baz", path)
  554. def test_ssh_user_host(self) -> None:
  555. c, path = get_transport_and_path("user@foo.com:/bar/baz")
  556. self.assertIsInstance(c, SSHGitClient)
  557. self.assertEqual("foo.com", c.host)
  558. self.assertEqual(None, c.port)
  559. self.assertEqual("user", c.username)
  560. self.assertEqual("/bar/baz", path)
  561. def test_ssh_relpath(self) -> None:
  562. c, path = get_transport_and_path("foo:bar/baz")
  563. self.assertIsInstance(c, SSHGitClient)
  564. self.assertEqual("foo", c.host)
  565. self.assertEqual(None, c.port)
  566. self.assertEqual(None, c.username)
  567. self.assertEqual("bar/baz", path)
  568. def test_ssh_host_relpath(self) -> None:
  569. c, path = get_transport_and_path("foo.com:bar/baz")
  570. self.assertIsInstance(c, SSHGitClient)
  571. self.assertEqual("foo.com", c.host)
  572. self.assertEqual(None, c.port)
  573. self.assertEqual(None, c.username)
  574. self.assertEqual("bar/baz", path)
  def test_ssh_user_host_relpath(self) -> None:
      """``user@host:relative/path`` yields the username and a relative path."""
      location = "user@foo.com:bar/baz"
      transport, repo_path = get_transport_and_path(location)
      self.assertIsInstance(transport, SSHGitClient)
      self.assertEqual("foo.com", transport.host)
      self.assertIsNone(transport.port)
      self.assertEqual("user", transport.username)
      self.assertEqual("bar/baz", repo_path)
  def test_local(self) -> None:
      """A plain relative path resolves to the local client, path unchanged."""
      location = "foo.bar/baz"
      transport, repo_path = get_transport_and_path(location)
      self.assertIsInstance(transport, LocalGitClient)
      self.assertEqual(location, repo_path)
  def test_ssh_with_config(self) -> None:
      """``core.sshCommand`` from the supplied config reaches ``SSHGitClient``.

      Checked for an ``ssh://`` URL and for an scp-style location.
      """
      from dulwich.config import ConfigDict

      # GIT_SSH / GIT_SSH_COMMAND take precedence over core.sshCommand, so a
      # value inherited from the developer's shell or CI would make the
      # assertions below fail for reasons unrelated to the code under test.
      self.overrideEnv("GIT_SSH", None)
      self.overrideEnv("GIT_SSH_COMMAND", None)

      url = "ssh://git@github.com/user/repo.git"
      custom_command = "custom-ssh -o CustomOption=yes"
      config = ConfigDict()

      # Empty config: the client falls back to plain "ssh".
      c, _path = get_transport_and_path(url, config=config)
      self.assertIsInstance(c, SSHGitClient)
      self.assertEqual(c.ssh_command, "ssh")

      config.set((b"core",), b"sshCommand", custom_command.encode("ascii"))

      c, _path = get_transport_and_path(url, config=config)
      self.assertIsInstance(c, SSHGitClient)
      self.assertEqual(custom_command, c.ssh_command)

      # The scp-style form must pick up the same configuration.
      c, _path = get_transport_and_path("git@github.com:user/repo.git", config=config)
      self.assertIsInstance(c, SSHGitClient)
      self.assertEqual(custom_command, c.ssh_command)
  @skipIf(sys.platform != "win32", "Behaviour only happens on windows.")
  def test_local_abs_windows_path(self) -> None:
      """A Windows drive-letter path resolves to the local client, path unchanged."""
      windows_path = "C:\\foo.bar\\baz"
      transport, repo_path = get_transport_and_path(windows_path)
      self.assertIsInstance(transport, LocalGitClient)
      self.assertEqual(windows_path, repo_path)
  def test_error(self) -> None:
      """A ``prospero://`` URL resolves to an SSH client."""
      # "prospero" is one of the schemes listed in urllib.parse.uses_netloc,
      # so the URL is split into netloc and path like any other such scheme.
      location = "prospero://bar/baz"
      transport, _repo_path = get_transport_and_path(location)
      self.assertIsInstance(transport, SSHGitClient)
  def test_http(self) -> None:
      """An ``https`` URL resolves to the HTTP client and the URL's path."""
      transport, repo_path = get_transport_and_path(
          "https://github.com/jelmer/dulwich"
      )
      self.assertIsInstance(transport, HttpGitClient)
      self.assertEqual("/jelmer/dulwich", repo_path)
  def test_http_auth(self) -> None:
      """Credentials embedded in the URL are stored on the HTTP client."""
      transport, repo_path = get_transport_and_path(
          "https://user:passwd@github.com/jelmer/dulwich"
      )
      self.assertIsInstance(transport, HttpGitClient)
      self.assertEqual("/jelmer/dulwich", repo_path)
      self.assertEqual("user", transport._username)
      self.assertEqual("passwd", transport._password)
  def test_http_auth_with_username(self) -> None:
      """Credentials passed as keyword arguments are stored on the HTTP client."""
      transport, repo_path = get_transport_and_path(
          "https://github.com/jelmer/dulwich", username="user2", password="blah"
      )
      self.assertIsInstance(transport, HttpGitClient)
      self.assertEqual("/jelmer/dulwich", repo_path)
      self.assertEqual("user2", transport._username)
      self.assertEqual("blah", transport._password)
  def test_http_auth_with_username_and_in_url(self) -> None:
      """Keyword credentials win over credentials embedded in the URL."""
      transport, repo_path = get_transport_and_path(
          "https://user:passwd@github.com/jelmer/dulwich",
          username="user2",
          password="blah",
      )
      self.assertIsInstance(transport, HttpGitClient)
      self.assertEqual("/jelmer/dulwich", repo_path)
      # The explicitly supplied pair must replace "user"/"passwd" from the URL.
      self.assertEqual("user2", transport._username)
      self.assertEqual("blah", transport._password)
  def test_http_no_auth(self) -> None:
      """Without any credentials the HTTP client stores ``None`` for both fields."""
      transport, repo_path = get_transport_and_path(
          "https://github.com/jelmer/dulwich"
      )
      self.assertIsInstance(transport, HttpGitClient)
      self.assertEqual("/jelmer/dulwich", repo_path)
      self.assertIsNone(transport._username)
      self.assertIsNone(transport._password)
  class TestGetTransportAndPathFromUrl(TestCase):
      """Tests for ``get_transport_and_path_from_url``.

      The scp-style and plain-path locations exercised here are expected to be
      rejected with ``ValueError``.
      """

      def test_tcp(self) -> None:
          """``git://`` resolves to a TCP client on the default git port."""
          c, path = get_transport_and_path_from_url("git://foo.com/bar/baz")
          self.assertIsInstance(c, TCPGitClient)
          self.assertEqual("foo.com", c._host)
          self.assertEqual(TCP_GIT_PORT, c._port)
          self.assertEqual("/bar/baz", path)

      def test_tcp_port(self) -> None:
          """An explicit port in a ``git://`` URL is honoured."""
          c, path = get_transport_and_path_from_url("git://foo.com:1234/bar/baz")
          self.assertIsInstance(c, TCPGitClient)
          self.assertEqual("foo.com", c._host)
          self.assertEqual(1234, c._port)
          self.assertEqual("/bar/baz", path)

      def test_ssh_explicit(self) -> None:
          """``git+ssh://`` resolves to an SSH client without port or user."""
          c, path = get_transport_and_path_from_url("git+ssh://foo.com/bar/baz")
          self.assertIsInstance(c, SSHGitClient)
          self.assertEqual("foo.com", c.host)
          self.assertIsNone(c.port)
          self.assertIsNone(c.username)
          self.assertEqual("/bar/baz", path)

      def test_ssh_port_explicit(self) -> None:
          """An explicit port in a ``git+ssh://`` URL is honoured."""
          c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/bar/baz")
          self.assertIsInstance(c, SSHGitClient)
          self.assertEqual("foo.com", c.host)
          self.assertEqual(1234, c.port)
          self.assertEqual("/bar/baz", path)

      def test_ssh_homepath(self) -> None:
          """A ``/~/`` path is returned verbatim by the URL parser."""
          c, path = get_transport_and_path_from_url("git+ssh://foo.com/~/bar/baz")
          self.assertIsInstance(c, SSHGitClient)
          self.assertEqual("foo.com", c.host)
          self.assertIsNone(c.port)
          self.assertIsNone(c.username)
          self.assertEqual("/~/bar/baz", path)

      def test_ssh_port_homepath(self) -> None:
          """A ``/~/`` path combined with an explicit port is returned verbatim."""
          c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/~/bar/baz")
          self.assertIsInstance(c, SSHGitClient)
          self.assertEqual("foo.com", c.host)
          self.assertEqual(1234, c.port)
          self.assertEqual("/~/bar/baz", path)

      def test_ssh_host_relpath(self) -> None:
          """An scp-style location is rejected with ``ValueError``."""
          self.assertRaises(
              ValueError, get_transport_and_path_from_url, "foo.com:bar/baz"
          )

      def test_ssh_user_host_relpath(self) -> None:
          """An scp-style location with a user is rejected with ``ValueError``."""
          self.assertRaises(
              ValueError, get_transport_and_path_from_url, "user@foo.com:bar/baz"
          )

      def test_local_path(self) -> None:
          """A plain filesystem path is rejected with ``ValueError``."""
          self.assertRaises(ValueError, get_transport_and_path_from_url, "foo.bar/baz")

      def test_error(self) -> None:
          """A ``prospero://`` URL is rejected with ``ValueError``."""
          # "prospero" is one of the schemes listed in urllib.parse.uses_netloc,
          # so the URL is split into netloc and path like any other such scheme.
          self.assertRaises(
              ValueError, get_transport_and_path_from_url, "prospero://bar/baz"
          )

      def test_http(self) -> None:
          """``https`` resolves to the HTTP client rooted at the URL's origin."""
          url = "https://github.com/jelmer/dulwich"
          c, path = get_transport_and_path_from_url(url)
          self.assertIsInstance(c, HttpGitClient)
          self.assertEqual("https://github.com", c.get_url(b"/"))
          self.assertEqual("/jelmer/dulwich", path)

      def test_http_port(self) -> None:
          """A non-default port is kept in the HTTP client's base URL."""
          url = "https://github.com:9090/jelmer/dulwich"
          c, path = get_transport_and_path_from_url(url)
          self.assertEqual("https://github.com:9090", c.get_url(b"/"))
          self.assertIsInstance(c, HttpGitClient)
          self.assertEqual("/jelmer/dulwich", path)

      @patch("os.name", "posix")
      @patch("sys.platform", "linux")
      def test_file(self) -> None:
          """On a (patched) POSIX platform ``file://`` maps to the local client."""
          c, path = get_transport_and_path_from_url("file:///home/jelmer/foo")
          self.assertIsInstance(c, LocalGitClient)
          self.assertEqual("/home/jelmer/foo", path)

      def test_win32_url_to_path(self) -> None:
          """Every accepted spelling of a drive-letter ``file:`` URL maps to one path."""
          expected = "C:\\foo.bar\\baz"
          for url in (
              "file:C:/foo.bar/baz",
              "file:/C:/foo.bar/baz",
              "file://C:/foo.bar/baz",
              "file:///C:/foo.bar/baz",
          ):
              with self.subTest(url=url):
                  self.assertEqual(_win32_url_to_path(urlparse(url)), expected)

      @patch("os.name", "nt")
      @patch("sys.platform", "win32")
      def test_file_win(self) -> None:
          """On a (patched) Windows platform, local ``file:`` URLs map to a drive path.

          A ``file:`` URL naming a remote host must raise ``NotImplementedError``.
          """
          expected = "C:\\foo.bar\\baz"
          for file_url in [
              "file:C:/foo.bar/baz",
              "file:/C:/foo.bar/baz",
              "file://C:/foo.bar/baz",
              "file:///C:/foo.bar/baz",
          ]:
              with self.subTest(file_url=file_url):
                  c, path = get_transport_and_path(file_url)
                  self.assertIsInstance(c, LocalGitClient)
                  self.assertEqual(path, expected)

          # This list used to contain three adjacent string literals without
          # separating commas; Python joined them into one meaningless URL, so
          # only a single string starting with "file://host.example.com/" was
          # ever checked. It now holds that remote-host URL on its own.
          # NOTE(review): the UNC-style form "file:////host.example/foo.bar/baz"
          # was part of the joined literal but was never exercised by itself;
          # confirm what the implementation does with it before adding it here.
          for remote_url in [
              "file://host.example.com/C:/foo.bar/baz",
          ]:
              with self.subTest(remote_url=remote_url):
                  with self.assertRaises(NotImplementedError):
                      get_transport_and_path(remote_url)
  class TestSSHVendor:
      """In-memory stand-in for an SSH vendor that records its most recent call.

      ``run_command`` stores every argument on the instance so tests can inspect
      what the client asked for, and returns an inert process-like object.
      """

      # This is a helper, not a test case; keep pytest from trying to collect
      # it because of the ``Test`` name prefix.
      __test__ = False

      def __init__(self) -> None:
          self.host = None
          self.command = ""
          self.username = None
          self.port = None
          self.password = None
          self.key_filename = None
          # Initialised here as well so that reading them before the first
          # run_command() call yields None instead of raising AttributeError.
          self.ssh_command = None
          self.protocol_version = None

      def run_command(
          self,
          host,
          command,
          username=None,
          port=None,
          password=None,
          key_filename=None,
          ssh_command=None,
          protocol_version=None,
      ):
          """Record all arguments on ``self`` and return a no-op process object.

          Returns:
            An object exposing ``read``, ``write``, ``close`` and ``can_read``;
            each accepts any arguments and returns ``None``.
          """
          self.host = host
          self.command = command
          self.username = username
          self.port = port
          self.password = password
          self.key_filename = key_filename
          self.ssh_command = ssh_command
          self.protocol_version = protocol_version

          class Subprocess:
              """Inert process handle whose operations all do nothing."""

              # Real methods rather than zero-argument lambdas stored on the
              # class: those would raise TypeError as soon as they were called
              # on an instance, because the instance is passed as an argument.
              def read(self, *args, **kwargs) -> None:
                  return None

              def write(self, *args, **kwargs) -> None:
                  return None

              def close(self, *args, **kwargs) -> None:
                  return None

              def can_read(self, *args, **kwargs) -> None:
                  return None

          return Subprocess()
  class SSHGitClientTests(TestCase):
      """Tests for ``SSHGitClient`` using a recording fake SSH vendor."""

      def setUp(self) -> None:
          super().setUp()
          self.server = TestSSHVendor()
          # Swap the module-level vendor factory so no real ssh process is
          # started; tearDown() restores the original.
          self.real_vendor = client.get_ssh_vendor
          client.get_ssh_vendor = lambda: self.server
          self.client = SSHGitClient("git.samba.org")

      def tearDown(self) -> None:
          super().tearDown()
          client.get_ssh_vendor = self.real_vendor

      def test_get_url(self) -> None:
          """``get_url`` renders an ``ssh://`` URL from host and path."""
          path = "/tmp/repo.git"
          c = SSHGitClient("git.samba.org")

          url = c.get_url(path)

          self.assertEqual("ssh://git.samba.org/tmp/repo.git", url)

      def test_get_url_with_username_and_port(self) -> None:
          """``get_url`` includes the username and port when they are set."""
          path = "/tmp/repo.git"
          c = SSHGitClient("git.samba.org", port=2222, username="user")

          url = c.get_url(path)

          self.assertEqual("ssh://user@git.samba.org:2222/tmp/repo.git", url)

      def test_default_command(self) -> None:
          """Without overrides a service maps to ``git-<service>``."""
          self.assertEqual(b"git-upload-pack", self.client._get_cmd_path(b"upload-pack"))

      def test_alternative_command_path(self) -> None:
          """An entry in ``alternative_paths`` replaces the default command."""
          self.client.alternative_paths[b"upload-pack"] = b"/usr/lib/git/git-upload-pack"
          self.assertEqual(
              b"/usr/lib/git/git-upload-pack",
              self.client._get_cmd_path(b"upload-pack"),
          )

      def test_alternative_command_path_spaces(self) -> None:
          """An alternative command containing spaces is returned unchanged."""
          self.client.alternative_paths[b"upload-pack"] = (
              b"/usr/lib/git/git-upload-pack -ibla"
          )
          self.assertEqual(
              b"/usr/lib/git/git-upload-pack -ibla",
              self.client._get_cmd_path(b"upload-pack"),
          )

      def test_connect(self) -> None:
          """``_connect`` forwards user, port and the quoted command to the vendor."""
          server = self.server
          ssh_client = self.client

          ssh_client.username = b"username"
          ssh_client.port = 1337

          ssh_client._connect(b"command", b"/path/to/repo")
          self.assertEqual(b"username", server.username)
          self.assertEqual(1337, server.port)
          self.assertEqual(b"git-command '/path/to/repo'", server.command)

          # A "/~/" prefix is sent to the remote side as a home-relative path.
          ssh_client._connect(b"relative-command", b"/~/path/to/repo")
          self.assertEqual(b"git-relative-command '~/path/to/repo'", server.command)

      def test_ssh_command_precedence(self) -> None:
          """Precedence: ``ssh_command`` argument > ``GIT_SSH_COMMAND`` > ``GIT_SSH``."""
          # Start from a known state: a GIT_SSH_COMMAND inherited from the
          # surrounding environment would otherwise shadow GIT_SSH and make the
          # first assertion fail.
          self.overrideEnv("GIT_SSH_COMMAND", None)

          self.overrideEnv("GIT_SSH", "/path/to/ssh")
          test_client = SSHGitClient("git.samba.org")
          self.assertEqual(test_client.ssh_command, "/path/to/ssh")

          self.overrideEnv("GIT_SSH_COMMAND", "/path/to/ssh -o Option=Value")
          test_client = SSHGitClient("git.samba.org")
          self.assertEqual(test_client.ssh_command, "/path/to/ssh -o Option=Value")

          test_client = SSHGitClient("git.samba.org", ssh_command="ssh -o Option1=Value1")
          self.assertEqual(test_client.ssh_command, "ssh -o Option1=Value1")

      def test_ssh_command_config(self) -> None:
          """``core.sshCommand`` is used, below the argument and the environment."""
          from dulwich.config import ConfigDict

          # No config, no environment - should default to "ssh"
          self.overrideEnv("GIT_SSH", None)
          self.overrideEnv("GIT_SSH_COMMAND", None)
          test_client = SSHGitClient("git.samba.org")
          self.assertEqual(test_client.ssh_command, "ssh")

          # Config with core.sshCommand
          config = ConfigDict()
          config.set((b"core",), b"sshCommand", b"ssh -o StrictHostKeyChecking=no")
          test_client = SSHGitClient("git.samba.org", config=config)
          self.assertEqual(test_client.ssh_command, "ssh -o StrictHostKeyChecking=no")

          # ssh_command parameter takes precedence over config
          test_client = SSHGitClient(
              "git.samba.org", config=config, ssh_command="custom-ssh"
          )
          self.assertEqual(test_client.ssh_command, "custom-ssh")

          # Environment variables take precedence over config when no
          # ssh_command parameter is given
          self.overrideEnv("GIT_SSH_COMMAND", "/usr/bin/ssh -v")
          test_client = SSHGitClient("git.samba.org", config=config)
          self.assertEqual(test_client.ssh_command, "/usr/bin/ssh -v")

      def test_ssh_kwargs_passed_to_vendor(self) -> None:
          """``ssh_command``, ``password`` and ``key_filename`` reach the vendor."""
          server = self.server
          ssh_client = self.client

          ssh_client.ssh_command = "custom-ssh-wrapper.sh -o Option=Value"
          ssh_client.password = "test-password"
          ssh_client.key_filename = "/path/to/key"

          # Connect and verify all kwargs are passed through
          ssh_client._connect(b"upload-pack", b"/path/to/repo")

          self.assertEqual(server.ssh_command, "custom-ssh-wrapper.sh -o Option=Value")
          self.assertEqual(server.password, "test-password")
          self.assertEqual(server.key_filename, "/path/to/key")
  class ReportStatusParserTests(TestCase):
      """Checks how ``ReportStatusParser`` interprets report-status packets."""

      def _parser_fed(self, *packets):
          """Return a fresh parser that has handled *packets* in the given order."""
          parser = ReportStatusParser()
          for packet in packets:
              parser.handle_packet(packet)
          return parser

      def test_invalid_pack(self) -> None:
          """An ``unpack error`` line makes ``check()`` raise ``SendPackError``."""
          parser = self._parser_fed(
              b"unpack error - foo bar",
              b"ok refs/foo/bar",
              None,
          )
          self.assertRaises(SendPackError, list, parser.check())

      def test_update_refs_error(self) -> None:
          """An ``ng`` line is reported as ``(ref, reason)``."""
          parser = self._parser_fed(
              b"unpack ok",
              b"ng refs/foo/bar need to pull",
              None,
          )
          statuses = list(parser.check())
          self.assertEqual([(b"refs/foo/bar", "need to pull")], statuses)

      def test_ok(self) -> None:
          """An ``ok`` line is reported as ``(ref, None)``."""
          parser = self._parser_fed(
              b"unpack ok",
              b"ok refs/foo/bar",
              None,
          )
          statuses = list(parser.check())
          self.assertEqual([(b"refs/foo/bar", None)], statuses)
  class LocalGitClientTests(TestCase):
      """Tests for ``LocalGitClient`` against the on-disk fixture repositories."""

      def test_get_url(self) -> None:
          """``get_url`` renders a ``file://`` URL for an absolute path."""
          local_client = LocalGitClient()
          self.assertEqual(
              "file:///tmp/repo.git", local_client.get_url("/tmp/repo.git")
          )

      def test_fetch_into_empty(self) -> None:
          """Fetching into a fresh bare repository reports the source's refs."""
          local_client = LocalGitClient()
          target_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, target_dir)
          target_repo = Repo.init_bare(target_dir)
          self.addCleanup(target_repo.close)
          source_repo = open_repo("a.git")
          self.addCleanup(tear_down_repo, source_repo)

          fetch_result = local_client.fetch(source_repo.path, target_repo)

          self.assertEqual(source_repo.get_refs(), fetch_result.refs)

      def test_clone(self) -> None:
          """Cloning creates ``origin`` remote-tracking refs next to the source refs."""
          local_client = LocalGitClient()
          source_repo = open_repo("a.git")
          self.addCleanup(tear_down_repo, source_repo)
          target_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, target_dir)

          cloned = local_client.clone(source_repo.path, target_dir, mkdir=False)
          self.addCleanup(cloned.close)

          expected_refs = dict(source_repo.get_refs())
          expected_refs[b"refs/remotes/origin/HEAD"] = expected_refs[b"HEAD"]
          expected_refs[b"refs/remotes/origin/master"] = expected_refs[
              b"refs/heads/master"
          ]
          self.assertEqual(expected_refs, cloned.get_refs())

      def test_fetch_empty(self) -> None:
          """Wanting nothing returns the refs and writes an empty pack."""
          local_client = LocalGitClient()
          source_repo = open_repo("a.git")
          self.addCleanup(tear_down_repo, source_repo)
          pack_buffer = BytesIO()
          walker = {}

          def want_nothing(heads, depth=None):
              return []

          result = local_client.fetch_pack(
              source_repo.path,
              want_nothing,
              graph_walker=walker,
              pack_data=pack_buffer.write,
          )

          self.assertEqual(
              {
                  b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
                  b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
                  b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
                  b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
              },
              result.refs,
          )
          self.assertEqual({b"HEAD": b"refs/heads/master"}, result.symrefs)
          # Pack header announcing zero objects, followed by the trailing checksum.
          self.assertEqual(
              b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
              b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e",
              pack_buffer.getvalue(),
          )

      def test_fetch_pack_none(self) -> None:
          """Wanting the master commit with an empty local repo yields a pack of 7 objects."""
          local_client = LocalGitClient()
          source_repo = open_repo("a.git")
          self.addCleanup(tear_down_repo, source_repo)
          pack_buffer = BytesIO()
          walker = MemoryRepo().get_graph_walker()

          def want_master(heads, depth=None):
              return [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"]

          result = local_client.fetch_pack(
              source_repo.path,
              want_master,
              graph_walker=walker,
              pack_data=pack_buffer.write,
          )

          self.assertEqual({b"HEAD": b"refs/heads/master"}, result.symrefs)
          self.assertEqual(
              {
                  b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
                  b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
                  b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
                  b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
              },
              result.refs,
          )
          # Hardcoding is not ideal, but we'll fix that some other day..
          pack_bytes = pack_buffer.getvalue()
          self.assertTrue(
              pack_bytes.startswith(b"PACK\x00\x00\x00\x02\x00\x00\x00\x07")
          )

      def test_send_pack_without_changes(self) -> None:
          """Sending to a repository that already has the branch succeeds."""
          local = open_repo("a.git")
          self.addCleanup(tear_down_repo, local)

          target = open_repo("a.git")
          self.addCleanup(tear_down_repo, target)

          self.send_and_verify(b"master", local, target)

      def test_send_pack_with_changes(self) -> None:
          """Sending to an empty bare repository transfers the branch."""
          local = open_repo("a.git")
          self.addCleanup(tear_down_repo, local)

          target_path = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, target_path)

          with Repo.init_bare(target_path) as target:
              self.send_and_verify(b"master", local, target)

      def test_get_refs(self) -> None:
          """``get_refs`` mirrors the repository's refs and reports HEAD as a symref."""
          local = open_repo("refs.git")
          self.addCleanup(tear_down_repo, local)

          local_client = LocalGitClient()
          result = local_client.get_refs(local.path)

          self.assertDictEqual(local.refs.as_dict(), result.refs)
          # Check that symrefs are detected correctly
          self.assertIn(b"HEAD", result.symrefs)

      def send_and_verify(self, branch, local, target) -> None:
          """Send branch from local to remote repository and verify it worked."""
          local_client = LocalGitClient()
          ref_name = b"refs/heads/" + branch

          def update_refs(_):
              return {ref_name: local.refs[ref_name]}

          result = local_client.send_pack(
              target.path,
              update_refs,
              local.generate_pack_data,
          )

          self.assertEqual(local.refs[ref_name], result.refs[ref_name])
          self.assertIs(None, result.agent)
          self.assertEqual({}, result.ref_status)

          sent_sha = result.refs[ref_name]
          obj_local = local.get_object(sent_sha)
          obj_target = target.get_object(sent_sha)
          self.assertEqual(obj_local, obj_target)
  class BundleClientTests(TestCase):
      """Tests for ``BundleClient`` using a one-commit bundle written to a temp dir."""

      def setUp(self) -> None:
          super().setUp()
          self.tempdir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.tempdir)

      def _create_test_bundle(self):
          """Create a test bundle file and return ``(bundle_path, source_repo)``."""
          source_repo = MemoryRepo()

          # blob -> tree -> commit, each added to the object store in turn
          blob = Blob.from_string(b"Hello world")
          source_repo.object_store.add_object(blob)

          tree = Tree()
          tree.add(b"hello.txt", 0o100644, blob.id)
          source_repo.object_store.add_object(tree)

          identity = b"Test User <test@example.com>"
          timestamp = 1234567890
          commit = Commit()
          commit.tree = tree.id
          commit.message = b"Initial commit"
          commit.committer = identity
          commit.author = identity
          commit.author_time = timestamp
          commit.commit_time = timestamp
          commit.author_timezone = 0
          commit.commit_timezone = 0
          source_repo.object_store.add_object(commit)

          source_repo.refs[b"refs/heads/master"] = commit.id

          bundle = create_bundle_from_repo(source_repo)

          bundle_path = os.path.join(self.tempdir, "test.bundle")
          with open(bundle_path, "wb") as bundle_file:
              write_bundle(bundle_file, bundle)

          return bundle_path, source_repo

      def test_is_bundle_file(self) -> None:
          """Test bundle file detection."""
          bundle_path, _ = self._create_test_bundle()

          # A real bundle is recognised
          self.assertTrue(BundleClient._is_bundle_file(bundle_path))

          # An ordinary text file is not
          regular_file = os.path.join(self.tempdir, "regular.txt")
          with open(regular_file, "w") as text_file:
              text_file.write("not a bundle")
          self.assertFalse(BundleClient._is_bundle_file(regular_file))

          # Neither is a path that does not exist
          self.assertFalse(BundleClient._is_bundle_file("/non/existent/file"))

      def test_get_refs(self) -> None:
          """Test getting refs from bundle."""
          bundle_path, _ = self._create_test_bundle()

          result = BundleClient().get_refs(bundle_path)

          self.assertIn(b"refs/heads/master", result.refs)
          self.assertEqual(result.symrefs, {})

      def test_fetch_pack(self) -> None:
          """Test fetching pack from bundle."""
          bundle_path, _source_repo = self._create_test_bundle()
          bundle_client = BundleClient()
          pack_buffer = BytesIO()

          def determine_wants(refs):
              return list(refs.values())

          class MockGraphWalker:
              """Graph walker with no local commits to offer."""

              def next(self):
                  return None

              def ack(self, sha):
                  pass

          result = bundle_client.fetch_pack(
              bundle_path, determine_wants, MockGraphWalker(), pack_buffer.write
          )

          # Verify we got refs back
          self.assertIn(b"refs/heads/master", result.refs)
          # Verify pack data was written
          self.assertGreater(len(pack_buffer.getvalue()), 0)

      def test_fetch(self) -> None:
          """Test fetching from bundle into target repo."""
          bundle_path, _source_repo = self._create_test_bundle()
          target_repo = MemoryRepo()

          result = BundleClient().fetch(bundle_path, target_repo)

          # Verify refs were imported
          self.assertIn(b"refs/heads/master", result.refs)

          # Verify objects were imported
          master_id = result.refs[b"refs/heads/master"]
          self.assertIn(master_id, target_repo.object_store)

          # Verify the commit object is correct
          imported_commit = target_repo.object_store[master_id]
          self.assertEqual(imported_commit.message, b"Initial commit")

      def test_send_pack_not_supported(self) -> None:
          """Test that send_pack raises NotImplementedError."""
          bundle_path, _ = self._create_test_bundle()
          bundle_client = BundleClient()

          with self.assertRaises(NotImplementedError):
              bundle_client.send_pack(bundle_path, None, None)

      def test_get_transport_and_path_bundle(self) -> None:
          """Test that get_transport_and_path detects bundle files."""
          bundle_path, _ = self._create_test_bundle()

          transport, resolved_path = get_transport_and_path(bundle_path)

          self.assertIsInstance(transport, BundleClient)
          self.assertEqual(resolved_path, bundle_path)
  1104. class HttpGitClientTests(TestCase):
  def test_get_url(self) -> None:
      """``get_url`` resolves a ``str`` path against the client's base URL."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich")
      resolved = http_client.get_url("/jelmer/dulwich")
      self.assertEqual("https://github.com/jelmer/dulwich", resolved)
  def test_get_url_bytes_path(self) -> None:
      """``get_url`` accepts a ``bytes`` path and still returns a ``str`` URL."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich")
      resolved = http_client.get_url(b"/jelmer/dulwich")
      self.assertEqual("https://github.com/jelmer/dulwich", resolved)
  def test_get_url_with_username_and_passwd(self) -> None:
      """Credentials given to the client do not appear in the URL from ``get_url``."""
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          username="USERNAME",
          password="PASSWD",
      )
      resolved = http_client.get_url("/jelmer/dulwich")
      self.assertEqual("https://github.com/jelmer/dulwich", resolved)
  def test_init_username_passwd_set(self) -> None:
      """Username and password produce a Basic ``authorization`` header."""
      username, password = "user", "passwd"
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          config=None,
          username=username,
          password=password,
      )
      self.assertEqual("user", http_client._username)
      self.assertEqual("passwd", http_client._password)

      # Expected header value: "Basic " + base64("user:passwd")
      credentials = f"{username}:{password}".encode("latin1")
      encoded = base64.b64encode(credentials).decode("latin1")
      actual_header = http_client.pool_manager.headers["authorization"]
      self.assertEqual(actual_header, f"Basic {encoded}")
  def test_init_username_set_no_password(self) -> None:
      """A username without password is sent as ``user:`` in the Basic header."""
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich", config=None, username="user"
      )
      self.assertEqual("user", http_client._username)
      self.assertIsNone(http_client._password)

      encoded = base64.b64encode(b"user:").decode("ascii")
      actual_header = http_client.pool_manager.headers["authorization"]
      self.assertEqual(actual_header, f"Basic {encoded}")
  def test_init_no_username_passwd(self) -> None:
      """Without credentials no ``authorization`` header is configured."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=None)
      self.assertIsNone(http_client._username)
      self.assertIsNone(http_client._password)
      self.assertNotIn("authorization", http_client.pool_manager.headers)
  def test_from_parsedurl_username_only(self) -> None:
      """A URL with only ``user@`` sets the username and a ``user:`` Basic header."""
      username = "user"
      parsed = urlparse(f"https://{username}@github.com/jelmer/dulwich")

      http_client = HttpGitClient.from_parsedurl(parsed)

      self.assertEqual(http_client._username, username)
      self.assertEqual(http_client._password, None)

      credentials = username.encode("ascii") + b":"
      encoded = base64.b64encode(credentials).decode("ascii")
      actual_header = http_client.pool_manager.headers["authorization"]
      self.assertEqual(actual_header, f"Basic {encoded}")
  def test_from_parsedurl_on_url_with_quoted_credentials(self) -> None:
      """Percent-encoded credentials in the URL are decoded before use."""
      original_username = "john|the|first"
      original_password = "Ya#1$2%3"
      quoted_username = urlquote(original_username)
      quoted_password = urlquote(original_password)
      parsed = urlparse(
          f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
      )

      http_client = HttpGitClient.from_parsedurl(parsed)

      self.assertEqual(original_username, http_client._username)
      self.assertEqual(original_password, http_client._password)

      # The header must be built from the decoded values.
      credentials = f"{original_username}:{original_password}".encode("latin1")
      encoded = base64.b64encode(credentials).decode("latin1")
      actual_header = http_client.pool_manager.headers["authorization"]
      self.assertEqual(actual_header, f"Basic {encoded}")
  def test_url_redirect_location(self) -> None:
      """``_discover_references`` reports the redirected base URL, if there is one."""
      from urllib3.response import HTTPResponse

      # base URL -> redirect target ("location"), expected processed URL
      # ("redirect_url") and the ref advertisement served for it.
      test_data = {
          "https://gitlab.com/inkscape/inkscape/": {
              "location": "https://gitlab.com/inkscape/inkscape.git/",
              "redirect_url": "https://gitlab.com/inkscape/inkscape.git/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"fb2bebf4919a011f0fd7cec085443d0031228e76 "
                  b"HEAD\n0000"
              ),
          },
          "https://github.com/jelmer/dulwich/": {
              "location": "https://github.com/jelmer/dulwich/",
              "redirect_url": "https://github.com/jelmer/dulwich/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"3ff25e09724aa4d86ea5bca7d5dd0399a3c8bfcf "
                  b"HEAD\n0000"
              ),
          },
          # check for absolute-path URI reference as location
          "https://codeberg.org/ashwinvis/radicale-sh.git/": {
              "location": "/ashwinvis/radicale-auth-sh/",
              "redirect_url": "https://codeberg.org/ashwinvis/radicale-auth-sh/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"470f8603768b608fc988675de2fae8f963c21158 "
                  b"HEAD\n0000"
              ),
          },
      }

      tail = "info/refs?service=git-upload-pack"

      # we need to mock urllib3.PoolManager as this test will fail
      # otherwise without an active internet connection
      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              requested_base = url[: -len(tail)]
              entry = test_data[requested_base]
              target_base = entry["location"]
              response_headers = {
                  "Content-Type": "application/x-git-upload-pack-advertisement"
              }

              if redirect is False:
                  # simulate urllib3 behavior when redirect parameter is False:
                  # the original URL is reported and a differing location is
                  # answered with an empty 301
                  request_url = url
                  is_redirected = target_base != requested_base
              else:
                  # urllib3 handles automatic redirection by default
                  request_url = target_base + tail
                  is_redirected = False

              if is_redirected:
                  body = b""
                  response_headers["location"] = entry["location"]
                  status = 301
              else:
                  body = entry["refs_data"]
                  status = 200

              return HTTPResponse(
                  body=BytesIO(body),
                  headers=response_headers,
                  request_method=method,
                  request_url=request_url,
                  preload_content=preload_content,
                  status=status,
              )

      pool_manager = PoolManagerMock()

      for base_url in test_data:
          # instantiate HttpGitClient with mocked pool manager
          c = HttpGitClient(base_url, pool_manager=pool_manager, config=None)

          # call method that detects url redirection
          _, _, processed_url, _, _ = c._discover_references(
              b"git-upload-pack", base_url
          )

          # send the same request as the method above without redirection
          resp = c.pool_manager.request("GET", base_url + tail, redirect=False)

          # check expected behavior of urllib3
          redirect_location = resp.get_redirect_location()

          if resp.status == 200:
              self.assertFalse(redirect_location)

          if redirect_location:
              # check that url redirection has been correctly detected
              expected_url = test_data[base_url]["redirect_url"]
          else:
              # check also the no redirection case
              expected_url = base_url
          self.assertEqual(processed_url, expected_url)
  def test_smart_request_content_type_with_directive_check(self) -> None:
      """A ``Content-Type`` carrying a ``charset`` directive is accepted by ``_smart_request``."""
      from urllib3.response import HTTPResponse

      content_type = "application/x-git-upload-pack-result; charset=utf-8"

      # we need to mock urllib3.PoolManager as this test will fail
      # otherwise without an active internet connection
      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              # Every request gets the same body-less 200 response.
              return HTTPResponse(
                  headers={"Content-Type": content_type},
                  request_method=method,
                  request_url=url,
                  preload_content=preload_content,
                  status=200,
              )

      clone_url = "https://hacktivis.me/git/blog.git/"
      http_client = HttpGitClient(
          clone_url, pool_manager=PoolManagerMock(), config=None
      )

      outcome = http_client._smart_request("git-upload-pack", clone_url, data=None)
      self.assertTrue(outcome)
  1294. def test_urllib3_protocol_error(self) -> None:
  1295. from urllib3.exceptions import ProtocolError
  1296. from urllib3.response import HTTPResponse
  1297. error_msg = "protocol error"
  1298. # we need to mock urllib3.PoolManager as this test will fail
  1299. # otherwise without an active internet connection
  1300. class PoolManagerMock:
  1301. def __init__(self) -> None:
  1302. self.headers: dict[str, str] = {}
  1303. def request(
  1304. self,
  1305. method,
  1306. url,
  1307. fields=None,
  1308. headers=None,
  1309. redirect=True,
  1310. preload_content=True,
  1311. ):
  1312. response = HTTPResponse(
  1313. headers={"Content-Type": "application/x-git-upload-pack-result"},
  1314. request_method=method,
  1315. request_url=url,
  1316. preload_content=preload_content,
  1317. status=200,
  1318. )
  1319. def read(self) -> NoReturn:
  1320. raise ProtocolError(error_msg)
  1321. # override HTTPResponse.read to throw urllib3.exceptions.ProtocolError
  1322. response.read = read
  1323. return response
  1324. def check_heads(heads, **kwargs):
  1325. self.assertEqual(heads, {})
  1326. return []
  1327. clone_url = "https://git.example.org/user/project.git/"
  1328. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1329. with self.assertRaises(GitProtocolError, msg=error_msg):
  1330. client.fetch_pack(b"/", check_heads, None, None)
  1331. def test_fetch_pack_dumb_http(self) -> None:
  1332. import zlib
  1333. from urllib3.response import HTTPResponse
  1334. # Mock responses for dumb HTTP
  1335. info_refs_content = (
  1336. b"0123456789abcdef0123456789abcdef01234567\trefs/heads/master\n"
  1337. )
  1338. head_content = b"ref: refs/heads/master"
  1339. # Create a blob object for testing
  1340. blob_content = b"Hello, dumb HTTP!"
  1341. blob_sha = b"0123456789abcdef0123456789abcdef01234567"
  1342. blob_hex = blob_sha.decode("ascii")
  1343. blob_obj_data = (
  1344. b"blob " + str(len(blob_content)).encode() + b"\x00" + blob_content
  1345. )
  1346. blob_compressed = zlib.compress(blob_obj_data)
  1347. responses = {
  1348. "/HEAD": {
  1349. "status": 200,
  1350. "content": head_content,
  1351. "content_type": "text/plain",
  1352. },
  1353. "/git-upload-pack": {
  1354. "status": 404,
  1355. "content": b"Not Found",
  1356. "content_type": "text/plain",
  1357. },
  1358. "/info/refs": {
  1359. "status": 200,
  1360. "content": info_refs_content,
  1361. "content_type": "text/plain",
  1362. },
  1363. f"/objects/{blob_hex[:2]}/{blob_hex[2:]}": {
  1364. "status": 200,
  1365. "content": blob_compressed,
  1366. "content_type": "application/octet-stream",
  1367. },
  1368. }
  1369. class PoolManagerMock:
  1370. def __init__(self) -> None:
  1371. self.headers: dict[str, str] = {}
  1372. def request(
  1373. self,
  1374. method,
  1375. url,
  1376. fields=None,
  1377. headers=None,
  1378. redirect=True,
  1379. preload_content=True,
  1380. ):
  1381. # Extract path from URL
  1382. from urllib.parse import urlparse
  1383. parsed = urlparse(url)
  1384. path = parsed.path.rstrip("/")
  1385. # Find matching response
  1386. for pattern, resp_data in responses.items():
  1387. if path.endswith(pattern):
  1388. return HTTPResponse(
  1389. body=BytesIO(resp_data["content"]),
  1390. headers={
  1391. "Content-Type": resp_data.get(
  1392. "content_type", "text/plain"
  1393. )
  1394. },
  1395. request_method=method,
  1396. request_url=url,
  1397. preload_content=preload_content,
  1398. status=resp_data["status"],
  1399. )
  1400. # Default 404
  1401. return HTTPResponse(
  1402. body=BytesIO(b"Not Found"),
  1403. headers={"Content-Type": "text/plain"},
  1404. request_method=method,
  1405. request_url=url,
  1406. preload_content=preload_content,
  1407. status=404,
  1408. )
  1409. def determine_wants(heads, **kwargs):
  1410. # heads contains the refs with SHA values, just return the SHA we want
  1411. return [heads[b"refs/heads/master"]]
  1412. received_data = []
  1413. def pack_data_handler(data):
  1414. # Collect pack data
  1415. received_data.append(data)
  1416. clone_url = "https://git.example.org/repo.git/"
  1417. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1418. # Mock graph walker that says we don't have anything
  1419. class MockGraphWalker:
  1420. def ack(self, sha):
  1421. return []
  1422. graph_walker = MockGraphWalker()
  1423. result = client.fetch_pack(
  1424. b"/", determine_wants, graph_walker, pack_data_handler
  1425. )
  1426. # Verify we got the refs
  1427. expected_sha = blob_hex.encode("ascii")
  1428. self.assertEqual({b"refs/heads/master": expected_sha}, result.refs)
  1429. # Verify we received pack data
  1430. self.assertTrue(len(received_data) > 0)
  1431. pack_data = b"".join(received_data)
  1432. self.assertTrue(len(pack_data) > 0)
  1433. # The pack should be valid pack format
  1434. self.assertTrue(pack_data.startswith(b"PACK"))
  1435. # Pack header: PACK + version (4 bytes) + num objects (4 bytes)
  1436. self.assertEqual(pack_data[4:8], b"\x00\x00\x00\x02") # version 2
  1437. self.assertEqual(pack_data[8:12], b"\x00\x00\x00\x01") # 1 object
  1438. def test_timeout_configuration(self) -> None:
  1439. """Test that timeout parameter is properly configured."""
  1440. url = "https://github.com/jelmer/dulwich"
  1441. timeout = 30
  1442. c = HttpGitClient(url, timeout=timeout)
  1443. self.assertEqual(c._timeout, timeout)
  1444. def test_timeout_from_config(self) -> None:
  1445. """Test that timeout can be configured via git config."""
  1446. from dulwich.config import ConfigDict
  1447. url = "https://github.com/jelmer/dulwich"
  1448. config = ConfigDict()
  1449. config.set((b"http",), b"timeout", b"25")
  1450. c = HttpGitClient(url, config=config)
  1451. # The timeout should be set on the pool manager
  1452. # Since we can't easily access the timeout from the pool manager,
  1453. # we just verify the client was created successfully
  1454. self.assertIsNotNone(c.pool_manager)
  1455. def test_timeout_parameter_precedence(self) -> None:
  1456. """Test that explicit timeout parameter takes precedence over config."""
  1457. from dulwich.config import ConfigDict
  1458. url = "https://github.com/jelmer/dulwich"
  1459. config = ConfigDict()
  1460. config.set((b"http",), b"timeout", b"25")
  1461. c = HttpGitClient(url, config=config, timeout=15)
  1462. self.assertEqual(c._timeout, 15)
  1463. def test_http_extra_headers_from_config(self) -> None:
  1464. """Test that http.extraHeader config values are applied."""
  1465. from dulwich.config import ConfigDict
  1466. url = "https://github.com/jelmer/dulwich"
  1467. config = ConfigDict()
  1468. # Set a single extra header
  1469. config.set((b"http",), b"extraHeader", b"X-Custom-Header: test-value")
  1470. c = HttpGitClient(url, config=config)
  1471. # Check that the header was added to the pool manager
  1472. self.assertIn("X-Custom-Header", c.pool_manager.headers)
  1473. self.assertEqual(c.pool_manager.headers["X-Custom-Header"], "test-value")
  1474. def test_http_multiple_extra_headers_from_config(self) -> None:
  1475. """Test that multiple http.extraHeader config values are applied."""
  1476. from dulwich.config import ConfigDict
  1477. url = "https://github.com/jelmer/dulwich"
  1478. config = ConfigDict()
  1479. # Set multiple extra headers
  1480. config.set((b"http",), b"extraHeader", b"X-Header-1: value1")
  1481. config.add((b"http",), b"extraHeader", b"X-Header-2: value2")
  1482. config.add((b"http",), b"extraHeader", b"Authorization: Bearer token123")
  1483. c = HttpGitClient(url, config=config)
  1484. # Check that all headers were added to the pool manager
  1485. self.assertIn("X-Header-1", c.pool_manager.headers)
  1486. self.assertEqual(c.pool_manager.headers["X-Header-1"], "value1")
  1487. self.assertIn("X-Header-2", c.pool_manager.headers)
  1488. self.assertEqual(c.pool_manager.headers["X-Header-2"], "value2")
  1489. self.assertIn("Authorization", c.pool_manager.headers)
  1490. self.assertEqual(c.pool_manager.headers["Authorization"], "Bearer token123")
  1491. def test_get_url_preserves_credentials_from_url(self) -> None:
  1492. """Test that credentials from URL are preserved in get_url() (issue #1925)."""
  1493. # When credentials come from the URL (not passed explicitly),
  1494. # they should be included in get_url() so they're saved to git config
  1495. username = "ghp_token123"
  1496. url = f"https://{username}@github.com/jelmer/dulwich"
  1497. path = "/jelmer/dulwich"
  1498. c = HttpGitClient.from_parsedurl(urlparse(url))
  1499. reconstructed_url = c.get_url(path)
  1500. # Credentials should be preserved in the URL
  1501. self.assertIn(username, reconstructed_url)
  1502. self.assertEqual(
  1503. f"https://{username}@github.com/jelmer/dulwich", reconstructed_url
  1504. )
  1505. def test_get_url_preserves_credentials_with_password_from_url(self) -> None:
  1506. """Test that username:password from URL are preserved in get_url()."""
  1507. username = "user"
  1508. password = "pass"
  1509. url = f"https://{username}:{password}@github.com/jelmer/dulwich"
  1510. path = "/jelmer/dulwich"
  1511. c = HttpGitClient.from_parsedurl(urlparse(url))
  1512. reconstructed_url = c.get_url(path)
  1513. # Both username and password should be preserved
  1514. self.assertIn(username, reconstructed_url)
  1515. self.assertIn(password, reconstructed_url)
  1516. self.assertEqual(
  1517. f"https://{username}:{password}@github.com/jelmer/dulwich",
  1518. reconstructed_url,
  1519. )
  1520. def test_get_url_preserves_special_chars_in_credentials(self) -> None:
  1521. """Test that special characters in credentials are properly escaped."""
  1522. # URL-encoded credentials with special characters
  1523. original_username = "user@domain"
  1524. original_password = "p@ss:word"
  1525. quoted_username = urlquote(original_username, safe="")
  1526. quoted_password = urlquote(original_password, safe="")
  1527. url = f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
  1528. path = "/jelmer/dulwich"
  1529. c = HttpGitClient.from_parsedurl(urlparse(url))
  1530. reconstructed_url = c.get_url(path)
  1531. # The reconstructed URL should have properly escaped credentials
  1532. self.assertIn(quoted_username, reconstructed_url)
  1533. self.assertIn(quoted_password, reconstructed_url)
  1534. # Verify the URL is valid by parsing it back
  1535. parsed = urlparse(reconstructed_url)
  1536. from urllib.parse import unquote
  1537. self.assertEqual(unquote(parsed.username), original_username)
  1538. self.assertEqual(unquote(parsed.password), original_password)
  1539. def test_get_url_explicit_credentials_not_in_url(self) -> None:
  1540. """Test that explicitly passed credentials are NOT included in get_url()."""
  1541. # When credentials are passed explicitly (not from URL),
  1542. # they should NOT appear in get_url() for security
  1543. base_url = "https://github.com/jelmer/dulwich"
  1544. path = "/jelmer/dulwich"
  1545. username = "explicit_user"
  1546. password = "explicit_pass"
  1547. c = HttpGitClient(base_url, username=username, password=password)
  1548. url = c.get_url(path)
  1549. # Credentials should NOT be in the URL
  1550. self.assertNotIn(username, url)
  1551. self.assertNotIn(password, url)
  1552. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1553. def test_pool_manager_parameter(self) -> None:
  1554. """Test that pool_manager parameter is properly passed through."""
  1555. import urllib3
  1556. # Create a custom pool manager
  1557. custom_pool_manager = urllib3.PoolManager()
  1558. # Test with get_transport_and_path_from_url
  1559. url = "https://github.com/jelmer/dulwich"
  1560. client, _path = get_transport_and_path_from_url(
  1561. url, pool_manager=custom_pool_manager
  1562. )
  1563. # Verify the client is an HTTP client and has our custom pool manager
  1564. self.assertIsInstance(client, HttpGitClient)
  1565. self.assertIs(client.pool_manager, custom_pool_manager)
  1566. # Test with get_transport_and_path
  1567. client2, _path2 = get_transport_and_path(url, pool_manager=custom_pool_manager)
  1568. # Verify the client is an HTTP client and has our custom pool manager
  1569. self.assertIsInstance(client2, HttpGitClient)
  1570. self.assertIs(client2.pool_manager, custom_pool_manager)
  1571. class TCPGitClientTests(TestCase):
  1572. def test_get_url(self) -> None:
  1573. host = "github.com"
  1574. path = "/jelmer/dulwich"
  1575. c = TCPGitClient(host)
  1576. url = c.get_url(path)
  1577. self.assertEqual("git://github.com/jelmer/dulwich", url)
  1578. def test_get_url_with_port(self) -> None:
  1579. host = "github.com"
  1580. path = "/jelmer/dulwich"
  1581. port = 9090
  1582. c = TCPGitClient(host, port=port)
  1583. url = c.get_url(path)
  1584. self.assertEqual("git://github.com:9090/jelmer/dulwich", url)
  1585. def test_get_url_with_ipv6(self) -> None:
  1586. host = "::1"
  1587. path = "/jelmer/dulwich"
  1588. c = TCPGitClient(host)
  1589. url = c.get_url(path)
  1590. self.assertEqual("git://[::1]/jelmer/dulwich", url)
  1591. def test_get_url_with_ipv6_and_port(self) -> None:
  1592. host = "2001:db8::1"
  1593. path = "/jelmer/dulwich"
  1594. port = 9090
  1595. c = TCPGitClient(host, port=port)
  1596. url = c.get_url(path)
  1597. self.assertEqual("git://[2001:db8::1]:9090/jelmer/dulwich", url)
  1598. def test_get_url_with_ipv6_default_port(self) -> None:
  1599. host = "2001:db8::1"
  1600. path = "/jelmer/dulwich"
  1601. port = TCP_GIT_PORT # Default port should not be included in URL
  1602. c = TCPGitClient(host, port=port)
  1603. url = c.get_url(path)
  1604. self.assertEqual("git://[2001:db8::1]/jelmer/dulwich", url)
  1605. class DefaultUrllib3ManagerTest(TestCase):
  1606. def test_no_config(self) -> None:
  1607. manager = default_urllib3_manager(config=None)
  1608. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1609. def test_config_no_proxy(self) -> None:
  1610. import urllib3
  1611. manager = default_urllib3_manager(config=ConfigDict())
  1612. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1613. self.assertIsInstance(manager, urllib3.PoolManager)
  1614. def test_config_no_proxy_custom_cls(self) -> None:
  1615. import urllib3
  1616. class CustomPoolManager(urllib3.PoolManager):
  1617. pass
  1618. manager = default_urllib3_manager(
  1619. config=ConfigDict(), pool_manager_cls=CustomPoolManager
  1620. )
  1621. self.assertIsInstance(manager, CustomPoolManager)
  1622. def test_config_ssl(self) -> None:
  1623. config = ConfigDict()
  1624. config.set(b"http", b"sslVerify", b"true")
  1625. manager = default_urllib3_manager(config=config)
  1626. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1627. def test_config_no_ssl(self) -> None:
  1628. config = ConfigDict()
  1629. config.set(b"http", b"sslVerify", b"false")
  1630. manager = default_urllib3_manager(config=config)
  1631. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  1632. def test_config_proxy(self) -> None:
  1633. import urllib3
  1634. config = ConfigDict()
  1635. config.set(b"http", b"proxy", b"http://localhost:3128/")
  1636. manager = default_urllib3_manager(config=config)
  1637. self.assertIsInstance(manager, urllib3.ProxyManager)
  1638. self.assertTrue(hasattr(manager, "proxy"))
  1639. self.assertEqual(manager.proxy.scheme, "http")
  1640. self.assertEqual(manager.proxy.host, "localhost")
  1641. self.assertEqual(manager.proxy.port, 3128)
  1642. def test_environment_proxy(self) -> None:
  1643. import urllib3
  1644. config = ConfigDict()
  1645. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1646. manager = default_urllib3_manager(config=config)
  1647. self.assertIsInstance(manager, urllib3.ProxyManager)
  1648. self.assertTrue(hasattr(manager, "proxy"))
  1649. self.assertEqual(manager.proxy.scheme, "http")
  1650. self.assertEqual(manager.proxy.host, "myproxy")
  1651. self.assertEqual(manager.proxy.port, 8080)
  1652. def test_environment_empty_proxy(self) -> None:
  1653. import urllib3
  1654. config = ConfigDict()
  1655. self.overrideEnv("http_proxy", "")
  1656. manager = default_urllib3_manager(config=config)
  1657. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1658. self.assertIsInstance(manager, urllib3.PoolManager)
  1659. def test_environment_no_proxy_1(self) -> None:
  1660. import urllib3
  1661. config = ConfigDict()
  1662. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1663. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh")
  1664. base_url = "http://xyz.abc.def.gh:8080/path/port"
  1665. manager = default_urllib3_manager(config=config, base_url=base_url)
  1666. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1667. self.assertIsInstance(manager, urllib3.PoolManager)
  1668. def test_environment_no_proxy_2(self) -> None:
  1669. import urllib3
  1670. config = ConfigDict()
  1671. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1672. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1673. base_url = "http://ample.com/path/port"
  1674. manager = default_urllib3_manager(config=config, base_url=base_url)
  1675. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1676. self.assertIsInstance(manager, urllib3.PoolManager)
  1677. def test_environment_no_proxy_3(self) -> None:
  1678. import urllib3
  1679. config = ConfigDict()
  1680. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1681. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1682. base_url = "http://ample.com:80/path/port"
  1683. manager = default_urllib3_manager(config=config, base_url=base_url)
  1684. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1685. self.assertIsInstance(manager, urllib3.PoolManager)
  1686. def test_environment_no_proxy_4(self) -> None:
  1687. import urllib3
  1688. config = ConfigDict()
  1689. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1690. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1691. base_url = "http://www.ample.com/path/port"
  1692. manager = default_urllib3_manager(config=config, base_url=base_url)
  1693. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1694. self.assertIsInstance(manager, urllib3.PoolManager)
  1695. def test_environment_no_proxy_5(self) -> None:
  1696. import urllib3
  1697. config = ConfigDict()
  1698. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1699. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1700. base_url = "http://www.example.com/path/port"
  1701. manager = default_urllib3_manager(config=config, base_url=base_url)
  1702. self.assertIsInstance(manager, urllib3.ProxyManager)
  1703. self.assertTrue(hasattr(manager, "proxy"))
  1704. self.assertEqual(manager.proxy.scheme, "http")
  1705. self.assertEqual(manager.proxy.host, "myproxy")
  1706. self.assertEqual(manager.proxy.port, 8080)
  1707. def test_environment_no_proxy_6(self) -> None:
  1708. import urllib3
  1709. config = ConfigDict()
  1710. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1711. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1712. base_url = "http://ample.com.org/path/port"
  1713. manager = default_urllib3_manager(config=config, base_url=base_url)
  1714. self.assertIsInstance(manager, urllib3.ProxyManager)
  1715. self.assertTrue(hasattr(manager, "proxy"))
  1716. self.assertEqual(manager.proxy.scheme, "http")
  1717. self.assertEqual(manager.proxy.host, "myproxy")
  1718. self.assertEqual(manager.proxy.port, 8080)
  1719. def test_environment_no_proxy_ipv4_address_1(self) -> None:
  1720. import urllib3
  1721. config = ConfigDict()
  1722. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1723. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  1724. base_url = "http://192.168.0.10/path/port"
  1725. manager = default_urllib3_manager(config=config, base_url=base_url)
  1726. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1727. self.assertIsInstance(manager, urllib3.PoolManager)
  1728. def test_environment_no_proxy_ipv4_address_2(self) -> None:
  1729. import urllib3
  1730. config = ConfigDict()
  1731. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1732. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  1733. base_url = "http://192.168.0.10:8888/path/port"
  1734. manager = default_urllib3_manager(config=config, base_url=base_url)
  1735. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1736. self.assertIsInstance(manager, urllib3.PoolManager)
  1737. def test_environment_no_proxy_ipv4_address_3(self) -> None:
  1738. import urllib3
  1739. config = ConfigDict()
  1740. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1741. self.overrideEnv(
  1742. "no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com"
  1743. )
  1744. base_url = "http://192.168.0.10/path/port"
  1745. manager = default_urllib3_manager(config=config, base_url=base_url)
  1746. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1747. self.assertIsInstance(manager, urllib3.PoolManager)
  1748. def test_environment_no_proxy_ipv6_address_1(self) -> None:
  1749. import urllib3
  1750. config = ConfigDict()
  1751. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1752. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  1753. base_url = "http://[ff80:1::affe]/path/port"
  1754. manager = default_urllib3_manager(config=config, base_url=base_url)
  1755. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1756. self.assertIsInstance(manager, urllib3.PoolManager)
  1757. def test_environment_no_proxy_ipv6_address_2(self) -> None:
  1758. import urllib3
  1759. config = ConfigDict()
  1760. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1761. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  1762. base_url = "http://[ff80:1::affe]:1234/path/port"
  1763. manager = default_urllib3_manager(config=config, base_url=base_url)
  1764. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1765. self.assertIsInstance(manager, urllib3.PoolManager)
  1766. def test_environment_no_proxy_ipv6_address_3(self) -> None:
  1767. import urllib3
  1768. config = ConfigDict()
  1769. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1770. self.overrideEnv(
  1771. "no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com"
  1772. )
  1773. base_url = "http://[ff80:1::affe]/path/port"
  1774. manager = default_urllib3_manager(config=config, base_url=base_url)
  1775. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1776. self.assertIsInstance(manager, urllib3.PoolManager)
  1777. def test_config_proxy_custom_cls(self) -> None:
  1778. import urllib3
  1779. class CustomProxyManager(urllib3.ProxyManager):
  1780. pass
  1781. config = ConfigDict()
  1782. config.set(b"http", b"proxy", b"http://localhost:3128/")
  1783. manager = default_urllib3_manager(
  1784. config=config, proxy_manager_cls=CustomProxyManager
  1785. )
  1786. self.assertIsInstance(manager, CustomProxyManager)
  1787. def test_config_proxy_creds(self) -> None:
  1788. import urllib3
  1789. config = ConfigDict()
  1790. config.set(b"http", b"proxy", b"http://jelmer:example@localhost:3128/")
  1791. manager = default_urllib3_manager(config=config)
  1792. assert isinstance(manager, urllib3.ProxyManager)
  1793. self.assertEqual(
  1794. manager.proxy_headers, {"proxy-authorization": "Basic amVsbWVyOmV4YW1wbGU="}
  1795. )
  1796. def test_config_no_verify_ssl(self) -> None:
  1797. manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE")
  1798. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  1799. def test_timeout_parameter(self) -> None:
  1800. """Test that timeout parameter is passed to urllib3 manager."""
  1801. timeout = 30
  1802. manager = default_urllib3_manager(config=None, timeout=timeout)
  1803. self.assertEqual(manager.connection_pool_kw["timeout"], timeout)
  1804. def test_timeout_from_config(self) -> None:
  1805. """Test that timeout can be configured via git config."""
  1806. from dulwich.config import ConfigDict
  1807. config = ConfigDict()
  1808. config.set((b"http",), b"timeout", b"25")
  1809. manager = default_urllib3_manager(config=config)
  1810. self.assertEqual(manager.connection_pool_kw["timeout"], 25)
  1811. def test_timeout_parameter_precedence(self) -> None:
  1812. """Test that explicit timeout parameter takes precedence over config."""
  1813. from dulwich.config import ConfigDict
  1814. config = ConfigDict()
  1815. config.set((b"http",), b"timeout", b"25")
  1816. manager = default_urllib3_manager(config=config, timeout=15)
  1817. self.assertEqual(manager.connection_pool_kw["timeout"], 15)
  1818. class SubprocessSSHVendorTests(TestCase):
  1819. def setUp(self) -> None:
  1820. # Monkey Patch client subprocess popen
  1821. self._orig_popen = dulwich.client.subprocess.Popen
  1822. dulwich.client.subprocess.Popen = DummyPopen
  1823. def tearDown(self) -> None:
  1824. dulwich.client.subprocess.Popen = self._orig_popen
  1825. def test_run_command_dashes(self) -> None:
  1826. vendor = SubprocessSSHVendor()
  1827. self.assertRaises(
  1828. StrangeHostname,
  1829. vendor.run_command,
  1830. "--weird-host",
  1831. "git-clone-url",
  1832. )
  1833. def test_run_command_password(self) -> None:
  1834. vendor = SubprocessSSHVendor()
  1835. self.assertRaises(
  1836. NotImplementedError,
  1837. vendor.run_command,
  1838. "host",
  1839. "git-clone-url",
  1840. password="12345",
  1841. )
  1842. def test_run_command_password_and_privkey(self) -> None:
  1843. vendor = SubprocessSSHVendor()
  1844. self.assertRaises(
  1845. NotImplementedError,
  1846. vendor.run_command,
  1847. "host",
  1848. "git-clone-url",
  1849. password="12345",
  1850. key_filename="/tmp/id_rsa",
  1851. )
  1852. def test_run_command_with_port_username_and_privkey(self) -> None:
  1853. expected = [
  1854. "ssh",
  1855. "-x",
  1856. "-p",
  1857. "2200",
  1858. "-i",
  1859. "/tmp/id_rsa",
  1860. ]
  1861. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  1862. expected += [
  1863. "-o",
  1864. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  1865. ]
  1866. expected += [
  1867. "user@host",
  1868. "git-clone-url",
  1869. ]
  1870. vendor = SubprocessSSHVendor()
  1871. command = vendor.run_command(
  1872. "host",
  1873. "git-clone-url",
  1874. username="user",
  1875. port="2200",
  1876. key_filename="/tmp/id_rsa",
  1877. )
  1878. args = command.proc.args
  1879. self.assertListEqual(expected, args[0])
  1880. def test_run_with_ssh_command(self) -> None:
  1881. expected = [
  1882. "/path/to/ssh",
  1883. "-o",
  1884. "Option=Value",
  1885. "-x",
  1886. ]
  1887. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  1888. expected += [
  1889. "-o",
  1890. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  1891. ]
  1892. expected += [
  1893. "host",
  1894. "git-clone-url",
  1895. ]
  1896. vendor = SubprocessSSHVendor()
  1897. command = vendor.run_command(
  1898. "host",
  1899. "git-clone-url",
  1900. ssh_command="/path/to/ssh -o Option=Value",
  1901. )
  1902. args = command.proc.args
  1903. self.assertListEqual(expected, args[0])
  1904. class PLinkSSHVendorTests(TestCase):
  1905. def setUp(self) -> None:
  1906. # Monkey Patch client subprocess popen
  1907. self._orig_popen = dulwich.client.subprocess.Popen
  1908. dulwich.client.subprocess.Popen = DummyPopen
  1909. def tearDown(self) -> None:
  1910. dulwich.client.subprocess.Popen = self._orig_popen
  1911. def test_run_command_dashes(self) -> None:
  1912. vendor = PLinkSSHVendor()
  1913. self.assertRaises(
  1914. StrangeHostname,
  1915. vendor.run_command,
  1916. "--weird-host",
  1917. "git-clone-url",
  1918. )
  1919. def test_run_command_password_and_privkey(self) -> None:
  1920. vendor = PLinkSSHVendor()
  1921. warnings.simplefilter("always", UserWarning)
  1922. self.addCleanup(warnings.resetwarnings)
  1923. warnings_list, restore_warnings = setup_warning_catcher()
  1924. self.addCleanup(restore_warnings)
  1925. command = vendor.run_command(
  1926. "host",
  1927. "git-clone-url",
  1928. password="12345",
  1929. key_filename="/tmp/id_rsa",
  1930. )
  1931. expected_warning = UserWarning(
  1932. "Invoking PLink with a password exposes the password in the process list."
  1933. )
  1934. for w in warnings_list:
  1935. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  1936. break
  1937. else:
  1938. raise AssertionError(
  1939. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  1940. )
  1941. args = command.proc.args
  1942. if sys.platform == "win32":
  1943. binary = ["plink.exe", "-ssh"]
  1944. else:
  1945. binary = ["plink", "-ssh"]
  1946. expected = [
  1947. *binary,
  1948. "-pw",
  1949. "12345",
  1950. "-i",
  1951. "/tmp/id_rsa",
  1952. "host",
  1953. "git-clone-url",
  1954. ]
  1955. self.assertListEqual(expected, args[0])
  1956. def test_run_command_password(self) -> None:
  1957. if sys.platform == "win32":
  1958. binary = ["plink.exe", "-ssh"]
  1959. else:
  1960. binary = ["plink", "-ssh"]
  1961. expected = [*binary, "-pw", "12345", "host", "git-clone-url"]
  1962. vendor = PLinkSSHVendor()
  1963. warnings.simplefilter("always", UserWarning)
  1964. self.addCleanup(warnings.resetwarnings)
  1965. warnings_list, restore_warnings = setup_warning_catcher()
  1966. self.addCleanup(restore_warnings)
  1967. command = vendor.run_command("host", "git-clone-url", password="12345")
  1968. expected_warning = UserWarning(
  1969. "Invoking PLink with a password exposes the password in the process list."
  1970. )
  1971. for w in warnings_list:
  1972. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  1973. break
  1974. else:
  1975. raise AssertionError(
  1976. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  1977. )
  1978. args = command.proc.args
  1979. self.assertListEqual(expected, args[0])
  1980. def test_run_command_with_port_username_and_privkey(self) -> None:
  1981. if sys.platform == "win32":
  1982. binary = ["plink.exe", "-ssh"]
  1983. else:
  1984. binary = ["plink", "-ssh"]
  1985. expected = [
  1986. *binary,
  1987. "-P",
  1988. "2200",
  1989. "-i",
  1990. "/tmp/id_rsa",
  1991. "user@host",
  1992. "git-clone-url",
  1993. ]
  1994. vendor = PLinkSSHVendor()
  1995. command = vendor.run_command(
  1996. "host",
  1997. "git-clone-url",
  1998. username="user",
  1999. port="2200",
  2000. key_filename="/tmp/id_rsa",
  2001. )
  2002. args = command.proc.args
  2003. self.assertListEqual(expected, args[0])
  2004. def test_run_with_ssh_command(self) -> None:
  2005. expected = [
  2006. "/path/to/plink",
  2007. "-ssh",
  2008. "host",
  2009. "git-clone-url",
  2010. ]
  2011. vendor = PLinkSSHVendor()
  2012. command = vendor.run_command(
  2013. "host",
  2014. "git-clone-url",
  2015. ssh_command="/path/to/plink",
  2016. )
  2017. args = command.proc.args
  2018. self.assertListEqual(expected, args[0])
  2019. class RsyncUrlTests(TestCase):
  2020. def test_simple(self) -> None:
  2021. self.assertEqual(parse_rsync_url("foo:bar/path"), (None, "foo", "bar/path"))
  2022. self.assertEqual(
  2023. parse_rsync_url("user@foo:bar/path"), ("user", "foo", "bar/path")
  2024. )
  2025. def test_path(self) -> None:
  2026. self.assertRaises(ValueError, parse_rsync_url, "/path")
  2027. class CheckWantsTests(TestCase):
  2028. def test_fine(self) -> None:
  2029. check_wants(
  2030. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2031. {b"refs/heads/blah": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2032. )
  2033. def test_missing(self) -> None:
  2034. self.assertRaises(
  2035. InvalidWants,
  2036. check_wants,
  2037. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2038. {b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2039. )
  2040. def test_annotated(self) -> None:
  2041. self.assertRaises(
  2042. InvalidWants,
  2043. check_wants,
  2044. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2045. {
  2046. b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2047. b"refs/heads/blah^{}": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2048. },
  2049. )
  2050. class FetchPackResultTests(TestCase):
  2051. def test_eq(self) -> None:
  2052. self.assertEqual(
  2053. FetchPackResult(
  2054. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2055. {},
  2056. b"user/agent",
  2057. ),
  2058. FetchPackResult(
  2059. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2060. {},
  2061. b"user/agent",
  2062. ),
  2063. )
  2064. class GitCredentialStoreTests(TestCase):
  2065. @classmethod
  2066. def setUpClass(cls) -> None:
  2067. with tempfile.NamedTemporaryFile(delete=False) as f:
  2068. f.write(b"https://user:pass@example.org\n")
  2069. cls.fname = f.name
  2070. @classmethod
  2071. def tearDownClass(cls) -> None:
  2072. os.unlink(cls.fname)
  2073. def test_nonmatching_scheme(self) -> None:
  2074. result = list(
  2075. get_credentials_from_store("http", "example.org", fnames=[self.fname])
  2076. )
  2077. self.assertEqual(result, [])
  2078. def test_nonmatching_hostname(self) -> None:
  2079. result = list(
  2080. get_credentials_from_store("https", "noentry.org", fnames=[self.fname])
  2081. )
  2082. self.assertEqual(result, [])
  2083. def test_match_without_username(self) -> None:
  2084. result = list(
  2085. get_credentials_from_store("https", "example.org", fnames=[self.fname])
  2086. )
  2087. self.assertEqual(result, [("user", "pass")])
  2088. def test_match_with_matching_username(self) -> None:
  2089. result = list(
  2090. get_credentials_from_store(
  2091. "https", "example.org", "user", fnames=[self.fname]
  2092. )
  2093. )
  2094. self.assertEqual(result, [("user", "pass")])
  2095. def test_no_match_with_nonmatching_username(self) -> None:
  2096. result = list(
  2097. get_credentials_from_store(
  2098. "https", "example.org", "otheruser", fnames=[self.fname]
  2099. )
  2100. )
  2101. self.assertEqual(result, [])
  2102. class RemoteErrorFromStderrTests(TestCase):
  2103. def test_nothing(self) -> None:
  2104. self.assertEqual(_remote_error_from_stderr(None), HangupException())
  2105. def test_error_line(self) -> None:
  2106. b = BytesIO(
  2107. b"""\
  2108. This is some random output.
  2109. ERROR: This is the actual error
  2110. with a tail
  2111. """
  2112. )
  2113. self.assertEqual(
  2114. _remote_error_from_stderr(b),
  2115. GitProtocolError("This is the actual error"),
  2116. )
  2117. def test_no_error_line(self) -> None:
  2118. b = BytesIO(
  2119. b"""\
  2120. This is output without an error line.
  2121. And this line is just random noise, too.
  2122. """
  2123. )
  2124. self.assertEqual(
  2125. _remote_error_from_stderr(b),
  2126. HangupException(
  2127. [
  2128. b"This is output without an error line.",
  2129. b"And this line is just random noise, too.",
  2130. ]
  2131. ),
  2132. )
  2133. class TestExtractAgentAndSymrefs(TestCase):
  2134. def test_extract_agent_and_symrefs(self) -> None:
  2135. (symrefs, agent) = _extract_symrefs_and_agent(
  2136. [b"agent=git/2.31.1", b"symref=HEAD:refs/heads/master"]
  2137. )
  2138. self.assertEqual(agent, b"git/2.31.1")
  2139. self.assertEqual(symrefs, {b"HEAD": b"refs/heads/master"})