test_client.py 112 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094
  1. # test_client.py -- Tests for the git protocol, client side
  2. # Copyright (C) 2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import base64
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import warnings
  27. from io import BytesIO
  28. from typing import NoReturn
  29. from unittest.mock import MagicMock, Mock, patch
  30. from urllib.parse import quote as urlquote
  31. from urllib.parse import urlparse
  32. import dulwich
  33. from dulwich import client
  34. from dulwich.bundle import create_bundle_from_repo, write_bundle
  35. from dulwich.client import (
  36. AuthCallbackPoolManager,
  37. BundleClient,
  38. FetchPackResult,
  39. GitProtocolError,
  40. HangupException,
  41. HttpGitClient,
  42. InvalidWants,
  43. LocalGitClient,
  44. PLinkSSHVendor,
  45. ReportStatusParser,
  46. SendPackError,
  47. SSHGitClient,
  48. StrangeHostname,
  49. SubprocessSSHVendor,
  50. TCPGitClient,
  51. TraditionalGitClient,
  52. Urllib3HttpGitClient,
  53. _extract_symrefs_and_agent,
  54. _remote_error_from_stderr,
  55. _win32_url_to_path,
  56. check_wants,
  57. default_urllib3_manager,
  58. get_credentials_from_store,
  59. get_transport_and_path,
  60. get_transport_and_path_from_url,
  61. parse_rsync_url,
  62. )
  63. from dulwich.config import ConfigDict
  64. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  65. from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
  66. from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects
  67. from dulwich.protocol import DEFAULT_GIT_PROTOCOL_VERSION_FETCH, TCP_GIT_PORT, Protocol
  68. from dulwich.repo import MemoryRepo, Repo
  69. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  70. from . import TestCase, skipIf
  71. class DummyClient(TraditionalGitClient):
  72. def __init__(self, can_read, read, write) -> None:
  73. self.can_read = can_read
  74. self.read = read
  75. self.write = write
  76. TraditionalGitClient.__init__(self)
  77. def _connect(self, service, path, protocol_version=None):
  78. return Protocol(self.read, self.write), self.can_read, None
  79. class DummyPopen:
  80. def __init__(self, *args, **kwards) -> None:
  81. self.stdin = BytesIO(b"stdin")
  82. self.stdout = BytesIO(b"stdout")
  83. self.stderr = BytesIO(b"stderr")
  84. self.returncode = 0
  85. self.args = args
  86. self.kwargs = kwards
  87. def communicate(self, *args, **kwards):
  88. return ("Running", "")
  89. def wait(self, *args, **kwards) -> bool:
  90. return False
  91. # TODO(durin42): add unit-level tests of GitClient
  92. class GitClientTests(TestCase):
  93. def setUp(self) -> None:
  94. super().setUp()
  95. self.rout = BytesIO()
  96. self.rin = BytesIO()
  97. self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write)
  98. def test_caps(self) -> None:
  99. agent_cap = "agent=dulwich/{}.{}.{}".format(*dulwich.__version__).encode(
  100. "ascii"
  101. )
  102. self.assertEqual(
  103. {
  104. b"multi_ack",
  105. b"side-band-64k",
  106. b"ofs-delta",
  107. b"thin-pack",
  108. b"multi_ack_detailed",
  109. b"shallow",
  110. agent_cap,
  111. },
  112. set(self.client._fetch_capabilities),
  113. )
  114. self.assertEqual(
  115. {
  116. b"delete-refs",
  117. b"ofs-delta",
  118. b"report-status",
  119. b"side-band-64k",
  120. agent_cap,
  121. },
  122. set(self.client._send_capabilities),
  123. )
  124. def test_archive_ack(self) -> None:
  125. self.rin.write(b"0009NACK\n0000")
  126. self.rin.seek(0)
  127. self.client.archive(b"bla", b"HEAD", None, None)
  128. self.assertEqual(self.rout.getvalue(), b"0011argument HEAD0000")
  129. def test_fetch_empty(self) -> None:
  130. self.rin.write(b"0000")
  131. self.rin.seek(0)
  132. def check_heads(heads, **kwargs):
  133. self.assertEqual(heads, {})
  134. return []
  135. ret = self.client.fetch_pack(b"/", check_heads, None, None)
  136. self.assertEqual({}, ret.refs)
  137. self.assertEqual({}, ret.symrefs)
  138. def test_fetch_pack_ignores_magic_ref(self) -> None:
  139. self.rin.write(
  140. b"00000000000000000000000000000000000000000000 capabilities^{}"
  141. b"\x00 multi_ack "
  142. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  143. b"include-tag\n"
  144. b"0000"
  145. )
  146. self.rin.seek(0)
  147. def check_heads(heads, **kwargs):
  148. self.assertEqual({}, heads)
  149. return []
  150. ret = self.client.fetch_pack(b"bla", check_heads, None, None, None)
  151. self.assertEqual({}, ret.refs)
  152. self.assertEqual({}, ret.symrefs)
  153. self.assertEqual(self.rout.getvalue(), b"0000")
  154. def test_fetch_pack_none(self) -> None:
  155. self.rin.write(
  156. b"008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack "
  157. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  158. b"include-tag\n"
  159. b"0000"
  160. )
  161. self.rin.seek(0)
  162. ret = self.client.fetch_pack(
  163. b"bla", lambda heads, depth=None: [], None, None, None
  164. )
  165. self.assertEqual(
  166. {b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs
  167. )
  168. self.assertEqual({}, ret.symrefs)
  169. self.assertEqual(self.rout.getvalue(), b"0000")
  170. def test_handle_upload_pack_head_deepen_since(self) -> None:
  171. # Test that deepen-since command is properly sent
  172. from dulwich.client import _handle_upload_pack_head
  173. self.rin.write(b"0008NAK\n0000")
  174. self.rin.seek(0)
  175. class DummyGraphWalker:
  176. def __iter__(self):
  177. return self
  178. def __next__(self):
  179. return None
  180. proto = Protocol(self.rin.read, self.rout.write)
  181. capabilities = [b"shallow", b"deepen-since"]
  182. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  183. graph_walker = DummyGraphWalker()
  184. _handle_upload_pack_head(
  185. proto=proto,
  186. capabilities=capabilities,
  187. graph_walker=graph_walker,
  188. wants=wants,
  189. can_read=None,
  190. depth=None,
  191. protocol_version=0,
  192. shallow_since="2023-01-01T00:00:00Z",
  193. )
  194. # Verify the deepen-since command was sent
  195. output = self.rout.getvalue()
  196. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  197. def test_handle_upload_pack_head_deepen_not(self) -> None:
  198. # Test that deepen-not command is properly sent
  199. from dulwich.client import _handle_upload_pack_head
  200. self.rin.write(b"0008NAK\n0000")
  201. self.rin.seek(0)
  202. class DummyGraphWalker:
  203. def __iter__(self):
  204. return self
  205. def __next__(self):
  206. return None
  207. proto = Protocol(self.rin.read, self.rout.write)
  208. capabilities = [b"shallow", b"deepen-not"]
  209. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  210. graph_walker = DummyGraphWalker()
  211. _handle_upload_pack_head(
  212. proto=proto,
  213. capabilities=capabilities,
  214. graph_walker=graph_walker,
  215. wants=wants,
  216. can_read=None,
  217. depth=None,
  218. protocol_version=0,
  219. shallow_exclude=["refs/heads/excluded"],
  220. )
  221. # Verify the deepen-not command was sent
  222. output = self.rout.getvalue()
  223. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  224. def test_handle_upload_pack_head_deepen_not_multiple(self) -> None:
  225. # Test that multiple deepen-not commands are properly sent
  226. from dulwich.client import _handle_upload_pack_head
  227. self.rin.write(b"0008NAK\n0000")
  228. self.rin.seek(0)
  229. class DummyGraphWalker:
  230. def __iter__(self):
  231. return self
  232. def __next__(self):
  233. return None
  234. proto = Protocol(self.rin.read, self.rout.write)
  235. capabilities = [b"shallow", b"deepen-not"]
  236. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  237. graph_walker = DummyGraphWalker()
  238. _handle_upload_pack_head(
  239. proto=proto,
  240. capabilities=capabilities,
  241. graph_walker=graph_walker,
  242. wants=wants,
  243. can_read=None,
  244. depth=None,
  245. protocol_version=0,
  246. shallow_exclude=["refs/heads/excluded1", "refs/heads/excluded2"],
  247. )
  248. # Verify both deepen-not commands were sent
  249. output = self.rout.getvalue()
  250. self.assertIn(b"deepen-not refs/heads/excluded1\n", output)
  251. self.assertIn(b"deepen-not refs/heads/excluded2\n", output)
  252. def test_handle_upload_pack_head_deepen_since_and_not(self) -> None:
  253. # Test that deepen-since and deepen-not can be used together
  254. from dulwich.client import _handle_upload_pack_head
  255. self.rin.write(b"0008NAK\n0000")
  256. self.rin.seek(0)
  257. class DummyGraphWalker:
  258. def __iter__(self):
  259. return self
  260. def __next__(self):
  261. return None
  262. proto = Protocol(self.rin.read, self.rout.write)
  263. capabilities = [b"shallow", b"deepen-since", b"deepen-not"]
  264. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  265. graph_walker = DummyGraphWalker()
  266. _handle_upload_pack_head(
  267. proto=proto,
  268. capabilities=capabilities,
  269. graph_walker=graph_walker,
  270. wants=wants,
  271. can_read=None,
  272. depth=None,
  273. protocol_version=0,
  274. shallow_since="2023-01-01T00:00:00Z",
  275. shallow_exclude=["refs/heads/excluded"],
  276. )
  277. # Verify both deepen-since and deepen-not commands were sent
  278. output = self.rout.getvalue()
  279. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  280. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  281. def test_send_pack_no_sideband64k_with_update_ref_error(self) -> None:
  282. # No side-bank-64k reported by server shouldn't try to parse
  283. # side band data
  284. pkts = [
  285. b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}"
  286. b"\x00 report-status delete-refs ofs-delta\n",
  287. b"",
  288. b"unpack ok",
  289. b"ng refs/foo/bar pre-receive hook declined",
  290. b"",
  291. ]
  292. for pkt in pkts:
  293. if pkt == b"":
  294. self.rin.write(b"0000")
  295. else:
  296. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  297. self.rin.seek(0)
  298. tree = Tree()
  299. commit = Commit()
  300. commit.tree = tree
  301. commit.parents = []
  302. commit.author = commit.committer = b"test user"
  303. commit.commit_time = commit.author_time = 1174773719
  304. commit.commit_timezone = commit.author_timezone = 0
  305. commit.encoding = b"UTF-8"
  306. commit.message = b"test message"
  307. def update_refs(refs):
  308. return {
  309. b"refs/foo/bar": commit.id,
  310. }
  311. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  312. return pack_objects_to_data(
  313. [
  314. (commit, None),
  315. (tree, b""),
  316. ]
  317. )
  318. result = self.client.send_pack("blah", update_refs, generate_pack_data)
  319. self.assertEqual(
  320. {b"refs/foo/bar": "pre-receive hook declined"}, result.ref_status
  321. )
  322. self.assertEqual({b"refs/foo/bar": commit.id}, result.refs)
  323. def test_send_pack_none(self) -> None:
  324. # Set ref to current value
  325. self.rin.write(
  326. b"0078310ca9477129b8586fa2afc779c1f57cf64bba6c "
  327. b"refs/heads/master\x00 report-status delete-refs "
  328. b"side-band-64k quiet ofs-delta\n"
  329. b"0000"
  330. )
  331. self.rin.seek(0)
  332. def update_refs(refs):
  333. return {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"}
  334. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  335. return 0, []
  336. self.client.send_pack(b"/", update_refs, generate_pack_data)
  337. self.assertEqual(self.rout.getvalue(), b"0000")
  338. def test_send_pack_keep_and_delete(self) -> None:
  339. self.rin.write(
  340. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  341. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  342. b"003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n"
  343. b"0000000eunpack ok\n"
  344. b"0019ok refs/heads/master\n"
  345. b"0000"
  346. )
  347. self.rin.seek(0)
  348. def update_refs(refs):
  349. return {b"refs/heads/master": ZERO_SHA}
  350. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  351. return 0, []
  352. self.client.send_pack(b"/", update_refs, generate_pack_data)
  353. self.assertEqual(
  354. self.rout.getvalue(),
  355. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  356. b"0000000000000000000000000000000000000000 "
  357. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  358. )
  359. def test_send_pack_delete_only(self) -> None:
  360. self.rin.write(
  361. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  362. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  363. b"0000000eunpack ok\n"
  364. b"0019ok refs/heads/master\n"
  365. b"0000"
  366. )
  367. self.rin.seek(0)
  368. def update_refs(refs):
  369. return {b"refs/heads/master": ZERO_SHA}
  370. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  371. return 0, []
  372. self.client.send_pack(b"/", update_refs, generate_pack_data)
  373. self.assertEqual(
  374. self.rout.getvalue(),
  375. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  376. b"0000000000000000000000000000000000000000 "
  377. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  378. )
  379. def test_send_pack_new_ref_only(self) -> None:
  380. self.rin.write(
  381. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  382. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  383. b"0000000eunpack ok\n"
  384. b"0019ok refs/heads/blah12\n"
  385. b"0000"
  386. )
  387. self.rin.seek(0)
  388. def update_refs(refs):
  389. return {
  390. b"refs/heads/blah12": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  391. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  392. }
  393. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  394. return 0, []
  395. f = BytesIO()
  396. write_pack_objects(f.write, [], object_format=DEFAULT_OBJECT_FORMAT)
  397. self.client.send_pack("/", update_refs, generate_pack_data)
  398. self.assertEqual(
  399. self.rout.getvalue(),
  400. b"008b0000000000000000000000000000000000000000 "
  401. b"310ca9477129b8586fa2afc779c1f57cf64bba6c "
  402. b"refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  403. + f.getvalue(),
  404. )
  405. def test_send_pack_new_ref(self) -> None:
  406. self.rin.write(
  407. b"0064310ca9477129b8586fa2afc779c1f57cf64bba6c "
  408. b"refs/heads/master\x00 report-status delete-refs ofs-delta\n"
  409. b"0000000eunpack ok\n"
  410. b"0019ok refs/heads/blah12\n"
  411. b"0000"
  412. )
  413. self.rin.seek(0)
  414. tree = Tree()
  415. commit = Commit()
  416. commit.tree = tree
  417. commit.parents = []
  418. commit.author = commit.committer = b"test user"
  419. commit.commit_time = commit.author_time = 1174773719
  420. commit.commit_timezone = commit.author_timezone = 0
  421. commit.encoding = b"UTF-8"
  422. commit.message = b"test message"
  423. def update_refs(refs):
  424. return {
  425. b"refs/heads/blah12": commit.id,
  426. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  427. }
  428. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  429. return pack_objects_to_data(
  430. [
  431. (commit, None),
  432. (tree, b""),
  433. ]
  434. )
  435. f = BytesIO()
  436. count, records = generate_pack_data(None, None)
  437. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  438. write_pack_data(
  439. f.write, records, num_records=count, object_format=DEFAULT_OBJECT_FORMAT
  440. )
  441. self.client.send_pack(b"/", update_refs, generate_pack_data)
  442. self.assertEqual(
  443. self.rout.getvalue(),
  444. b"008b0000000000000000000000000000000000000000 "
  445. + commit.id
  446. + b" refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  447. + f.getvalue(),
  448. )
  449. def test_send_pack_no_deleteref_delete_only(self) -> None:
  450. pkts = [
  451. b"310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master"
  452. b"\x00 report-status ofs-delta\n",
  453. b"",
  454. b"",
  455. ]
  456. for pkt in pkts:
  457. if pkt == b"":
  458. self.rin.write(b"0000")
  459. else:
  460. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  461. self.rin.seek(0)
  462. def update_refs(refs):
  463. return {b"refs/heads/master": ZERO_SHA}
  464. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  465. return 0, []
  466. result = self.client.send_pack(b"/", update_refs, generate_pack_data)
  467. self.assertEqual(
  468. result.ref_status,
  469. {b"refs/heads/master": "remote does not support deleting refs"},
  470. )
  471. self.assertEqual(
  472. result.refs,
  473. {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"},
  474. )
  475. self.assertEqual(self.rout.getvalue(), b"0000")
  476. class TestGetTransportAndPath(TestCase):
  477. def test_tcp(self) -> None:
  478. c, path = get_transport_and_path("git://foo.com/bar/baz")
  479. self.assertIsInstance(c, TCPGitClient)
  480. self.assertEqual("foo.com", c._host)
  481. self.assertEqual(TCP_GIT_PORT, c._port)
  482. self.assertEqual("/bar/baz", path)
  483. def test_tcp_port(self) -> None:
  484. c, path = get_transport_and_path("git://foo.com:1234/bar/baz")
  485. self.assertIsInstance(c, TCPGitClient)
  486. self.assertEqual("foo.com", c._host)
  487. self.assertEqual(1234, c._port)
  488. self.assertEqual("/bar/baz", path)
  489. def test_tcp_ipv6(self) -> None:
  490. c, path = get_transport_and_path("git://[::1]/bar/baz")
  491. self.assertIsInstance(c, TCPGitClient)
  492. self.assertEqual("::1", c._host)
  493. self.assertEqual(TCP_GIT_PORT, c._port)
  494. self.assertEqual("/bar/baz", path)
  495. def test_tcp_ipv6_port(self) -> None:
  496. c, path = get_transport_and_path("git://[2001:db8::1]:1234/bar/baz")
  497. self.assertIsInstance(c, TCPGitClient)
  498. self.assertEqual("2001:db8::1", c._host)
  499. self.assertEqual(1234, c._port)
  500. self.assertEqual("/bar/baz", path)
  501. def test_git_ssh_explicit(self) -> None:
  502. c, path = get_transport_and_path("git+ssh://foo.com/bar/baz")
  503. self.assertIsInstance(c, SSHGitClient)
  504. self.assertEqual("foo.com", c.host)
  505. self.assertEqual(None, c.port)
  506. self.assertEqual(None, c.username)
  507. self.assertEqual("/bar/baz", path)
  508. def test_ssh_explicit(self) -> None:
  509. c, path = get_transport_and_path("ssh://foo.com/bar/baz")
  510. self.assertIsInstance(c, SSHGitClient)
  511. self.assertEqual("foo.com", c.host)
  512. self.assertEqual(None, c.port)
  513. self.assertEqual(None, c.username)
  514. self.assertEqual("/bar/baz", path)
  515. def test_ssh_port_explicit(self) -> None:
  516. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  517. self.assertIsInstance(c, SSHGitClient)
  518. self.assertEqual("foo.com", c.host)
  519. self.assertEqual(1234, c.port)
  520. self.assertEqual("/bar/baz", path)
  521. def test_username_and_port_explicit_unknown_scheme(self) -> None:
  522. c, path = get_transport_and_path("unknown://git@server:7999/dply/stuff.git")
  523. self.assertIsInstance(c, SSHGitClient)
  524. self.assertEqual("unknown", c.host)
  525. self.assertEqual("//git@server:7999/dply/stuff.git", path)
  526. def test_username_and_port_explicit(self) -> None:
  527. c, path = get_transport_and_path("ssh://git@server:7999/dply/stuff.git")
  528. self.assertIsInstance(c, SSHGitClient)
  529. self.assertEqual("git", c.username)
  530. self.assertEqual("server", c.host)
  531. self.assertEqual(7999, c.port)
  532. self.assertEqual("/dply/stuff.git", path)
  533. def test_ssh_abspath_doubleslash(self) -> None:
  534. c, path = get_transport_and_path("git+ssh://foo.com//bar/baz")
  535. self.assertIsInstance(c, SSHGitClient)
  536. self.assertEqual("foo.com", c.host)
  537. self.assertEqual(None, c.port)
  538. self.assertEqual(None, c.username)
  539. self.assertEqual("//bar/baz", path)
  540. def test_ssh_port(self) -> None:
  541. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  542. self.assertIsInstance(c, SSHGitClient)
  543. self.assertEqual("foo.com", c.host)
  544. self.assertEqual(1234, c.port)
  545. self.assertEqual("/bar/baz", path)
  546. def test_ssh_implicit(self) -> None:
  547. c, path = get_transport_and_path("foo:/bar/baz")
  548. self.assertIsInstance(c, SSHGitClient)
  549. self.assertEqual("foo", c.host)
  550. self.assertEqual(None, c.port)
  551. self.assertEqual(None, c.username)
  552. self.assertEqual("/bar/baz", path)
  553. def test_ssh_host(self) -> None:
  554. c, path = get_transport_and_path("foo.com:/bar/baz")
  555. self.assertIsInstance(c, SSHGitClient)
  556. self.assertEqual("foo.com", c.host)
  557. self.assertEqual(None, c.port)
  558. self.assertEqual(None, c.username)
  559. self.assertEqual("/bar/baz", path)
  560. def test_ssh_user_host(self) -> None:
  561. c, path = get_transport_and_path("user@foo.com:/bar/baz")
  562. self.assertIsInstance(c, SSHGitClient)
  563. self.assertEqual("foo.com", c.host)
  564. self.assertEqual(None, c.port)
  565. self.assertEqual("user", c.username)
  566. self.assertEqual("/bar/baz", path)
  567. def test_ssh_relpath(self) -> None:
  568. c, path = get_transport_and_path("foo:bar/baz")
  569. self.assertIsInstance(c, SSHGitClient)
  570. self.assertEqual("foo", c.host)
  571. self.assertEqual(None, c.port)
  572. self.assertEqual(None, c.username)
  573. self.assertEqual("bar/baz", path)
  574. def test_ssh_host_relpath(self) -> None:
  575. c, path = get_transport_and_path("foo.com:bar/baz")
  576. self.assertIsInstance(c, SSHGitClient)
  577. self.assertEqual("foo.com", c.host)
  578. self.assertEqual(None, c.port)
  579. self.assertEqual(None, c.username)
  580. self.assertEqual("bar/baz", path)
  581. def test_ssh_user_host_relpath(self) -> None:
  582. c, path = get_transport_and_path("user@foo.com:bar/baz")
  583. self.assertIsInstance(c, SSHGitClient)
  584. self.assertEqual("foo.com", c.host)
  585. self.assertEqual(None, c.port)
  586. self.assertEqual("user", c.username)
  587. self.assertEqual("bar/baz", path)
  588. def test_local(self) -> None:
  589. c, path = get_transport_and_path("foo.bar/baz")
  590. self.assertIsInstance(c, LocalGitClient)
  591. self.assertEqual("foo.bar/baz", path)
  592. def test_ssh_with_config(self) -> None:
  593. # Test that core.sshCommand from config is passed to SSHGitClient
  594. config = ConfigDict()
  595. c, _path = get_transport_and_path(
  596. "ssh://git@github.com/user/repo.git", config=config
  597. )
  598. self.assertIsInstance(c, SSHGitClient)
  599. self.assertEqual(c.ssh_command, "ssh") # Now defaults to "ssh"
  600. config.set((b"core",), b"sshCommand", b"custom-ssh -o CustomOption=yes")
  601. c, _path = get_transport_and_path(
  602. "ssh://git@github.com/user/repo.git", config=config
  603. )
  604. self.assertIsInstance(c, SSHGitClient)
  605. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  606. # Test rsync-style URL also gets the config
  607. c, _path = get_transport_and_path("git@github.com:user/repo.git", config=config)
  608. self.assertIsInstance(c, SSHGitClient)
  609. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  610. @skipIf(sys.platform != "win32", "Behaviour only happens on windows.")
  611. def test_local_abs_windows_path(self) -> None:
  612. c, path = get_transport_and_path("C:\\foo.bar\\baz")
  613. self.assertIsInstance(c, LocalGitClient)
  614. self.assertEqual("C:\\foo.bar\\baz", path)
  615. def test_error(self) -> None:
  616. # Need to use a known urlparse.uses_netloc URL scheme to get the
  617. # expected parsing of the URL on Python versions less than 2.6.5
  618. c, _path = get_transport_and_path("prospero://bar/baz")
  619. self.assertIsInstance(c, SSHGitClient)
  620. def test_http(self) -> None:
  621. url = "https://github.com/jelmer/dulwich"
  622. c, path = get_transport_and_path(url)
  623. self.assertIsInstance(c, HttpGitClient)
  624. self.assertEqual("/jelmer/dulwich", path)
  625. def test_http_auth(self) -> None:
  626. url = "https://user:passwd@github.com/jelmer/dulwich"
  627. c, path = get_transport_and_path(url)
  628. self.assertIsInstance(c, HttpGitClient)
  629. self.assertEqual("/jelmer/dulwich", path)
  630. self.assertEqual("user", c._username)
  631. self.assertEqual("passwd", c._password)
  632. def test_http_auth_with_username(self) -> None:
  633. url = "https://github.com/jelmer/dulwich"
  634. c, path = get_transport_and_path(url, username="user2", password="blah")
  635. self.assertIsInstance(c, HttpGitClient)
  636. self.assertEqual("/jelmer/dulwich", path)
  637. self.assertEqual("user2", c._username)
  638. self.assertEqual("blah", c._password)
  639. def test_http_auth_with_username_and_in_url(self) -> None:
  640. url = "https://user:passwd@github.com/jelmer/dulwich"
  641. c, path = get_transport_and_path(url, username="user2", password="blah")
  642. self.assertIsInstance(c, HttpGitClient)
  643. self.assertEqual("/jelmer/dulwich", path)
  644. # Explicitly provided credentials should override URL credentials
  645. self.assertEqual("user2", c._username)
  646. self.assertEqual("blah", c._password)
  647. def test_http_no_auth(self) -> None:
  648. url = "https://github.com/jelmer/dulwich"
  649. c, path = get_transport_and_path(url)
  650. self.assertIsInstance(c, HttpGitClient)
  651. self.assertEqual("/jelmer/dulwich", path)
  652. self.assertIs(None, c._username)
  653. self.assertIs(None, c._password)
  654. def test_ssh_with_key_filename_and_ssh_command(self) -> None:
  655. # Test that key_filename and ssh_command are passed through to SSHGitClient
  656. c, path = get_transport_and_path(
  657. "ssh://git@github.com/user/repo.git",
  658. key_filename="/path/to/id_rsa",
  659. ssh_command="custom-ssh -o StrictHostKeyChecking=no",
  660. )
  661. self.assertIsInstance(c, SSHGitClient)
  662. self.assertEqual("/user/repo.git", path)
  663. self.assertEqual("/path/to/id_rsa", c.key_filename)
  664. self.assertEqual("custom-ssh -o StrictHostKeyChecking=no", c.ssh_command)
  665. class TestGetTransportAndPathFromUrl(TestCase):
  666. def test_tcp(self) -> None:
  667. c, path = get_transport_and_path_from_url("git://foo.com/bar/baz")
  668. self.assertIsInstance(c, TCPGitClient)
  669. self.assertEqual("foo.com", c._host)
  670. self.assertEqual(TCP_GIT_PORT, c._port)
  671. self.assertEqual("/bar/baz", path)
  672. def test_tcp_port(self) -> None:
  673. c, path = get_transport_and_path_from_url("git://foo.com:1234/bar/baz")
  674. self.assertIsInstance(c, TCPGitClient)
  675. self.assertEqual("foo.com", c._host)
  676. self.assertEqual(1234, c._port)
  677. self.assertEqual("/bar/baz", path)
  678. def test_ssh_explicit(self) -> None:
  679. c, path = get_transport_and_path_from_url("git+ssh://foo.com/bar/baz")
  680. self.assertIsInstance(c, SSHGitClient)
  681. self.assertEqual("foo.com", c.host)
  682. self.assertEqual(None, c.port)
  683. self.assertEqual(None, c.username)
  684. self.assertEqual("/bar/baz", path)
  685. def test_ssh_port_explicit(self) -> None:
  686. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/bar/baz")
  687. self.assertIsInstance(c, SSHGitClient)
  688. self.assertEqual("foo.com", c.host)
  689. self.assertEqual(1234, c.port)
  690. self.assertEqual("/bar/baz", path)
  691. def test_ssh_homepath(self) -> None:
  692. c, path = get_transport_and_path_from_url("git+ssh://foo.com/~/bar/baz")
  693. self.assertIsInstance(c, SSHGitClient)
  694. self.assertEqual("foo.com", c.host)
  695. self.assertEqual(None, c.port)
  696. self.assertEqual(None, c.username)
  697. self.assertEqual("/~/bar/baz", path)
  698. def test_ssh_port_homepath(self) -> None:
  699. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/~/bar/baz")
  700. self.assertIsInstance(c, SSHGitClient)
  701. self.assertEqual("foo.com", c.host)
  702. self.assertEqual(1234, c.port)
  703. self.assertEqual("/~/bar/baz", path)
  704. def test_ssh_host_relpath(self) -> None:
  705. self.assertRaises(
  706. ValueError, get_transport_and_path_from_url, "foo.com:bar/baz"
  707. )
  708. def test_ssh_user_host_relpath(self) -> None:
  709. self.assertRaises(
  710. ValueError, get_transport_and_path_from_url, "user@foo.com:bar/baz"
  711. )
  712. def test_local_path(self) -> None:
  713. self.assertRaises(ValueError, get_transport_and_path_from_url, "foo.bar/baz")
  714. def test_error(self) -> None:
  715. # Need to use a known urlparse.uses_netloc URL scheme to get the
  716. # expected parsing of the URL on Python versions less than 2.6.5
  717. self.assertRaises(
  718. ValueError, get_transport_and_path_from_url, "prospero://bar/baz"
  719. )
  720. def test_http(self) -> None:
  721. url = "https://github.com/jelmer/dulwich"
  722. c, path = get_transport_and_path_from_url(url)
  723. self.assertIsInstance(c, HttpGitClient)
  724. self.assertEqual("https://github.com", c.get_url(b"/"))
  725. self.assertEqual("/jelmer/dulwich", path)
  726. def test_http_port(self) -> None:
  727. url = "https://github.com:9090/jelmer/dulwich"
  728. c, path = get_transport_and_path_from_url(url)
  729. self.assertEqual("https://github.com:9090", c.get_url(b"/"))
  730. self.assertIsInstance(c, HttpGitClient)
  731. self.assertEqual("/jelmer/dulwich", path)
  732. @patch("os.name", "posix")
  733. @patch("sys.platform", "linux")
  734. def test_file(self) -> None:
  735. c, path = get_transport_and_path_from_url("file:///home/jelmer/foo")
  736. self.assertIsInstance(c, LocalGitClient)
  737. self.assertEqual("/home/jelmer/foo", path)
  738. def test_win32_url_to_path(self):
  739. def check(url, expected):
  740. parsed = urlparse(url)
  741. self.assertEqual(_win32_url_to_path(parsed), expected)
  742. check("file:C:/foo.bar/baz", "C:\\foo.bar\\baz")
  743. check("file:/C:/foo.bar/baz", "C:\\foo.bar\\baz")
  744. check("file://C:/foo.bar/baz", "C:\\foo.bar\\baz")
  745. check("file:///C:/foo.bar/baz", "C:\\foo.bar\\baz")
  746. @patch("os.name", "nt")
  747. @patch("sys.platform", "win32")
  748. def test_file_win(self) -> None:
  749. expected = "C:\\foo.bar\\baz"
  750. for file_url in [
  751. "file:C:/foo.bar/baz",
  752. "file:/C:/foo.bar/baz",
  753. "file://C:/foo.bar/baz",
  754. "file:///C:/foo.bar/baz",
  755. ]:
  756. c, path = get_transport_and_path(file_url)
  757. self.assertIsInstance(c, LocalGitClient)
  758. self.assertEqual(path, expected)
  759. for remote_url in [
  760. "file://host.example.com/C:/foo.bar/baz"
  761. "file://host.example.com/C:/foo.bar/baz"
  762. "file:////host.example/foo.bar/baz",
  763. ]:
  764. with self.assertRaises(NotImplementedError):
  765. c, path = get_transport_and_path(remote_url)
  766. class TestSSHVendor:
  767. def __init__(self) -> None:
  768. self.host = None
  769. self.command = ""
  770. self.username = None
  771. self.port = None
  772. self.password = None
  773. self.key_filename = None
  774. def run_command(
  775. self,
  776. host,
  777. command,
  778. username=None,
  779. port=None,
  780. password=None,
  781. key_filename=None,
  782. ssh_command=None,
  783. protocol_version=None,
  784. ):
  785. self.host = host
  786. self.command = command
  787. self.username = username
  788. self.port = port
  789. self.password = password
  790. self.key_filename = key_filename
  791. self.ssh_command = ssh_command
  792. self.protocol_version = protocol_version
  793. class Subprocess:
  794. def read(self, *args):
  795. return None
  796. def write(self, *args):
  797. return None
  798. def close(self):
  799. pass
  800. def can_read(self):
  801. return None
  802. return Subprocess()
  803. class SSHGitClientTests(TestCase):
  804. def setUp(self) -> None:
  805. super().setUp()
  806. self.server = TestSSHVendor()
  807. self.real_vendor = client.get_ssh_vendor
  808. client.get_ssh_vendor = lambda: self.server
  809. self.client = SSHGitClient("git.samba.org")
  810. def tearDown(self) -> None:
  811. super().tearDown()
  812. client.get_ssh_vendor = self.real_vendor
  813. def test_get_url(self) -> None:
  814. path = "/tmp/repo.git"
  815. c = SSHGitClient("git.samba.org")
  816. url = c.get_url(path)
  817. self.assertEqual("ssh://git.samba.org/tmp/repo.git", url)
  818. def test_get_url_with_username_and_port(self) -> None:
  819. path = "/tmp/repo.git"
  820. c = SSHGitClient("git.samba.org", port=2222, username="user")
  821. url = c.get_url(path)
  822. self.assertEqual("ssh://user@git.samba.org:2222/tmp/repo.git", url)
  823. def test_default_command(self) -> None:
  824. self.assertEqual(b"git-upload-pack", self.client._get_cmd_path(b"upload-pack"))
  825. def test_alternative_command_path(self) -> None:
  826. self.client.alternative_paths[b"upload-pack"] = b"/usr/lib/git/git-upload-pack"
  827. self.assertEqual(
  828. b"/usr/lib/git/git-upload-pack",
  829. self.client._get_cmd_path(b"upload-pack"),
  830. )
  831. def test_alternative_command_path_spaces(self) -> None:
  832. self.client.alternative_paths[b"upload-pack"] = (
  833. b"/usr/lib/git/git-upload-pack -ibla"
  834. )
  835. self.assertEqual(
  836. b"/usr/lib/git/git-upload-pack -ibla",
  837. self.client._get_cmd_path(b"upload-pack"),
  838. )
  839. def test_connect(self) -> None:
  840. server = self.server
  841. client = self.client
  842. client.username = b"username"
  843. client.port = 1337
  844. proto, _, _ = client._connect(b"command", b"/path/to/repo")
  845. try:
  846. self.assertEqual(b"username", server.username)
  847. self.assertEqual(1337, server.port)
  848. self.assertEqual(b"git-command '/path/to/repo'", server.command)
  849. finally:
  850. proto.close()
  851. proto, _, _ = client._connect(b"relative-command", b"/~/path/to/repo")
  852. try:
  853. self.assertEqual(b"git-relative-command '~/path/to/repo'", server.command)
  854. finally:
  855. proto.close()
  856. def test_ssh_command_precedence(self) -> None:
  857. self.overrideEnv("GIT_SSH", "/path/to/ssh")
  858. test_client = SSHGitClient("git.samba.org")
  859. self.assertEqual(test_client.ssh_command, "/path/to/ssh")
  860. self.overrideEnv("GIT_SSH_COMMAND", "/path/to/ssh -o Option=Value")
  861. test_client = SSHGitClient("git.samba.org")
  862. self.assertEqual(test_client.ssh_command, "/path/to/ssh -o Option=Value")
  863. test_client = SSHGitClient("git.samba.org", ssh_command="ssh -o Option1=Value1")
  864. self.assertEqual(test_client.ssh_command, "ssh -o Option1=Value1")
  865. def test_ssh_command_config(self) -> None:
  866. # Test core.sshCommand config setting
  867. # No config, no environment - should default to "ssh"
  868. self.overrideEnv("GIT_SSH", None)
  869. self.overrideEnv("GIT_SSH_COMMAND", None)
  870. test_client = SSHGitClient("git.samba.org")
  871. self.assertEqual(test_client.ssh_command, "ssh")
  872. # Config with core.sshCommand
  873. config = ConfigDict()
  874. config.set((b"core",), b"sshCommand", b"ssh -o StrictHostKeyChecking=no")
  875. test_client = SSHGitClient("git.samba.org", config=config)
  876. self.assertEqual(test_client.ssh_command, "ssh -o StrictHostKeyChecking=no")
  877. # ssh_command parameter takes precedence over config
  878. test_client = SSHGitClient(
  879. "git.samba.org", config=config, ssh_command="custom-ssh"
  880. )
  881. self.assertEqual(test_client.ssh_command, "custom-ssh")
  882. # Environment variables take precedence over config when no ssh_command parameter
  883. self.overrideEnv("GIT_SSH_COMMAND", "/usr/bin/ssh -v")
  884. test_client = SSHGitClient("git.samba.org", config=config)
  885. self.assertEqual(test_client.ssh_command, "/usr/bin/ssh -v")
  886. def test_ssh_kwargs_passed_to_vendor(self) -> None:
  887. # Test that ssh_command and other kwargs are actually passed to the SSH vendor
  888. server = self.server
  889. client = self.client
  890. # Set custom ssh_command
  891. client.ssh_command = "custom-ssh-wrapper.sh -o Option=Value"
  892. client.password = "test-password"
  893. client.key_filename = "/path/to/key"
  894. # Connect and verify all kwargs are passed through
  895. proto, _, _ = client._connect(b"upload-pack", b"/path/to/repo")
  896. self.addCleanup(proto.close)
  897. self.assertEqual(server.ssh_command, "custom-ssh-wrapper.sh -o Option=Value")
  898. self.assertEqual(server.password, "test-password")
  899. self.assertEqual(server.key_filename, "/path/to/key")
  900. class ReportStatusParserTests(TestCase):
  901. def test_invalid_pack(self) -> None:
  902. parser = ReportStatusParser()
  903. parser.handle_packet(b"unpack error - foo bar")
  904. parser.handle_packet(b"ok refs/foo/bar")
  905. parser.handle_packet(None)
  906. self.assertRaises(SendPackError, list, parser.check())
  907. def test_update_refs_error(self) -> None:
  908. parser = ReportStatusParser()
  909. parser.handle_packet(b"unpack ok")
  910. parser.handle_packet(b"ng refs/foo/bar need to pull")
  911. parser.handle_packet(None)
  912. self.assertEqual([(b"refs/foo/bar", "need to pull")], list(parser.check()))
  913. def test_ok(self) -> None:
  914. parser = ReportStatusParser()
  915. parser.handle_packet(b"unpack ok")
  916. parser.handle_packet(b"ok refs/foo/bar")
  917. parser.handle_packet(None)
  918. self.assertEqual([(b"refs/foo/bar", None)], list(parser.check()))
  919. class LocalGitClientTests(TestCase):
  920. def test_get_url(self) -> None:
  921. path = "/tmp/repo.git"
  922. c = LocalGitClient()
  923. url = c.get_url(path)
  924. self.assertEqual("file:///tmp/repo.git", url)
  925. def test_fetch_into_empty(self) -> None:
  926. c = LocalGitClient()
  927. target = tempfile.mkdtemp()
  928. self.addCleanup(shutil.rmtree, target)
  929. t = Repo.init_bare(target)
  930. self.addCleanup(t.close)
  931. s = open_repo("a.git")
  932. self.addCleanup(tear_down_repo, s)
  933. self.assertEqual(s.get_refs(), c.fetch(s.path, t).refs)
  934. def test_clone(self) -> None:
  935. c = LocalGitClient()
  936. s = open_repo("a.git")
  937. self.addCleanup(tear_down_repo, s)
  938. target = tempfile.mkdtemp()
  939. self.addCleanup(shutil.rmtree, target)
  940. result_repo = c.clone(s.path, target, mkdir=False)
  941. self.addCleanup(result_repo.close)
  942. expected = dict(s.get_refs())
  943. expected[b"refs/remotes/origin/HEAD"] = expected[b"HEAD"]
  944. expected[b"refs/remotes/origin/master"] = expected[b"refs/heads/master"]
  945. self.assertEqual(expected, result_repo.get_refs())
  946. def test_clone_sha256_local(self) -> None:
  947. """Test that cloning a SHA-256 local repo creates a SHA-256 clone."""
  948. client = LocalGitClient()
  949. # Create a SHA-256 source repository
  950. source_path = tempfile.mkdtemp()
  951. self.addCleanup(shutil.rmtree, source_path)
  952. source_repo = Repo.init(source_path, object_format="sha256")
  953. # Verify source is SHA-256
  954. self.assertEqual("sha256", source_repo.object_format.name)
  955. source_repo.close()
  956. # Clone the repository
  957. target_path = tempfile.mkdtemp()
  958. self.addCleanup(shutil.rmtree, target_path)
  959. cloned_repo = client.clone(source_path, target_path, mkdir=False)
  960. self.addCleanup(cloned_repo.close)
  961. # Verify the clone uses SHA-256
  962. self.assertEqual("sha256", cloned_repo.object_format.name)
  963. # Verify the config has the correct objectformat extension
  964. config = cloned_repo.get_config()
  965. self.assertEqual(b"sha256", config.get((b"extensions",), b"objectformat"))
  966. def test_clone_sha1_local(self) -> None:
  967. """Test that cloning a SHA-1 local repo creates a SHA-1 clone."""
  968. client = LocalGitClient()
  969. # Create a SHA-1 source repository
  970. source_path = tempfile.mkdtemp()
  971. self.addCleanup(shutil.rmtree, source_path)
  972. source_repo = Repo.init(source_path, object_format="sha1")
  973. # Verify source is SHA-1
  974. self.assertEqual("sha1", source_repo.object_format.name)
  975. source_repo.close()
  976. # Clone the repository
  977. target_path = tempfile.mkdtemp()
  978. self.addCleanup(shutil.rmtree, target_path)
  979. cloned_repo = client.clone(source_path, target_path, mkdir=False)
  980. self.addCleanup(cloned_repo.close)
  981. # Verify the clone uses SHA-1
  982. self.assertEqual("sha1", cloned_repo.object_format.name)
  983. def test_fetch_empty(self) -> None:
  984. c = LocalGitClient()
  985. s = open_repo("a.git")
  986. self.addCleanup(tear_down_repo, s)
  987. out = BytesIO()
  988. walker = {}
  989. ret = c.fetch_pack(
  990. s.path,
  991. lambda heads, depth=None: [],
  992. graph_walker=walker,
  993. pack_data=out.write,
  994. )
  995. self.assertEqual(
  996. {
  997. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  998. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  999. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  1000. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  1001. },
  1002. ret.refs,
  1003. )
  1004. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  1005. self.assertEqual(
  1006. b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
  1007. b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e",
  1008. out.getvalue(),
  1009. )
  1010. def test_fetch_pack_none(self) -> None:
  1011. c = LocalGitClient()
  1012. s = open_repo("a.git")
  1013. self.addCleanup(tear_down_repo, s)
  1014. out = BytesIO()
  1015. walker = MemoryRepo().get_graph_walker()
  1016. ret = c.fetch_pack(
  1017. s.path,
  1018. lambda heads, depth=None: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
  1019. graph_walker=walker,
  1020. pack_data=out.write,
  1021. )
  1022. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  1023. self.assertEqual(
  1024. {
  1025. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  1026. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  1027. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  1028. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  1029. },
  1030. ret.refs,
  1031. )
  1032. # Hardcoding is not ideal, but we'll fix that some other day..
  1033. self.assertTrue(
  1034. out.getvalue().startswith(b"PACK\x00\x00\x00\x02\x00\x00\x00\x07")
  1035. )
  1036. def test_send_pack_without_changes(self) -> None:
  1037. local = open_repo("a.git")
  1038. self.addCleanup(tear_down_repo, local)
  1039. target = open_repo("a.git")
  1040. self.addCleanup(tear_down_repo, target)
  1041. self.send_and_verify(b"master", local, target)
  1042. def test_send_pack_with_changes(self) -> None:
  1043. local = open_repo("a.git")
  1044. self.addCleanup(tear_down_repo, local)
  1045. target_path = tempfile.mkdtemp()
  1046. self.addCleanup(shutil.rmtree, target_path)
  1047. with Repo.init_bare(target_path) as target:
  1048. self.send_and_verify(b"master", local, target)
  1049. def test_get_refs(self) -> None:
  1050. local = open_repo("refs.git")
  1051. self.addCleanup(tear_down_repo, local)
  1052. client = LocalGitClient()
  1053. result = client.get_refs(local.path)
  1054. self.assertDictEqual(local.refs.as_dict(), result.refs)
  1055. # Check that symrefs are detected correctly
  1056. self.assertIn(b"HEAD", result.symrefs)
  1057. def test_fetch_object_format_mismatch_sha256_to_sha1(self) -> None:
  1058. """Test that fetching from SHA-256 to non-empty SHA-1 repository fails."""
  1059. from dulwich.objects import Blob
  1060. client = LocalGitClient()
  1061. # Create SHA-256 source repository
  1062. sha256_path = tempfile.mkdtemp()
  1063. self.addCleanup(shutil.rmtree, sha256_path)
  1064. sha256_repo = Repo.init(sha256_path, object_format="sha256")
  1065. self.addCleanup(sha256_repo.close)
  1066. # Create SHA-1 target repository with an object (so it can't be auto-changed)
  1067. sha1_path = tempfile.mkdtemp()
  1068. self.addCleanup(shutil.rmtree, sha1_path)
  1069. sha1_repo = Repo.init(sha1_path, object_format="sha1")
  1070. self.addCleanup(sha1_repo.close)
  1071. # Add an object to make the repo non-empty
  1072. blob = Blob.from_string(b"test content")
  1073. sha1_repo.object_store.add_object(blob)
  1074. # Attempt to fetch should raise AssertionError (repo not empty)
  1075. with self.assertRaises(AssertionError) as cm:
  1076. client.fetch(sha256_path, sha1_repo)
  1077. self.assertIn("Cannot change object format", str(cm.exception))
  1078. self.assertIn("already contains objects", str(cm.exception))
  1079. def test_fetch_object_format_mismatch_sha1_to_sha256(self) -> None:
  1080. """Test that fetching from SHA-1 to non-empty SHA-256 repository fails."""
  1081. from dulwich.objects import Blob
  1082. client = LocalGitClient()
  1083. # Create SHA-1 source repository
  1084. sha1_path = tempfile.mkdtemp()
  1085. self.addCleanup(shutil.rmtree, sha1_path)
  1086. sha1_repo = Repo.init(sha1_path, object_format="sha1")
  1087. self.addCleanup(sha1_repo.close)
  1088. # Create SHA-256 target repository with an object (so it can't be auto-changed)
  1089. sha256_path = tempfile.mkdtemp()
  1090. self.addCleanup(shutil.rmtree, sha256_path)
  1091. sha256_repo = Repo.init(sha256_path, object_format="sha256")
  1092. self.addCleanup(sha256_repo.close)
  1093. # Add an object to make the repo non-empty
  1094. blob = Blob.from_string(b"test content")
  1095. sha256_repo.object_store.add_object(blob)
  1096. # Attempt to fetch should raise AssertionError (repo not empty)
  1097. with self.assertRaises(AssertionError) as cm:
  1098. client.fetch(sha1_path, sha256_repo)
  1099. self.assertIn("Cannot change object format", str(cm.exception))
  1100. self.assertIn("already contains objects", str(cm.exception))
  1101. def test_fetch_object_format_same(self) -> None:
  1102. """Test that fetching between repositories with same object format works."""
  1103. client = LocalGitClient()
  1104. # Create SHA-256 source repository
  1105. sha256_src = tempfile.mkdtemp()
  1106. self.addCleanup(shutil.rmtree, sha256_src)
  1107. src_repo = Repo.init(sha256_src, object_format="sha256")
  1108. self.addCleanup(src_repo.close)
  1109. # Create SHA-256 target repository
  1110. sha256_dst = tempfile.mkdtemp()
  1111. self.addCleanup(shutil.rmtree, sha256_dst)
  1112. dst_repo = Repo.init(sha256_dst, object_format="sha256")
  1113. self.addCleanup(dst_repo.close)
  1114. # Fetch should succeed without error
  1115. result = client.fetch(sha256_src, dst_repo)
  1116. self.assertIsNotNone(result)
  1117. def send_and_verify(self, branch, local, target) -> None:
  1118. """Send branch from local to remote repository and verify it worked."""
  1119. client = LocalGitClient()
  1120. ref_name = b"refs/heads/" + branch
  1121. result = client.send_pack(
  1122. target.path,
  1123. lambda _: {ref_name: local.refs[ref_name]},
  1124. local.generate_pack_data,
  1125. )
  1126. self.assertEqual(local.refs[ref_name], result.refs[ref_name])
  1127. self.assertIs(None, result.agent)
  1128. self.assertEqual({}, result.ref_status)
  1129. obj_local = local.get_object(result.refs[ref_name])
  1130. obj_target = target.get_object(result.refs[ref_name])
  1131. self.assertEqual(obj_local, obj_target)
  1132. class BundleClientTests(TestCase):
  1133. def setUp(self) -> None:
  1134. super().setUp()
  1135. self.tempdir = tempfile.mkdtemp()
  1136. self.addCleanup(shutil.rmtree, self.tempdir)
  1137. def _create_test_bundle(self):
  1138. """Create a test bundle file and return its path."""
  1139. # Create a simple repository
  1140. repo = MemoryRepo()
  1141. # Create some objects
  1142. blob = Blob.from_string(b"Hello world")
  1143. repo.object_store.add_object(blob)
  1144. tree = Tree()
  1145. tree.add(b"hello.txt", 0o100644, blob.id)
  1146. repo.object_store.add_object(tree)
  1147. commit = Commit()
  1148. commit.tree = tree.id
  1149. commit.message = b"Initial commit"
  1150. commit.author = commit.committer = b"Test User <test@example.com>"
  1151. commit.commit_time = commit.author_time = 1234567890
  1152. commit.commit_timezone = commit.author_timezone = 0
  1153. repo.object_store.add_object(commit)
  1154. repo.refs[b"refs/heads/master"] = commit.id
  1155. # Create bundle
  1156. bundle = create_bundle_from_repo(repo)
  1157. # Write bundle to file
  1158. bundle_path = os.path.join(self.tempdir, "test.bundle")
  1159. with open(bundle_path, "wb") as f:
  1160. write_bundle(f, bundle)
  1161. return bundle_path, repo
  1162. def test_is_bundle_file(self) -> None:
  1163. """Test bundle file detection."""
  1164. bundle_path, _ = self._create_test_bundle()
  1165. # Test positive case
  1166. self.assertTrue(BundleClient._is_bundle_file(bundle_path))
  1167. # Test negative case - regular file
  1168. regular_file = os.path.join(self.tempdir, "regular.txt")
  1169. with open(regular_file, "w") as f:
  1170. f.write("not a bundle")
  1171. self.assertFalse(BundleClient._is_bundle_file(regular_file))
  1172. # Test negative case - non-existent file
  1173. self.assertFalse(BundleClient._is_bundle_file("/non/existent/file"))
  1174. def test_get_refs(self) -> None:
  1175. """Test getting refs from bundle."""
  1176. bundle_path, _ = self._create_test_bundle()
  1177. client = BundleClient()
  1178. result = client.get_refs(bundle_path)
  1179. self.assertIn(b"refs/heads/master", result.refs)
  1180. self.assertEqual(result.symrefs, {})
  1181. def test_fetch_pack(self) -> None:
  1182. """Test fetching pack from bundle."""
  1183. bundle_path, _source_repo = self._create_test_bundle()
  1184. client = BundleClient()
  1185. pack_data = BytesIO()
  1186. def determine_wants(refs):
  1187. return list(refs.values())
  1188. class MockGraphWalker:
  1189. def next(self):
  1190. return None
  1191. def ack(self, sha):
  1192. pass
  1193. result = client.fetch_pack(
  1194. bundle_path, determine_wants, MockGraphWalker(), pack_data.write
  1195. )
  1196. # Verify we got refs back
  1197. self.assertIn(b"refs/heads/master", result.refs)
  1198. # Verify pack data was written
  1199. self.assertGreater(len(pack_data.getvalue()), 0)
  1200. def test_fetch(self) -> None:
  1201. """Test fetching from bundle into target repo."""
  1202. bundle_path, _source_repo = self._create_test_bundle()
  1203. client = BundleClient()
  1204. target_repo = MemoryRepo()
  1205. self.addCleanup(target_repo.close)
  1206. result = client.fetch(bundle_path, target_repo)
  1207. # Verify refs were imported
  1208. self.assertIn(b"refs/heads/master", result.refs)
  1209. # Verify objects were imported
  1210. master_id = result.refs[b"refs/heads/master"]
  1211. self.assertIn(master_id, target_repo.object_store)
  1212. # Verify the commit object is correct
  1213. commit = target_repo.object_store[master_id]
  1214. self.assertEqual(commit.message, b"Initial commit")
  1215. def test_send_pack_not_supported(self) -> None:
  1216. """Test that send_pack raises NotImplementedError."""
  1217. bundle_path, _ = self._create_test_bundle()
  1218. client = BundleClient()
  1219. with self.assertRaises(NotImplementedError):
  1220. client.send_pack(bundle_path, None, None)
  1221. def test_get_transport_and_path_bundle(self) -> None:
  1222. """Test that get_transport_and_path detects bundle files."""
  1223. bundle_path, _ = self._create_test_bundle()
  1224. client, path = get_transport_and_path(bundle_path)
  1225. self.assertIsInstance(client, BundleClient)
  1226. self.assertEqual(path, bundle_path)
  1227. class HttpGitClientTests(TestCase):
  1228. def test_get_url(self) -> None:
  1229. base_url = "https://github.com/jelmer/dulwich"
  1230. path = "/jelmer/dulwich"
  1231. c = HttpGitClient(base_url)
  1232. url = c.get_url(path)
  1233. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1234. def test_get_url_bytes_path(self) -> None:
  1235. base_url = "https://github.com/jelmer/dulwich"
  1236. path_bytes = b"/jelmer/dulwich"
  1237. c = HttpGitClient(base_url)
  1238. url = c.get_url(path_bytes)
  1239. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1240. def test_get_url_with_username_and_passwd(self) -> None:
  1241. base_url = "https://github.com/jelmer/dulwich"
  1242. path = "/jelmer/dulwich"
  1243. c = HttpGitClient(base_url, username="USERNAME", password="PASSWD")
  1244. url = c.get_url(path)
  1245. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1246. def test_init_username_passwd_set(self) -> None:
  1247. url = "https://github.com/jelmer/dulwich"
  1248. c = HttpGitClient(url, config=None, username="user", password="passwd")
  1249. self.assertEqual("user", c._username)
  1250. self.assertEqual("passwd", c._password)
  1251. basic_auth = c.pool_manager.headers["authorization"]
  1252. auth_string = "{}:{}".format("user", "passwd")
  1253. b64_credentials = base64.b64encode(auth_string.encode("latin1"))
  1254. expected_basic_auth = "Basic {}".format(b64_credentials.decode("latin1"))
  1255. self.assertEqual(basic_auth, expected_basic_auth)
  1256. def test_init_username_set_no_password(self) -> None:
  1257. url = "https://github.com/jelmer/dulwich"
  1258. c = HttpGitClient(url, config=None, username="user")
  1259. self.assertEqual("user", c._username)
  1260. self.assertIsNone(c._password)
  1261. basic_auth = c.pool_manager.headers["authorization"]
  1262. auth_string = b"user:"
  1263. b64_credentials = base64.b64encode(auth_string)
  1264. expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
  1265. self.assertEqual(basic_auth, expected_basic_auth)
  1266. def test_init_no_username_passwd(self) -> None:
  1267. url = "https://github.com/jelmer/dulwich"
  1268. c = HttpGitClient(url, config=None)
  1269. self.assertIs(None, c._username)
  1270. self.assertIs(None, c._password)
  1271. self.assertNotIn("authorization", c.pool_manager.headers)
  1272. def test_from_parsedurl_username_only(self) -> None:
  1273. username = "user"
  1274. url = f"https://{username}@github.com/jelmer/dulwich"
  1275. c = HttpGitClient.from_parsedurl(urlparse(url))
  1276. self.assertEqual(c._username, username)
  1277. self.assertEqual(c._password, None)
  1278. basic_auth = c.pool_manager.headers["authorization"]
  1279. auth_string = username.encode("ascii") + b":"
  1280. b64_credentials = base64.b64encode(auth_string)
  1281. expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
  1282. self.assertEqual(basic_auth, expected_basic_auth)
  1283. def test_from_parsedurl_on_url_with_quoted_credentials(self) -> None:
  1284. original_username = "john|the|first"
  1285. quoted_username = urlquote(original_username)
  1286. original_password = "Ya#1$2%3"
  1287. quoted_password = urlquote(original_password)
  1288. url = f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
  1289. c = HttpGitClient.from_parsedurl(urlparse(url))
  1290. self.assertEqual(original_username, c._username)
  1291. self.assertEqual(original_password, c._password)
  1292. basic_auth = c.pool_manager.headers["authorization"]
  1293. auth_string = f"{original_username}:{original_password}"
  1294. b64_credentials = base64.b64encode(auth_string.encode("latin1"))
  1295. expected_basic_auth = "Basic {}".format(b64_credentials.decode("latin1"))
  1296. self.assertEqual(basic_auth, expected_basic_auth)
  1297. def test_url_redirect_location(self) -> None:
  1298. from urllib3.response import HTTPResponse
  1299. test_data = {
  1300. "https://gitlab.com/inkscape/inkscape/": {
  1301. "location": "https://gitlab.com/inkscape/inkscape.git/",
  1302. "redirect_url": "https://gitlab.com/inkscape/inkscape.git/",
  1303. "refs_data": (
  1304. b"001e# service=git-upload-pack\n00000032"
  1305. b"fb2bebf4919a011f0fd7cec085443d0031228e76 "
  1306. b"HEAD\n0000"
  1307. ),
  1308. },
  1309. "https://github.com/jelmer/dulwich/": {
  1310. "location": "https://github.com/jelmer/dulwich/",
  1311. "redirect_url": "https://github.com/jelmer/dulwich/",
  1312. "refs_data": (
  1313. b"001e# service=git-upload-pack\n00000032"
  1314. b"3ff25e09724aa4d86ea5bca7d5dd0399a3c8bfcf "
  1315. b"HEAD\n0000"
  1316. ),
  1317. },
  1318. # check for absolute-path URI reference as location
  1319. "https://codeberg.org/ashwinvis/radicale-sh.git/": {
  1320. "location": "/ashwinvis/radicale-auth-sh/",
  1321. "redirect_url": "https://codeberg.org/ashwinvis/radicale-auth-sh/",
  1322. "refs_data": (
  1323. b"001e# service=git-upload-pack\n00000032"
  1324. b"470f8603768b608fc988675de2fae8f963c21158 "
  1325. b"HEAD\n0000"
  1326. ),
  1327. },
  1328. }
  1329. tail = "info/refs?service=git-upload-pack"
  1330. # we need to mock urllib3.PoolManager as this test will fail
  1331. # otherwise without an active internet connection
  1332. class PoolManagerMock:
  1333. def __init__(self) -> None:
  1334. self.headers: dict[str, str] = {}
  1335. def request(
  1336. self,
  1337. method,
  1338. url,
  1339. fields=None,
  1340. headers=None,
  1341. redirect=True,
  1342. preload_content=True,
  1343. ):
  1344. base_url = url[: -len(tail)]
  1345. redirect_base_url = test_data[base_url]["location"]
  1346. redirect_url = redirect_base_url + tail
  1347. headers = {
  1348. "Content-Type": "application/x-git-upload-pack-advertisement"
  1349. }
  1350. body = test_data[base_url]["refs_data"]
  1351. # urllib3 handles automatic redirection by default
  1352. status = 200
  1353. request_url = redirect_url
  1354. # simulate urllib3 behavior when redirect parameter is False
  1355. if redirect is False:
  1356. request_url = url
  1357. if redirect_base_url != base_url:
  1358. body = b""
  1359. headers["location"] = test_data[base_url]["location"]
  1360. status = 301
  1361. return HTTPResponse(
  1362. body=BytesIO(body),
  1363. headers=headers,
  1364. request_method=method,
  1365. request_url=request_url,
  1366. preload_content=preload_content,
  1367. status=status,
  1368. )
  1369. pool_manager = PoolManagerMock()
  1370. for base_url in test_data.keys():
  1371. # instantiate HttpGitClient with mocked pool manager
  1372. c = HttpGitClient(base_url, pool_manager=pool_manager, config=None)
  1373. # call method that detects url redirection
  1374. _, _, processed_url, _, _ = c._discover_references(
  1375. b"git-upload-pack", base_url
  1376. )
  1377. # send the same request as the method above without redirection
  1378. resp = c.pool_manager.request("GET", base_url + tail, redirect=False)
  1379. # check expected behavior of urllib3
  1380. redirect_location = resp.get_redirect_location()
  1381. if resp.status == 200:
  1382. self.assertFalse(redirect_location)
  1383. if redirect_location:
  1384. # check that url redirection has been correctly detected
  1385. self.assertEqual(processed_url, test_data[base_url]["redirect_url"])
  1386. else:
  1387. # check also the no redirection case
  1388. self.assertEqual(processed_url, base_url)
  1389. def test_smart_request_content_type_with_directive_check(self) -> None:
  1390. from urllib3.response import HTTPResponse
  1391. # we need to mock urllib3.PoolManager as this test will fail
  1392. # otherwise without an active internet connection
  1393. class PoolManagerMock:
  1394. def __init__(self) -> None:
  1395. self.headers: dict[str, str] = {}
  1396. def request(
  1397. self,
  1398. method,
  1399. url,
  1400. fields=None,
  1401. headers=None,
  1402. redirect=True,
  1403. preload_content=True,
  1404. ):
  1405. return HTTPResponse(
  1406. headers={
  1407. "Content-Type": "application/x-git-upload-pack-result; charset=utf-8"
  1408. },
  1409. request_method=method,
  1410. request_url=url,
  1411. preload_content=preload_content,
  1412. status=200,
  1413. )
  1414. clone_url = "https://hacktivis.me/git/blog.git/"
  1415. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1416. self.assertTrue(client._smart_request("git-upload-pack", clone_url, data=None))
  1417. def test_urllib3_protocol_error(self) -> None:
  1418. from urllib3.exceptions import ProtocolError
  1419. from urllib3.response import HTTPResponse
  1420. error_msg = "protocol error"
  1421. # we need to mock urllib3.PoolManager as this test will fail
  1422. # otherwise without an active internet connection
  1423. class PoolManagerMock:
  1424. def __init__(self) -> None:
  1425. self.headers: dict[str, str] = {}
  1426. def request(
  1427. self,
  1428. method,
  1429. url,
  1430. fields=None,
  1431. headers=None,
  1432. redirect=True,
  1433. preload_content=True,
  1434. ):
  1435. response = HTTPResponse(
  1436. headers={"Content-Type": "application/x-git-upload-pack-result"},
  1437. request_method=method,
  1438. request_url=url,
  1439. preload_content=preload_content,
  1440. status=200,
  1441. )
  1442. def read(self) -> NoReturn:
  1443. raise ProtocolError(error_msg)
  1444. # override HTTPResponse.read to throw urllib3.exceptions.ProtocolError
  1445. response.read = read
  1446. return response
  1447. def check_heads(heads, **kwargs):
  1448. self.assertEqual(heads, {})
  1449. return []
  1450. clone_url = "https://git.example.org/user/project.git/"
  1451. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1452. with self.assertRaises(GitProtocolError, msg=error_msg):
  1453. client.fetch_pack(b"/", check_heads, None, None)
  1454. def test_fetch_pack_dumb_http(self) -> None:
  1455. import zlib
  1456. from urllib3.response import HTTPResponse
  1457. # Mock responses for dumb HTTP
  1458. info_refs_content = (
  1459. b"0123456789abcdef0123456789abcdef01234567\trefs/heads/master\n"
  1460. )
  1461. head_content = b"ref: refs/heads/master"
  1462. # Create a blob object for testing
  1463. blob_content = b"Hello, dumb HTTP!"
  1464. blob_sha = b"0123456789abcdef0123456789abcdef01234567"
  1465. blob_hex = blob_sha.decode("ascii")
  1466. blob_obj_data = (
  1467. b"blob " + str(len(blob_content)).encode() + b"\x00" + blob_content
  1468. )
  1469. blob_compressed = zlib.compress(blob_obj_data)
  1470. responses = {
  1471. "/HEAD": {
  1472. "status": 200,
  1473. "content": head_content,
  1474. "content_type": "text/plain",
  1475. },
  1476. "/git-upload-pack": {
  1477. "status": 404,
  1478. "content": b"Not Found",
  1479. "content_type": "text/plain",
  1480. },
  1481. "/info/refs": {
  1482. "status": 200,
  1483. "content": info_refs_content,
  1484. "content_type": "text/plain",
  1485. },
  1486. f"/objects/{blob_hex[:2]}/{blob_hex[2:]}": {
  1487. "status": 200,
  1488. "content": blob_compressed,
  1489. "content_type": "application/octet-stream",
  1490. },
  1491. }
  1492. class PoolManagerMock:
  1493. def __init__(self) -> None:
  1494. self.headers: dict[str, str] = {}
  1495. def request(
  1496. self,
  1497. method,
  1498. url,
  1499. fields=None,
  1500. headers=None,
  1501. redirect=True,
  1502. preload_content=True,
  1503. ):
  1504. # Extract path from URL
  1505. from urllib.parse import urlparse
  1506. parsed = urlparse(url)
  1507. path = parsed.path.rstrip("/")
  1508. # Find matching response
  1509. for pattern, resp_data in responses.items():
  1510. if path.endswith(pattern):
  1511. return HTTPResponse(
  1512. body=BytesIO(resp_data["content"]),
  1513. headers={
  1514. "Content-Type": resp_data.get(
  1515. "content_type", "text/plain"
  1516. )
  1517. },
  1518. request_method=method,
  1519. request_url=url,
  1520. preload_content=preload_content,
  1521. status=resp_data["status"],
  1522. )
  1523. # Default 404
  1524. return HTTPResponse(
  1525. body=BytesIO(b"Not Found"),
  1526. headers={"Content-Type": "text/plain"},
  1527. request_method=method,
  1528. request_url=url,
  1529. preload_content=preload_content,
  1530. status=404,
  1531. )
  1532. def determine_wants(heads, **kwargs):
  1533. # heads contains the refs with SHA values, just return the SHA we want
  1534. return [heads[b"refs/heads/master"]]
  1535. received_data = []
  1536. def pack_data_handler(data):
  1537. # Collect pack data
  1538. received_data.append(data)
  1539. clone_url = "https://git.example.org/repo.git/"
  1540. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1541. # Mock graph walker that says we don't have anything
  1542. class MockGraphWalker:
  1543. def ack(self, sha):
  1544. return []
  1545. graph_walker = MockGraphWalker()
  1546. result = client.fetch_pack(
  1547. b"/", determine_wants, graph_walker, pack_data_handler
  1548. )
  1549. # Verify we got the refs
  1550. expected_sha = blob_hex.encode("ascii")
  1551. self.assertEqual({b"refs/heads/master": expected_sha}, result.refs)
  1552. # Verify we received pack data
  1553. self.assertTrue(len(received_data) > 0)
  1554. pack_data = b"".join(received_data)
  1555. self.assertTrue(len(pack_data) > 0)
  1556. # The pack should be valid pack format
  1557. self.assertTrue(pack_data.startswith(b"PACK"))
  1558. # Pack header: PACK + version (4 bytes) + num objects (4 bytes)
  1559. self.assertEqual(pack_data[4:8], b"\x00\x00\x00\x02") # version 2
  1560. self.assertEqual(pack_data[8:12], b"\x00\x00\x00\x01") # 1 object
  1561. def test_timeout_configuration(self) -> None:
  1562. """Test that timeout parameter is properly configured."""
  1563. url = "https://github.com/jelmer/dulwich"
  1564. timeout = 30
  1565. c = HttpGitClient(url, timeout=timeout)
  1566. self.assertEqual(c._timeout, timeout)
  1567. def test_timeout_from_config(self) -> None:
  1568. """Test that timeout can be configured via git config."""
  1569. url = "https://github.com/jelmer/dulwich"
  1570. config = ConfigDict()
  1571. config.set((b"http",), b"timeout", b"25")
  1572. c = HttpGitClient(url, config=config)
  1573. # The timeout should be set on the pool manager
  1574. # Since we can't easily access the timeout from the pool manager,
  1575. # we just verify the client was created successfully
  1576. self.assertIsNotNone(c.pool_manager)
  1577. def test_timeout_parameter_precedence(self) -> None:
  1578. """Test that explicit timeout parameter takes precedence over config."""
  1579. url = "https://github.com/jelmer/dulwich"
  1580. config = ConfigDict()
  1581. config.set((b"http",), b"timeout", b"25")
  1582. c = HttpGitClient(url, config=config, timeout=15)
  1583. self.assertEqual(c._timeout, 15)
  1584. def test_http_extra_headers_from_config(self) -> None:
  1585. """Test that http.extraHeader config values are applied."""
  1586. from dulwich.config import ConfigDict
  1587. url = "https://github.com/jelmer/dulwich"
  1588. config = ConfigDict()
  1589. # Set a single extra header
  1590. config.set((b"http",), b"extraHeader", b"X-Custom-Header: test-value")
  1591. c = HttpGitClient(url, config=config)
  1592. # Check that the header was added to the pool manager
  1593. self.assertIn("X-Custom-Header", c.pool_manager.headers)
  1594. self.assertEqual(c.pool_manager.headers["X-Custom-Header"], "test-value")
  1595. def test_http_multiple_extra_headers_from_config(self) -> None:
  1596. """Test that multiple http.extraHeader config values are applied."""
  1597. from dulwich.config import ConfigDict
  1598. url = "https://github.com/jelmer/dulwich"
  1599. config = ConfigDict()
  1600. # Set multiple extra headers
  1601. config.set((b"http",), b"extraHeader", b"X-Header-1: value1")
  1602. config.add((b"http",), b"extraHeader", b"X-Header-2: value2")
  1603. config.add((b"http",), b"extraHeader", b"Authorization: Bearer token123")
  1604. c = HttpGitClient(url, config=config)
  1605. # Check that all headers were added to the pool manager
  1606. self.assertIn("X-Header-1", c.pool_manager.headers)
  1607. self.assertEqual(c.pool_manager.headers["X-Header-1"], "value1")
  1608. self.assertIn("X-Header-2", c.pool_manager.headers)
  1609. self.assertEqual(c.pool_manager.headers["X-Header-2"], "value2")
  1610. self.assertIn("Authorization", c.pool_manager.headers)
  1611. self.assertEqual(c.pool_manager.headers["Authorization"], "Bearer token123")
  1612. def test_http_extra_headers_per_url_config(self) -> None:
  1613. """Test that per-URL http.extraHeader config values are applied (issue #882)."""
  1614. from dulwich.config import ConfigDict
  1615. url = "https://github.com/jelmer/dulwich"
  1616. config = ConfigDict()
  1617. # Set URL-specific extra header
  1618. config.set(
  1619. (b"http", b"https://github.com/"),
  1620. b"extraHeader",
  1621. b"Authorization: basic token123",
  1622. )
  1623. c = HttpGitClient(url, config=config)
  1624. # Check that the header was added to the pool manager
  1625. self.assertIn("Authorization", c.pool_manager.headers)
  1626. self.assertEqual(c.pool_manager.headers["Authorization"], "basic token123")
  1627. def test_http_extra_headers_url_specificity(self) -> None:
  1628. """Test that more specific URL configs override less specific ones."""
  1629. from dulwich.config import ConfigDict
  1630. url = "https://github.com/jelmer/dulwich"
  1631. config = ConfigDict()
  1632. # Set global header
  1633. config.set((b"http",), b"extraHeader", b"X-Global: global-value")
  1634. # Set host-specific header (overrides global)
  1635. config.set(
  1636. (b"http", b"https://github.com/"), b"extraHeader", b"X-Global: github-value"
  1637. )
  1638. config.add(
  1639. (b"http", b"https://github.com/"),
  1640. b"extraHeader",
  1641. b"Authorization: Bearer token123",
  1642. )
  1643. c = HttpGitClient(url, config=config)
  1644. # More specific setting should win
  1645. self.assertEqual(c.pool_manager.headers["X-Global"], "github-value")
  1646. self.assertEqual(c.pool_manager.headers["Authorization"], "Bearer token123")
  1647. def test_http_extra_headers_multiple_url_configs(self) -> None:
  1648. """Test that different URLs can have different extra headers."""
  1649. from dulwich.config import ConfigDict
  1650. config = ConfigDict()
  1651. # Set different headers for different URLs
  1652. config.set(
  1653. (b"http", b"https://github.com/"),
  1654. b"extraHeader",
  1655. b"Authorization: Bearer github-token",
  1656. )
  1657. config.set(
  1658. (b"http", b"https://gitlab.com/"),
  1659. b"extraHeader",
  1660. b"Authorization: Bearer gitlab-token",
  1661. )
  1662. # Test GitHub URL
  1663. c1 = HttpGitClient("https://github.com/user/repo", config=config)
  1664. self.assertEqual(
  1665. c1.pool_manager.headers["Authorization"], "Bearer github-token"
  1666. )
  1667. # Test GitLab URL
  1668. c2 = HttpGitClient("https://gitlab.com/user/repo", config=config)
  1669. self.assertEqual(
  1670. c2.pool_manager.headers["Authorization"], "Bearer gitlab-token"
  1671. )
  1672. def test_http_extra_headers_no_match(self) -> None:
  1673. """Test that non-matching URL configs don't apply."""
  1674. from dulwich.config import ConfigDict
  1675. url = "https://example.com/repo"
  1676. config = ConfigDict()
  1677. # Set header only for GitHub
  1678. config.set(
  1679. (b"http", b"https://github.com/"),
  1680. b"extraHeader",
  1681. b"Authorization: Bearer token123",
  1682. )
  1683. c = HttpGitClient(url, config=config)
  1684. # Authorization header should not be present for example.com
  1685. self.assertNotIn("Authorization", c.pool_manager.headers)
  1686. def test_http_extra_headers_invalid_format(self) -> None:
  1687. """Test that invalid extra headers trigger warnings."""
  1688. import logging
  1689. from dulwich.config import ConfigDict
  1690. url = "https://github.com/jelmer/dulwich"
  1691. config = ConfigDict()
  1692. # Set valid header
  1693. config.set((b"http",), b"extraHeader", b"X-Valid: valid-value")
  1694. # Set invalid headers (no colon-space separator)
  1695. config.add((b"http",), b"extraHeader", b"X-Invalid-No-Separator")
  1696. # Set empty header
  1697. config.add((b"http",), b"extraHeader", b"")
  1698. # Set another valid header to verify we continue processing
  1699. config.add((b"http",), b"extraHeader", b"X-Another-Valid: another-value")
  1700. with self.assertLogs("dulwich.client", level=logging.WARNING) as cm:
  1701. c = HttpGitClient(url, config=config)
  1702. # Check that warnings were logged
  1703. self.assertEqual(len(cm.output), 2)
  1704. self.assertIn("missing ': ' separator", cm.output[0])
  1705. self.assertIn("empty http.extraHeader", cm.output[1])
  1706. # Valid headers should still be applied
  1707. self.assertIn("X-Valid", c.pool_manager.headers)
  1708. self.assertEqual(c.pool_manager.headers["X-Valid"], "valid-value")
  1709. self.assertIn("X-Another-Valid", c.pool_manager.headers)
  1710. self.assertEqual(c.pool_manager.headers["X-Another-Valid"], "another-value")
  1711. # Invalid header should not be present
  1712. self.assertNotIn("X-Invalid-No-Separator", c.pool_manager.headers)
  1713. def test_get_url_preserves_credentials_from_url(self) -> None:
  1714. """Test that credentials from URL are preserved in get_url() (issue #1925)."""
  1715. # When credentials come from the URL (not passed explicitly),
  1716. # they should be included in get_url() so they're saved to git config
  1717. username = "ghp_token123"
  1718. url = f"https://{username}@github.com/jelmer/dulwich"
  1719. path = "/jelmer/dulwich"
  1720. c = HttpGitClient.from_parsedurl(urlparse(url))
  1721. reconstructed_url = c.get_url(path)
  1722. # Credentials should be preserved in the URL
  1723. self.assertIn(username, reconstructed_url)
  1724. self.assertEqual(
  1725. f"https://{username}@github.com/jelmer/dulwich", reconstructed_url
  1726. )
  1727. def test_get_url_preserves_credentials_with_password_from_url(self) -> None:
  1728. """Test that username:password from URL are preserved in get_url()."""
  1729. username = "user"
  1730. password = "pass"
  1731. url = f"https://{username}:{password}@github.com/jelmer/dulwich"
  1732. path = "/jelmer/dulwich"
  1733. c = HttpGitClient.from_parsedurl(urlparse(url))
  1734. reconstructed_url = c.get_url(path)
  1735. # Both username and password should be preserved
  1736. self.assertIn(username, reconstructed_url)
  1737. self.assertIn(password, reconstructed_url)
  1738. self.assertEqual(
  1739. f"https://{username}:{password}@github.com/jelmer/dulwich",
  1740. reconstructed_url,
  1741. )
  1742. def test_get_url_preserves_special_chars_in_credentials(self) -> None:
  1743. """Test that special characters in credentials are properly escaped."""
  1744. # URL-encoded credentials with special characters
  1745. original_username = "user@domain"
  1746. original_password = "p@ss:word"
  1747. quoted_username = urlquote(original_username, safe="")
  1748. quoted_password = urlquote(original_password, safe="")
  1749. url = f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
  1750. path = "/jelmer/dulwich"
  1751. c = HttpGitClient.from_parsedurl(urlparse(url))
  1752. reconstructed_url = c.get_url(path)
  1753. # The reconstructed URL should have properly escaped credentials
  1754. self.assertIn(quoted_username, reconstructed_url)
  1755. self.assertIn(quoted_password, reconstructed_url)
  1756. # Verify the URL is valid by parsing it back
  1757. parsed = urlparse(reconstructed_url)
  1758. from urllib.parse import unquote
  1759. self.assertEqual(unquote(parsed.username), original_username)
  1760. self.assertEqual(unquote(parsed.password), original_password)
  1761. def test_get_url_explicit_credentials_not_in_url(self) -> None:
  1762. """Test that explicitly passed credentials are NOT included in get_url()."""
  1763. # When credentials are passed explicitly (not from URL),
  1764. # they should NOT appear in get_url() for security
  1765. base_url = "https://github.com/jelmer/dulwich"
  1766. path = "/jelmer/dulwich"
  1767. username = "explicit_user"
  1768. password = "explicit_pass"
  1769. c = HttpGitClient(base_url, username=username, password=password)
  1770. url = c.get_url(path)
  1771. # Credentials should NOT be in the URL
  1772. self.assertNotIn(username, url)
  1773. self.assertNotIn(password, url)
  1774. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1775. def test_pool_manager_parameter(self) -> None:
  1776. """Test that pool_manager parameter is properly passed through."""
  1777. import urllib3
  1778. # Create a custom pool manager
  1779. custom_pool_manager = urllib3.PoolManager()
  1780. # Test with get_transport_and_path_from_url
  1781. url = "https://github.com/jelmer/dulwich"
  1782. client, _path = get_transport_and_path_from_url(
  1783. url, pool_manager=custom_pool_manager
  1784. )
  1785. # Verify the client is an HTTP client and has our custom pool manager
  1786. self.assertIsInstance(client, HttpGitClient)
  1787. self.assertIs(client.pool_manager, custom_pool_manager)
  1788. # Test with get_transport_and_path
  1789. client2, _path2 = get_transport_and_path(url, pool_manager=custom_pool_manager)
  1790. # Verify the client is an HTTP client and has our custom pool manager
  1791. self.assertIsInstance(client2, HttpGitClient)
  1792. self.assertIs(client2.pool_manager, custom_pool_manager)
  1793. def test_urllib3_subclass_support(self) -> None:
  1794. """Test that subclasses of Urllib3HttpGitClient are properly supported.
  1795. This test verifies that the bug fix for commit d1f41c5c works correctly.
  1796. Previously, the code used `cls is Urllib3HttpGitClient` which failed for
  1797. subclasses. Now it uses `issubclass(cls, Urllib3HttpGitClient)` which
  1798. correctly handles subclasses.
  1799. """
  1800. # Create a custom subclass of Urllib3HttpGitClient
  1801. class CustomUrllib3HttpGitClient(Urllib3HttpGitClient):
  1802. def __init__(self, *args, **kwargs):
  1803. super().__init__(*args, **kwargs)
  1804. self.custom_attribute = "custom_value"
  1805. # Test with AbstractHttpGitClient.from_parsedurl directly
  1806. # This is how subclasses use the client
  1807. from urllib.parse import urlparse
  1808. parsed = urlparse("https://github.com/jelmer/dulwich")
  1809. config = ConfigDict()
  1810. client = CustomUrllib3HttpGitClient.from_parsedurl(parsed, config=config)
  1811. # Verify the client is our custom subclass
  1812. self.assertIsInstance(client, CustomUrllib3HttpGitClient)
  1813. self.assertIsInstance(client, Urllib3HttpGitClient)
  1814. self.assertEqual("custom_value", client.custom_attribute)
  1815. # Verify the config was passed through (this was the bug - it wasn't passed to subclasses before)
  1816. self.assertIsNotNone(client.config)
  1817. def test_auth_callbacks(self) -> None:
  1818. url = "https://github.com/jelmer/dulwich"
  1819. def auth_callback(url, www_authenticate, attempt):
  1820. return {"username": "user", "password": "pass"}
  1821. def proxy_auth_callback(url, proxy_authenticate, attempt):
  1822. return {"username": "proxy_user", "password": "proxy_pass"}
  1823. c = HttpGitClient(
  1824. url, auth_callback=auth_callback, proxy_auth_callback=proxy_auth_callback
  1825. )
  1826. # Check that the pool manager is wrapped with AuthCallbackPoolManager
  1827. self.assertIsInstance(c.pool_manager, AuthCallbackPoolManager)
  1828. self.assertEqual(c._auth_callback, auth_callback)
  1829. self.assertEqual(c._proxy_auth_callback, proxy_auth_callback)
  1830. class TCPGitClientTests(TestCase):
  1831. def test_get_url(self) -> None:
  1832. host = "github.com"
  1833. path = "/jelmer/dulwich"
  1834. c = TCPGitClient(host)
  1835. url = c.get_url(path)
  1836. self.assertEqual("git://github.com/jelmer/dulwich", url)
  1837. def test_get_url_with_port(self) -> None:
  1838. host = "github.com"
  1839. path = "/jelmer/dulwich"
  1840. port = 9090
  1841. c = TCPGitClient(host, port=port)
  1842. url = c.get_url(path)
  1843. self.assertEqual("git://github.com:9090/jelmer/dulwich", url)
  1844. def test_get_url_with_ipv6(self) -> None:
  1845. host = "::1"
  1846. path = "/jelmer/dulwich"
  1847. c = TCPGitClient(host)
  1848. url = c.get_url(path)
  1849. self.assertEqual("git://[::1]/jelmer/dulwich", url)
  1850. def test_get_url_with_ipv6_and_port(self) -> None:
  1851. host = "2001:db8::1"
  1852. path = "/jelmer/dulwich"
  1853. port = 9090
  1854. c = TCPGitClient(host, port=port)
  1855. url = c.get_url(path)
  1856. self.assertEqual("git://[2001:db8::1]:9090/jelmer/dulwich", url)
  1857. def test_get_url_with_ipv6_default_port(self) -> None:
  1858. host = "2001:db8::1"
  1859. path = "/jelmer/dulwich"
  1860. port = TCP_GIT_PORT # Default port should not be included in URL
  1861. c = TCPGitClient(host, port=port)
  1862. url = c.get_url(path)
  1863. self.assertEqual("git://[2001:db8::1]/jelmer/dulwich", url)
  1864. class AuthCallbackPoolManagerTest(TestCase):
  1865. def test_http_auth_callback(self) -> None:
  1866. # Create a mock pool manager
  1867. mock_pool_manager = Mock()
  1868. mock_response = Mock()
  1869. # First request returns 401
  1870. mock_response.status = 401
  1871. mock_response.headers = {"WWW-Authenticate": 'Basic realm="test"'}
  1872. # Second request (after auth) returns 200
  1873. mock_response_success = Mock()
  1874. mock_response_success.status = 200
  1875. mock_response_success.headers = {}
  1876. mock_pool_manager.request = MagicMock(
  1877. side_effect=[mock_response, mock_response_success]
  1878. )
  1879. # Auth callback that returns credentials
  1880. def auth_callback(url, www_authenticate, attempt):
  1881. if attempt == 1:
  1882. return {"username": "testuser", "password": "testpass"}
  1883. return None
  1884. # Create the wrapper
  1885. auth_manager = AuthCallbackPoolManager(
  1886. mock_pool_manager, auth_callback=auth_callback
  1887. )
  1888. # Make request
  1889. result = auth_manager.request("GET", "https://example.com/test")
  1890. # Verify two requests were made
  1891. self.assertEqual(mock_pool_manager.request.call_count, 2)
  1892. # Verify auth headers were added on second request
  1893. second_call_kwargs = mock_pool_manager.request.call_args_list[1][1]
  1894. self.assertIn("headers", second_call_kwargs)
  1895. # urllib3 returns lowercase header names
  1896. self.assertIn("authorization", second_call_kwargs["headers"])
  1897. # Result should be the successful response
  1898. self.assertEqual(result, mock_response_success)
  1899. def test_proxy_auth_callback(self) -> None:
  1900. # Create a mock pool manager
  1901. mock_pool_manager = Mock()
  1902. mock_response = Mock()
  1903. # First request returns 407
  1904. mock_response.status = 407
  1905. mock_response.headers = {"Proxy-Authenticate": 'Basic realm="proxy"'}
  1906. # Second request (after auth) returns 200
  1907. mock_response_success = Mock()
  1908. mock_response_success.status = 200
  1909. mock_response_success.headers = {}
  1910. mock_pool_manager.request = MagicMock(
  1911. side_effect=[mock_response, mock_response_success]
  1912. )
  1913. # Proxy auth callback that returns credentials
  1914. def proxy_auth_callback(url, proxy_authenticate, attempt):
  1915. if attempt == 1:
  1916. return {"username": "proxyuser", "password": "proxypass"}
  1917. return None
  1918. # Create the wrapper
  1919. auth_manager = AuthCallbackPoolManager(
  1920. mock_pool_manager, proxy_auth_callback=proxy_auth_callback
  1921. )
  1922. # Make request
  1923. result = auth_manager.request("GET", "https://example.com/test")
  1924. # Verify two requests were made
  1925. self.assertEqual(mock_pool_manager.request.call_count, 2)
  1926. # Verify proxy auth headers were added on second request
  1927. second_call_kwargs = mock_pool_manager.request.call_args_list[1][1]
  1928. self.assertIn("headers", second_call_kwargs)
  1929. # urllib3 returns lowercase header names
  1930. self.assertIn("proxy-authorization", second_call_kwargs["headers"])
  1931. # Result should be the successful response
  1932. self.assertEqual(result, mock_response_success)
  1933. def test_max_attempts(self) -> None:
  1934. # Create a mock pool manager that always returns 401
  1935. mock_pool_manager = Mock()
  1936. mock_response = Mock()
  1937. mock_response.status = 401
  1938. mock_response.headers = {"WWW-Authenticate": 'Basic realm="test"'}
  1939. mock_pool_manager.request.return_value = mock_response
  1940. # Auth callback that always returns credentials
  1941. def auth_callback(url, www_authenticate, attempt):
  1942. return {"username": "user", "password": "pass"}
  1943. # Create the wrapper
  1944. auth_manager = AuthCallbackPoolManager(
  1945. mock_pool_manager, auth_callback=auth_callback
  1946. )
  1947. # Make request
  1948. result = auth_manager.request("GET", "https://example.com/test")
  1949. # Should have made 3 attempts (initial + 2 retries)
  1950. self.assertEqual(mock_pool_manager.request.call_count, 3)
  1951. # Result should be the last 401 response
  1952. self.assertEqual(result.status, 401)
  1953. class DefaultUrllib3ManagerTest(TestCase):
  1954. def test_no_config(self) -> None:
  1955. manager = default_urllib3_manager(config=None)
  1956. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1957. def test_auth_callbacks(self) -> None:
  1958. def auth_callback(url, www_authenticate, attempt):
  1959. return {"username": "user", "password": "pass"}
  1960. def proxy_auth_callback(url, proxy_authenticate, attempt):
  1961. return {"username": "proxy_user", "password": "proxy_pass"}
  1962. manager = default_urllib3_manager(
  1963. config=None,
  1964. auth_callback=auth_callback,
  1965. proxy_auth_callback=proxy_auth_callback,
  1966. )
  1967. self.assertIsInstance(manager, AuthCallbackPoolManager)
  1968. self.assertEqual(manager._auth_callback, auth_callback)
  1969. self.assertEqual(manager._proxy_auth_callback, proxy_auth_callback)
  1970. def test_proxy_auth_method_unsupported(self) -> None:
  1971. import os
  1972. # Test with config
  1973. config = ConfigDict()
  1974. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1975. config.set((b"http",), b"proxyAuthMethod", b"digest")
  1976. with self.assertRaises(NotImplementedError) as cm:
  1977. default_urllib3_manager(config=config)
  1978. self.assertIn("digest", str(cm.exception))
  1979. self.assertIn("not supported", str(cm.exception))
  1980. # Test with environment variable
  1981. config = ConfigDict()
  1982. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1983. old_env = os.environ.get("GIT_HTTP_PROXY_AUTHMETHOD")
  1984. try:
  1985. os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = "ntlm"
  1986. with self.assertRaises(NotImplementedError) as cm:
  1987. default_urllib3_manager(config=config)
  1988. self.assertIn("ntlm", str(cm.exception))
  1989. self.assertIn("not supported", str(cm.exception))
  1990. finally:
  1991. if old_env is None:
  1992. os.environ.pop("GIT_HTTP_PROXY_AUTHMETHOD", None)
  1993. else:
  1994. os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = old_env
  1995. def test_proxy_auth_method_supported(self) -> None:
  1996. # Test basic auth method
  1997. config = ConfigDict()
  1998. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1999. config.set((b"http",), b"proxyAuthMethod", b"basic")
  2000. # Should not raise
  2001. manager = default_urllib3_manager(config=config)
  2002. self.assertIsNotNone(manager)
  2003. # Test anyauth (default)
  2004. config.set((b"http",), b"proxyAuthMethod", b"anyauth")
  2005. manager = default_urllib3_manager(config=config)
  2006. self.assertIsNotNone(manager)
  2007. def test_config_no_proxy(self) -> None:
  2008. import urllib3
  2009. manager = default_urllib3_manager(config=ConfigDict())
  2010. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2011. self.assertIsInstance(manager, urllib3.PoolManager)
  2012. def test_config_no_proxy_custom_cls(self) -> None:
  2013. import urllib3
  2014. class CustomPoolManager(urllib3.PoolManager):
  2015. pass
  2016. manager = default_urllib3_manager(
  2017. config=ConfigDict(), pool_manager_cls=CustomPoolManager
  2018. )
  2019. self.assertIsInstance(manager, CustomPoolManager)
  2020. def test_config_ssl(self) -> None:
  2021. config = ConfigDict()
  2022. config.set(b"http", b"sslVerify", b"true")
  2023. manager = default_urllib3_manager(config=config)
  2024. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  2025. def test_config_no_ssl(self) -> None:
  2026. config = ConfigDict()
  2027. config.set(b"http", b"sslVerify", b"false")
  2028. manager = default_urllib3_manager(config=config)
  2029. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  2030. def test_config_proxy(self) -> None:
  2031. import urllib3
  2032. config = ConfigDict()
  2033. config.set(b"http", b"proxy", b"http://localhost:3128/")
  2034. manager = default_urllib3_manager(config=config)
  2035. self.assertIsInstance(manager, urllib3.ProxyManager)
  2036. self.assertTrue(hasattr(manager, "proxy"))
  2037. self.assertEqual(manager.proxy.scheme, "http")
  2038. self.assertEqual(manager.proxy.host, "localhost")
  2039. self.assertEqual(manager.proxy.port, 3128)
  2040. def test_environment_proxy(self) -> None:
  2041. import urllib3
  2042. config = ConfigDict()
  2043. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2044. manager = default_urllib3_manager(config=config)
  2045. self.assertIsInstance(manager, urllib3.ProxyManager)
  2046. self.assertTrue(hasattr(manager, "proxy"))
  2047. self.assertEqual(manager.proxy.scheme, "http")
  2048. self.assertEqual(manager.proxy.host, "myproxy")
  2049. self.assertEqual(manager.proxy.port, 8080)
  2050. def test_environment_empty_proxy(self) -> None:
  2051. import urllib3
  2052. config = ConfigDict()
  2053. self.overrideEnv("http_proxy", "")
  2054. manager = default_urllib3_manager(config=config)
  2055. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2056. self.assertIsInstance(manager, urllib3.PoolManager)
  2057. def test_environment_no_proxy_1(self) -> None:
  2058. import urllib3
  2059. config = ConfigDict()
  2060. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2061. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh")
  2062. base_url = "http://xyz.abc.def.gh:8080/path/port"
  2063. manager = default_urllib3_manager(config=config, base_url=base_url)
  2064. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2065. self.assertIsInstance(manager, urllib3.PoolManager)
  2066. def test_environment_no_proxy_2(self) -> None:
  2067. import urllib3
  2068. config = ConfigDict()
  2069. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2070. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2071. base_url = "http://ample.com/path/port"
  2072. manager = default_urllib3_manager(config=config, base_url=base_url)
  2073. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2074. self.assertIsInstance(manager, urllib3.PoolManager)
  2075. def test_environment_no_proxy_3(self) -> None:
  2076. import urllib3
  2077. config = ConfigDict()
  2078. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2079. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2080. base_url = "http://ample.com:80/path/port"
  2081. manager = default_urllib3_manager(config=config, base_url=base_url)
  2082. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2083. self.assertIsInstance(manager, urllib3.PoolManager)
  2084. def test_environment_no_proxy_4(self) -> None:
  2085. import urllib3
  2086. config = ConfigDict()
  2087. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2088. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2089. base_url = "http://www.ample.com/path/port"
  2090. manager = default_urllib3_manager(config=config, base_url=base_url)
  2091. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2092. self.assertIsInstance(manager, urllib3.PoolManager)
  2093. def test_environment_no_proxy_5(self) -> None:
  2094. import urllib3
  2095. config = ConfigDict()
  2096. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2097. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2098. base_url = "http://www.example.com/path/port"
  2099. manager = default_urllib3_manager(config=config, base_url=base_url)
  2100. self.assertIsInstance(manager, urllib3.ProxyManager)
  2101. self.assertTrue(hasattr(manager, "proxy"))
  2102. self.assertEqual(manager.proxy.scheme, "http")
  2103. self.assertEqual(manager.proxy.host, "myproxy")
  2104. self.assertEqual(manager.proxy.port, 8080)
  2105. def test_environment_no_proxy_6(self) -> None:
  2106. import urllib3
  2107. config = ConfigDict()
  2108. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2109. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2110. base_url = "http://ample.com.org/path/port"
  2111. manager = default_urllib3_manager(config=config, base_url=base_url)
  2112. self.assertIsInstance(manager, urllib3.ProxyManager)
  2113. self.assertTrue(hasattr(manager, "proxy"))
  2114. self.assertEqual(manager.proxy.scheme, "http")
  2115. self.assertEqual(manager.proxy.host, "myproxy")
  2116. self.assertEqual(manager.proxy.port, 8080)
  2117. def test_environment_no_proxy_ipv4_address_1(self) -> None:
  2118. import urllib3
  2119. config = ConfigDict()
  2120. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2121. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  2122. base_url = "http://192.168.0.10/path/port"
  2123. manager = default_urllib3_manager(config=config, base_url=base_url)
  2124. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2125. self.assertIsInstance(manager, urllib3.PoolManager)
  2126. def test_environment_no_proxy_ipv4_address_2(self) -> None:
  2127. import urllib3
  2128. config = ConfigDict()
  2129. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2130. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  2131. base_url = "http://192.168.0.10:8888/path/port"
  2132. manager = default_urllib3_manager(config=config, base_url=base_url)
  2133. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2134. self.assertIsInstance(manager, urllib3.PoolManager)
  2135. def test_environment_no_proxy_ipv4_address_3(self) -> None:
  2136. import urllib3
  2137. config = ConfigDict()
  2138. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2139. self.overrideEnv(
  2140. "no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com"
  2141. )
  2142. base_url = "http://192.168.0.10/path/port"
  2143. manager = default_urllib3_manager(config=config, base_url=base_url)
  2144. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2145. self.assertIsInstance(manager, urllib3.PoolManager)
  2146. def test_environment_no_proxy_ipv6_address_1(self) -> None:
  2147. import urllib3
  2148. config = ConfigDict()
  2149. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2150. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  2151. base_url = "http://[ff80:1::affe]/path/port"
  2152. manager = default_urllib3_manager(config=config, base_url=base_url)
  2153. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2154. self.assertIsInstance(manager, urllib3.PoolManager)
  2155. def test_environment_no_proxy_ipv6_address_2(self) -> None:
  2156. import urllib3
  2157. config = ConfigDict()
  2158. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2159. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  2160. base_url = "http://[ff80:1::affe]:1234/path/port"
  2161. manager = default_urllib3_manager(config=config, base_url=base_url)
  2162. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2163. self.assertIsInstance(manager, urllib3.PoolManager)
  2164. def test_environment_no_proxy_ipv6_address_3(self) -> None:
  2165. import urllib3
  2166. config = ConfigDict()
  2167. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2168. self.overrideEnv(
  2169. "no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com"
  2170. )
  2171. base_url = "http://[ff80:1::affe]/path/port"
  2172. manager = default_urllib3_manager(config=config, base_url=base_url)
  2173. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2174. self.assertIsInstance(manager, urllib3.PoolManager)
  2175. def test_config_proxy_custom_cls(self) -> None:
  2176. import urllib3
  2177. class CustomProxyManager(urllib3.ProxyManager):
  2178. pass
  2179. config = ConfigDict()
  2180. config.set(b"http", b"proxy", b"http://localhost:3128/")
  2181. manager = default_urllib3_manager(
  2182. config=config, proxy_manager_cls=CustomProxyManager
  2183. )
  2184. self.assertIsInstance(manager, CustomProxyManager)
  2185. def test_config_proxy_creds(self) -> None:
  2186. import urllib3
  2187. config = ConfigDict()
  2188. config.set(b"http", b"proxy", b"http://jelmer:example@localhost:3128/")
  2189. manager = default_urllib3_manager(config=config)
  2190. assert isinstance(manager, urllib3.ProxyManager)
  2191. self.assertEqual(
  2192. manager.proxy_headers, {"proxy-authorization": "Basic amVsbWVyOmV4YW1wbGU="}
  2193. )
  2194. def test_config_no_verify_ssl(self) -> None:
  2195. manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE")
  2196. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  2197. def test_timeout_parameter(self) -> None:
  2198. """Test that timeout parameter is passed to urllib3 manager."""
  2199. timeout = 30
  2200. manager = default_urllib3_manager(config=None, timeout=timeout)
  2201. self.assertEqual(manager.connection_pool_kw["timeout"], timeout)
  2202. def test_timeout_from_config(self) -> None:
  2203. """Test that timeout can be configured via git config."""
  2204. config = ConfigDict()
  2205. config.set((b"http",), b"timeout", b"25")
  2206. manager = default_urllib3_manager(config=config)
  2207. self.assertEqual(manager.connection_pool_kw["timeout"], 25)
  2208. def test_timeout_parameter_precedence(self) -> None:
  2209. """Test that explicit timeout parameter takes precedence over config."""
  2210. config = ConfigDict()
  2211. config.set((b"http",), b"timeout", b"25")
  2212. manager = default_urllib3_manager(config=config, timeout=15)
  2213. self.assertEqual(manager.connection_pool_kw["timeout"], 15)
  2214. class SubprocessSSHVendorTests(TestCase):
  2215. def setUp(self) -> None:
  2216. # Monkey Patch client subprocess popen
  2217. self._orig_popen = dulwich.client.subprocess.Popen
  2218. dulwich.client.subprocess.Popen = DummyPopen
  2219. def tearDown(self) -> None:
  2220. dulwich.client.subprocess.Popen = self._orig_popen
  2221. def test_run_command_dashes(self) -> None:
  2222. vendor = SubprocessSSHVendor()
  2223. self.assertRaises(
  2224. StrangeHostname,
  2225. vendor.run_command,
  2226. "--weird-host",
  2227. "git-clone-url",
  2228. )
  2229. def test_run_command_password(self) -> None:
  2230. vendor = SubprocessSSHVendor()
  2231. self.assertRaises(
  2232. NotImplementedError,
  2233. vendor.run_command,
  2234. "host",
  2235. "git-clone-url",
  2236. password="12345",
  2237. )
  2238. def test_run_command_password_and_privkey(self) -> None:
  2239. vendor = SubprocessSSHVendor()
  2240. self.assertRaises(
  2241. NotImplementedError,
  2242. vendor.run_command,
  2243. "host",
  2244. "git-clone-url",
  2245. password="12345",
  2246. key_filename="/tmp/id_rsa",
  2247. )
  2248. def test_run_command_with_port_username_and_privkey(self) -> None:
  2249. expected = [
  2250. "ssh",
  2251. "-x",
  2252. "-p",
  2253. "2200",
  2254. "-i",
  2255. "/tmp/id_rsa",
  2256. ]
  2257. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  2258. expected += [
  2259. "-o",
  2260. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  2261. ]
  2262. expected += [
  2263. "user@host",
  2264. "git-clone-url",
  2265. ]
  2266. vendor = SubprocessSSHVendor()
  2267. command = vendor.run_command(
  2268. "host",
  2269. "git-clone-url",
  2270. username="user",
  2271. port="2200",
  2272. key_filename="/tmp/id_rsa",
  2273. )
  2274. args = command.proc.args
  2275. self.assertListEqual(expected, args[0])
  2276. def test_run_with_ssh_command(self) -> None:
  2277. expected = [
  2278. "/path/to/ssh",
  2279. "-o",
  2280. "Option=Value",
  2281. "-x",
  2282. ]
  2283. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  2284. expected += [
  2285. "-o",
  2286. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  2287. ]
  2288. expected += [
  2289. "host",
  2290. "git-clone-url",
  2291. ]
  2292. vendor = SubprocessSSHVendor()
  2293. command = vendor.run_command(
  2294. "host",
  2295. "git-clone-url",
  2296. ssh_command="/path/to/ssh -o Option=Value",
  2297. )
  2298. args = command.proc.args
  2299. self.assertListEqual(expected, args[0])
  2300. class PLinkSSHVendorTests(TestCase):
  2301. def setUp(self) -> None:
  2302. # Monkey Patch client subprocess popen
  2303. self._orig_popen = dulwich.client.subprocess.Popen
  2304. dulwich.client.subprocess.Popen = DummyPopen
  2305. def tearDown(self) -> None:
  2306. dulwich.client.subprocess.Popen = self._orig_popen
  2307. def test_run_command_dashes(self) -> None:
  2308. vendor = PLinkSSHVendor()
  2309. self.assertRaises(
  2310. StrangeHostname,
  2311. vendor.run_command,
  2312. "--weird-host",
  2313. "git-clone-url",
  2314. )
  2315. def test_run_command_password_and_privkey(self) -> None:
  2316. vendor = PLinkSSHVendor()
  2317. warnings.simplefilter("always", UserWarning)
  2318. self.addCleanup(warnings.resetwarnings)
  2319. warnings_list, restore_warnings = setup_warning_catcher()
  2320. self.addCleanup(restore_warnings)
  2321. command = vendor.run_command(
  2322. "host",
  2323. "git-clone-url",
  2324. password="12345",
  2325. key_filename="/tmp/id_rsa",
  2326. )
  2327. expected_warning = UserWarning(
  2328. "Invoking PLink with a password exposes the password in the process list."
  2329. )
  2330. for w in warnings_list:
  2331. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  2332. break
  2333. else:
  2334. raise AssertionError(
  2335. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  2336. )
  2337. args = command.proc.args
  2338. if sys.platform == "win32":
  2339. binary = ["plink.exe", "-ssh"]
  2340. else:
  2341. binary = ["plink", "-ssh"]
  2342. expected = [
  2343. *binary,
  2344. "-pw",
  2345. "12345",
  2346. "-i",
  2347. "/tmp/id_rsa",
  2348. "host",
  2349. "git-clone-url",
  2350. ]
  2351. self.assertListEqual(expected, args[0])
  2352. def test_run_command_password(self) -> None:
  2353. if sys.platform == "win32":
  2354. binary = ["plink.exe", "-ssh"]
  2355. else:
  2356. binary = ["plink", "-ssh"]
  2357. expected = [*binary, "-pw", "12345", "host", "git-clone-url"]
  2358. vendor = PLinkSSHVendor()
  2359. warnings.simplefilter("always", UserWarning)
  2360. self.addCleanup(warnings.resetwarnings)
  2361. warnings_list, restore_warnings = setup_warning_catcher()
  2362. self.addCleanup(restore_warnings)
  2363. command = vendor.run_command("host", "git-clone-url", password="12345")
  2364. expected_warning = UserWarning(
  2365. "Invoking PLink with a password exposes the password in the process list."
  2366. )
  2367. for w in warnings_list:
  2368. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  2369. break
  2370. else:
  2371. raise AssertionError(
  2372. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  2373. )
  2374. args = command.proc.args
  2375. self.assertListEqual(expected, args[0])
  2376. def test_run_command_with_port_username_and_privkey(self) -> None:
  2377. if sys.platform == "win32":
  2378. binary = ["plink.exe", "-ssh"]
  2379. else:
  2380. binary = ["plink", "-ssh"]
  2381. expected = [
  2382. *binary,
  2383. "-P",
  2384. "2200",
  2385. "-i",
  2386. "/tmp/id_rsa",
  2387. "user@host",
  2388. "git-clone-url",
  2389. ]
  2390. vendor = PLinkSSHVendor()
  2391. command = vendor.run_command(
  2392. "host",
  2393. "git-clone-url",
  2394. username="user",
  2395. port="2200",
  2396. key_filename="/tmp/id_rsa",
  2397. )
  2398. args = command.proc.args
  2399. self.assertListEqual(expected, args[0])
  2400. def test_run_with_ssh_command(self) -> None:
  2401. expected = [
  2402. "/path/to/plink",
  2403. "-ssh",
  2404. "host",
  2405. "git-clone-url",
  2406. ]
  2407. vendor = PLinkSSHVendor()
  2408. command = vendor.run_command(
  2409. "host",
  2410. "git-clone-url",
  2411. ssh_command="/path/to/plink",
  2412. )
  2413. args = command.proc.args
  2414. self.assertListEqual(expected, args[0])
  2415. class RsyncUrlTests(TestCase):
  2416. def test_simple(self) -> None:
  2417. self.assertEqual(parse_rsync_url("foo:bar/path"), (None, "foo", "bar/path"))
  2418. self.assertEqual(
  2419. parse_rsync_url("user@foo:bar/path"), ("user", "foo", "bar/path")
  2420. )
  2421. def test_path(self) -> None:
  2422. self.assertRaises(ValueError, parse_rsync_url, "/path")
  2423. class CheckWantsTests(TestCase):
  2424. def test_fine(self) -> None:
  2425. check_wants(
  2426. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2427. {b"refs/heads/blah": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2428. )
  2429. def test_missing(self) -> None:
  2430. self.assertRaises(
  2431. InvalidWants,
  2432. check_wants,
  2433. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2434. {b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2435. )
  2436. def test_annotated(self) -> None:
  2437. self.assertRaises(
  2438. InvalidWants,
  2439. check_wants,
  2440. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2441. {
  2442. b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2443. b"refs/heads/blah^{}": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2444. },
  2445. )
  2446. class FetchPackResultTests(TestCase):
  2447. def test_eq(self) -> None:
  2448. self.assertEqual(
  2449. FetchPackResult(
  2450. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2451. {},
  2452. b"user/agent",
  2453. ),
  2454. FetchPackResult(
  2455. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2456. {},
  2457. b"user/agent",
  2458. ),
  2459. )
  2460. class GitCredentialStoreTests(TestCase):
  2461. @classmethod
  2462. def setUpClass(cls) -> None:
  2463. with tempfile.NamedTemporaryFile(delete=False) as f:
  2464. f.write(b"https://user:pass@example.org\n")
  2465. cls.fname = f.name
  2466. @classmethod
  2467. def tearDownClass(cls) -> None:
  2468. os.unlink(cls.fname)
  2469. def test_nonmatching_scheme(self) -> None:
  2470. result = list(
  2471. get_credentials_from_store("http", "example.org", fnames=[self.fname])
  2472. )
  2473. self.assertEqual(result, [])
  2474. def test_nonmatching_hostname(self) -> None:
  2475. result = list(
  2476. get_credentials_from_store("https", "noentry.org", fnames=[self.fname])
  2477. )
  2478. self.assertEqual(result, [])
  2479. def test_match_without_username(self) -> None:
  2480. result = list(
  2481. get_credentials_from_store("https", "example.org", fnames=[self.fname])
  2482. )
  2483. self.assertEqual(result, [("user", "pass")])
  2484. def test_match_with_matching_username(self) -> None:
  2485. result = list(
  2486. get_credentials_from_store(
  2487. "https", "example.org", "user", fnames=[self.fname]
  2488. )
  2489. )
  2490. self.assertEqual(result, [("user", "pass")])
  2491. def test_no_match_with_nonmatching_username(self) -> None:
  2492. result = list(
  2493. get_credentials_from_store(
  2494. "https", "example.org", "otheruser", fnames=[self.fname]
  2495. )
  2496. )
  2497. self.assertEqual(result, [])
  2498. class RemoteErrorFromStderrTests(TestCase):
  2499. def test_nothing(self) -> None:
  2500. self.assertEqual(_remote_error_from_stderr(None), HangupException())
  2501. def test_error_line(self) -> None:
  2502. b = BytesIO(
  2503. b"""\
  2504. This is some random output.
  2505. ERROR: This is the actual error
  2506. with a tail
  2507. """
  2508. )
  2509. self.assertEqual(
  2510. _remote_error_from_stderr(b),
  2511. GitProtocolError("This is the actual error"),
  2512. )
  2513. def test_no_error_line(self) -> None:
  2514. b = BytesIO(
  2515. b"""\
  2516. This is output without an error line.
  2517. And this line is just random noise, too.
  2518. """
  2519. )
  2520. self.assertEqual(
  2521. _remote_error_from_stderr(b),
  2522. HangupException(
  2523. [
  2524. b"This is output without an error line.",
  2525. b"And this line is just random noise, too.",
  2526. ]
  2527. ),
  2528. )
  2529. class TestExtractAgentAndSymrefs(TestCase):
  2530. def test_extract_agent_and_symrefs(self) -> None:
  2531. (symrefs, agent) = _extract_symrefs_and_agent(
  2532. [b"agent=git/2.31.1", b"symref=HEAD:refs/heads/master"]
  2533. )
  2534. self.assertEqual(agent, b"git/2.31.1")
  2535. self.assertEqual(symrefs, {b"HEAD": b"refs/heads/master"})