test_client.py 99 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762
  1. # test_client.py -- Tests for the git protocol, client side
  2. # Copyright (C) 2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import base64
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import warnings
  27. from io import BytesIO
  28. from typing import NoReturn
  29. from unittest.mock import patch
  30. from urllib.parse import quote as urlquote
  31. from urllib.parse import urlparse
  32. import dulwich
  33. from dulwich import client
  34. from dulwich.bundle import create_bundle_from_repo, write_bundle
  35. from dulwich.client import (
  36. BundleClient,
  37. FetchPackResult,
  38. GitProtocolError,
  39. HangupException,
  40. HttpGitClient,
  41. InvalidWants,
  42. LocalGitClient,
  43. PLinkSSHVendor,
  44. ReportStatusParser,
  45. SendPackError,
  46. SSHGitClient,
  47. StrangeHostname,
  48. SubprocessSSHVendor,
  49. TCPGitClient,
  50. TraditionalGitClient,
  51. Urllib3HttpGitClient,
  52. _extract_symrefs_and_agent,
  53. _remote_error_from_stderr,
  54. _win32_url_to_path,
  55. check_wants,
  56. default_urllib3_manager,
  57. get_credentials_from_store,
  58. get_transport_and_path,
  59. get_transport_and_path_from_url,
  60. parse_rsync_url,
  61. )
  62. from dulwich.config import ConfigDict
  63. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  64. from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
  65. from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects
  66. from dulwich.protocol import DEFAULT_GIT_PROTOCOL_VERSION_FETCH, TCP_GIT_PORT, Protocol
  67. from dulwich.repo import MemoryRepo, Repo
  68. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  69. from . import TestCase, skipIf
  70. class DummyClient(TraditionalGitClient):
  71. def __init__(self, can_read, read, write) -> None:
  72. self.can_read = can_read
  73. self.read = read
  74. self.write = write
  75. TraditionalGitClient.__init__(self)
  76. def _connect(self, service, path, protocol_version=None):
  77. return Protocol(self.read, self.write), self.can_read, None
  78. class DummyPopen:
  79. def __init__(self, *args, **kwards) -> None:
  80. self.stdin = BytesIO(b"stdin")
  81. self.stdout = BytesIO(b"stdout")
  82. self.stderr = BytesIO(b"stderr")
  83. self.returncode = 0
  84. self.args = args
  85. self.kwargs = kwards
  86. def communicate(self, *args, **kwards):
  87. return ("Running", "")
  88. def wait(self, *args, **kwards) -> bool:
  89. return False
  90. # TODO(durin42): add unit-level tests of GitClient
  91. class GitClientTests(TestCase):
  92. def setUp(self) -> None:
  93. super().setUp()
  94. self.rout = BytesIO()
  95. self.rin = BytesIO()
  96. self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write)
  97. def test_caps(self) -> None:
  98. agent_cap = "agent=dulwich/{}.{}.{}".format(*dulwich.__version__).encode(
  99. "ascii"
  100. )
  101. self.assertEqual(
  102. {
  103. b"multi_ack",
  104. b"side-band-64k",
  105. b"ofs-delta",
  106. b"thin-pack",
  107. b"multi_ack_detailed",
  108. b"shallow",
  109. agent_cap,
  110. },
  111. set(self.client._fetch_capabilities),
  112. )
  113. self.assertEqual(
  114. {
  115. b"delete-refs",
  116. b"ofs-delta",
  117. b"report-status",
  118. b"side-band-64k",
  119. agent_cap,
  120. },
  121. set(self.client._send_capabilities),
  122. )
  123. def test_archive_ack(self) -> None:
  124. self.rin.write(b"0009NACK\n0000")
  125. self.rin.seek(0)
  126. self.client.archive(b"bla", b"HEAD", None, None)
  127. self.assertEqual(self.rout.getvalue(), b"0011argument HEAD0000")
  128. def test_fetch_empty(self) -> None:
  129. self.rin.write(b"0000")
  130. self.rin.seek(0)
  131. def check_heads(heads, **kwargs):
  132. self.assertEqual(heads, {})
  133. return []
  134. ret = self.client.fetch_pack(b"/", check_heads, None, None)
  135. self.assertEqual({}, ret.refs)
  136. self.assertEqual({}, ret.symrefs)
  137. def test_fetch_pack_ignores_magic_ref(self) -> None:
  138. self.rin.write(
  139. b"00000000000000000000000000000000000000000000 capabilities^{}"
  140. b"\x00 multi_ack "
  141. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  142. b"include-tag\n"
  143. b"0000"
  144. )
  145. self.rin.seek(0)
  146. def check_heads(heads, **kwargs):
  147. self.assertEqual({}, heads)
  148. return []
  149. ret = self.client.fetch_pack(b"bla", check_heads, None, None, None)
  150. self.assertEqual({}, ret.refs)
  151. self.assertEqual({}, ret.symrefs)
  152. self.assertEqual(self.rout.getvalue(), b"0000")
  153. def test_fetch_pack_none(self) -> None:
  154. self.rin.write(
  155. b"008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack "
  156. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  157. b"include-tag\n"
  158. b"0000"
  159. )
  160. self.rin.seek(0)
  161. ret = self.client.fetch_pack(
  162. b"bla", lambda heads, depth=None: [], None, None, None
  163. )
  164. self.assertEqual(
  165. {b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs
  166. )
  167. self.assertEqual({}, ret.symrefs)
  168. self.assertEqual(self.rout.getvalue(), b"0000")
  169. def test_handle_upload_pack_head_deepen_since(self) -> None:
  170. # Test that deepen-since command is properly sent
  171. from dulwich.client import _handle_upload_pack_head
  172. self.rin.write(b"0008NAK\n0000")
  173. self.rin.seek(0)
  174. class DummyGraphWalker:
  175. def __iter__(self):
  176. return self
  177. def __next__(self):
  178. return None
  179. proto = Protocol(self.rin.read, self.rout.write)
  180. capabilities = [b"shallow", b"deepen-since"]
  181. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  182. graph_walker = DummyGraphWalker()
  183. _handle_upload_pack_head(
  184. proto=proto,
  185. capabilities=capabilities,
  186. graph_walker=graph_walker,
  187. wants=wants,
  188. can_read=None,
  189. depth=None,
  190. protocol_version=0,
  191. shallow_since="2023-01-01T00:00:00Z",
  192. )
  193. # Verify the deepen-since command was sent
  194. output = self.rout.getvalue()
  195. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  196. def test_handle_upload_pack_head_deepen_not(self) -> None:
  197. # Test that deepen-not command is properly sent
  198. from dulwich.client import _handle_upload_pack_head
  199. self.rin.write(b"0008NAK\n0000")
  200. self.rin.seek(0)
  201. class DummyGraphWalker:
  202. def __iter__(self):
  203. return self
  204. def __next__(self):
  205. return None
  206. proto = Protocol(self.rin.read, self.rout.write)
  207. capabilities = [b"shallow", b"deepen-not"]
  208. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  209. graph_walker = DummyGraphWalker()
  210. _handle_upload_pack_head(
  211. proto=proto,
  212. capabilities=capabilities,
  213. graph_walker=graph_walker,
  214. wants=wants,
  215. can_read=None,
  216. depth=None,
  217. protocol_version=0,
  218. shallow_exclude=["refs/heads/excluded"],
  219. )
  220. # Verify the deepen-not command was sent
  221. output = self.rout.getvalue()
  222. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  223. def test_handle_upload_pack_head_deepen_not_multiple(self) -> None:
  224. # Test that multiple deepen-not commands are properly sent
  225. from dulwich.client import _handle_upload_pack_head
  226. self.rin.write(b"0008NAK\n0000")
  227. self.rin.seek(0)
  228. class DummyGraphWalker:
  229. def __iter__(self):
  230. return self
  231. def __next__(self):
  232. return None
  233. proto = Protocol(self.rin.read, self.rout.write)
  234. capabilities = [b"shallow", b"deepen-not"]
  235. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  236. graph_walker = DummyGraphWalker()
  237. _handle_upload_pack_head(
  238. proto=proto,
  239. capabilities=capabilities,
  240. graph_walker=graph_walker,
  241. wants=wants,
  242. can_read=None,
  243. depth=None,
  244. protocol_version=0,
  245. shallow_exclude=["refs/heads/excluded1", "refs/heads/excluded2"],
  246. )
  247. # Verify both deepen-not commands were sent
  248. output = self.rout.getvalue()
  249. self.assertIn(b"deepen-not refs/heads/excluded1\n", output)
  250. self.assertIn(b"deepen-not refs/heads/excluded2\n", output)
  251. def test_handle_upload_pack_head_deepen_since_and_not(self) -> None:
  252. # Test that deepen-since and deepen-not can be used together
  253. from dulwich.client import _handle_upload_pack_head
  254. self.rin.write(b"0008NAK\n0000")
  255. self.rin.seek(0)
  256. class DummyGraphWalker:
  257. def __iter__(self):
  258. return self
  259. def __next__(self):
  260. return None
  261. proto = Protocol(self.rin.read, self.rout.write)
  262. capabilities = [b"shallow", b"deepen-since", b"deepen-not"]
  263. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  264. graph_walker = DummyGraphWalker()
  265. _handle_upload_pack_head(
  266. proto=proto,
  267. capabilities=capabilities,
  268. graph_walker=graph_walker,
  269. wants=wants,
  270. can_read=None,
  271. depth=None,
  272. protocol_version=0,
  273. shallow_since="2023-01-01T00:00:00Z",
  274. shallow_exclude=["refs/heads/excluded"],
  275. )
  276. # Verify both deepen-since and deepen-not commands were sent
  277. output = self.rout.getvalue()
  278. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  279. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  280. def test_send_pack_no_sideband64k_with_update_ref_error(self) -> None:
  281. # No side-bank-64k reported by server shouldn't try to parse
  282. # side band data
  283. pkts = [
  284. b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}"
  285. b"\x00 report-status delete-refs ofs-delta\n",
  286. b"",
  287. b"unpack ok",
  288. b"ng refs/foo/bar pre-receive hook declined",
  289. b"",
  290. ]
  291. for pkt in pkts:
  292. if pkt == b"":
  293. self.rin.write(b"0000")
  294. else:
  295. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  296. self.rin.seek(0)
  297. tree = Tree()
  298. commit = Commit()
  299. commit.tree = tree
  300. commit.parents = []
  301. commit.author = commit.committer = b"test user"
  302. commit.commit_time = commit.author_time = 1174773719
  303. commit.commit_timezone = commit.author_timezone = 0
  304. commit.encoding = b"UTF-8"
  305. commit.message = b"test message"
  306. def update_refs(refs):
  307. return {
  308. b"refs/foo/bar": commit.id,
  309. }
  310. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  311. return pack_objects_to_data(
  312. [
  313. (commit, None),
  314. (tree, b""),
  315. ]
  316. )
  317. result = self.client.send_pack("blah", update_refs, generate_pack_data)
  318. self.assertEqual(
  319. {b"refs/foo/bar": "pre-receive hook declined"}, result.ref_status
  320. )
  321. self.assertEqual({b"refs/foo/bar": commit.id}, result.refs)
  322. def test_send_pack_none(self) -> None:
  323. # Set ref to current value
  324. self.rin.write(
  325. b"0078310ca9477129b8586fa2afc779c1f57cf64bba6c "
  326. b"refs/heads/master\x00 report-status delete-refs "
  327. b"side-band-64k quiet ofs-delta\n"
  328. b"0000"
  329. )
  330. self.rin.seek(0)
  331. def update_refs(refs):
  332. return {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"}
  333. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  334. return 0, []
  335. self.client.send_pack(b"/", update_refs, generate_pack_data)
  336. self.assertEqual(self.rout.getvalue(), b"0000")
  337. def test_send_pack_keep_and_delete(self) -> None:
  338. self.rin.write(
  339. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  340. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  341. b"003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n"
  342. b"0000000eunpack ok\n"
  343. b"0019ok refs/heads/master\n"
  344. b"0000"
  345. )
  346. self.rin.seek(0)
  347. def update_refs(refs):
  348. return {b"refs/heads/master": ZERO_SHA}
  349. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  350. return 0, []
  351. self.client.send_pack(b"/", update_refs, generate_pack_data)
  352. self.assertEqual(
  353. self.rout.getvalue(),
  354. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  355. b"0000000000000000000000000000000000000000 "
  356. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  357. )
  358. def test_send_pack_delete_only(self) -> None:
  359. self.rin.write(
  360. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  361. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  362. b"0000000eunpack ok\n"
  363. b"0019ok refs/heads/master\n"
  364. b"0000"
  365. )
  366. self.rin.seek(0)
  367. def update_refs(refs):
  368. return {b"refs/heads/master": ZERO_SHA}
  369. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  370. return 0, []
  371. self.client.send_pack(b"/", update_refs, generate_pack_data)
  372. self.assertEqual(
  373. self.rout.getvalue(),
  374. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  375. b"0000000000000000000000000000000000000000 "
  376. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  377. )
  378. def test_send_pack_new_ref_only(self) -> None:
  379. self.rin.write(
  380. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  381. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  382. b"0000000eunpack ok\n"
  383. b"0019ok refs/heads/blah12\n"
  384. b"0000"
  385. )
  386. self.rin.seek(0)
  387. def update_refs(refs):
  388. return {
  389. b"refs/heads/blah12": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  390. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  391. }
  392. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  393. return 0, []
  394. f = BytesIO()
  395. write_pack_objects(f.write, [], object_format=DEFAULT_OBJECT_FORMAT)
  396. self.client.send_pack("/", update_refs, generate_pack_data)
  397. self.assertEqual(
  398. self.rout.getvalue(),
  399. b"008b0000000000000000000000000000000000000000 "
  400. b"310ca9477129b8586fa2afc779c1f57cf64bba6c "
  401. b"refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  402. + f.getvalue(),
  403. )
  404. def test_send_pack_new_ref(self) -> None:
  405. self.rin.write(
  406. b"0064310ca9477129b8586fa2afc779c1f57cf64bba6c "
  407. b"refs/heads/master\x00 report-status delete-refs ofs-delta\n"
  408. b"0000000eunpack ok\n"
  409. b"0019ok refs/heads/blah12\n"
  410. b"0000"
  411. )
  412. self.rin.seek(0)
  413. tree = Tree()
  414. commit = Commit()
  415. commit.tree = tree
  416. commit.parents = []
  417. commit.author = commit.committer = b"test user"
  418. commit.commit_time = commit.author_time = 1174773719
  419. commit.commit_timezone = commit.author_timezone = 0
  420. commit.encoding = b"UTF-8"
  421. commit.message = b"test message"
  422. def update_refs(refs):
  423. return {
  424. b"refs/heads/blah12": commit.id,
  425. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  426. }
  427. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  428. return pack_objects_to_data(
  429. [
  430. (commit, None),
  431. (tree, b""),
  432. ]
  433. )
  434. f = BytesIO()
  435. count, records = generate_pack_data(None, None)
  436. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  437. write_pack_data(
  438. f.write, records, num_records=count, object_format=DEFAULT_OBJECT_FORMAT
  439. )
  440. self.client.send_pack(b"/", update_refs, generate_pack_data)
  441. self.assertEqual(
  442. self.rout.getvalue(),
  443. b"008b0000000000000000000000000000000000000000 "
  444. + commit.id
  445. + b" refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  446. + f.getvalue(),
  447. )
  448. def test_send_pack_no_deleteref_delete_only(self) -> None:
  449. pkts = [
  450. b"310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master"
  451. b"\x00 report-status ofs-delta\n",
  452. b"",
  453. b"",
  454. ]
  455. for pkt in pkts:
  456. if pkt == b"":
  457. self.rin.write(b"0000")
  458. else:
  459. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  460. self.rin.seek(0)
  461. def update_refs(refs):
  462. return {b"refs/heads/master": ZERO_SHA}
  463. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  464. return 0, []
  465. result = self.client.send_pack(b"/", update_refs, generate_pack_data)
  466. self.assertEqual(
  467. result.ref_status,
  468. {b"refs/heads/master": "remote does not support deleting refs"},
  469. )
  470. self.assertEqual(
  471. result.refs,
  472. {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"},
  473. )
  474. self.assertEqual(self.rout.getvalue(), b"0000")
  475. class TestGetTransportAndPath(TestCase):
  476. def test_tcp(self) -> None:
  477. c, path = get_transport_and_path("git://foo.com/bar/baz")
  478. self.assertIsInstance(c, TCPGitClient)
  479. self.assertEqual("foo.com", c._host)
  480. self.assertEqual(TCP_GIT_PORT, c._port)
  481. self.assertEqual("/bar/baz", path)
  482. def test_tcp_port(self) -> None:
  483. c, path = get_transport_and_path("git://foo.com:1234/bar/baz")
  484. self.assertIsInstance(c, TCPGitClient)
  485. self.assertEqual("foo.com", c._host)
  486. self.assertEqual(1234, c._port)
  487. self.assertEqual("/bar/baz", path)
  488. def test_tcp_ipv6(self) -> None:
  489. c, path = get_transport_and_path("git://[::1]/bar/baz")
  490. self.assertIsInstance(c, TCPGitClient)
  491. self.assertEqual("::1", c._host)
  492. self.assertEqual(TCP_GIT_PORT, c._port)
  493. self.assertEqual("/bar/baz", path)
  494. def test_tcp_ipv6_port(self) -> None:
  495. c, path = get_transport_and_path("git://[2001:db8::1]:1234/bar/baz")
  496. self.assertIsInstance(c, TCPGitClient)
  497. self.assertEqual("2001:db8::1", c._host)
  498. self.assertEqual(1234, c._port)
  499. self.assertEqual("/bar/baz", path)
  500. def test_git_ssh_explicit(self) -> None:
  501. c, path = get_transport_and_path("git+ssh://foo.com/bar/baz")
  502. self.assertIsInstance(c, SSHGitClient)
  503. self.assertEqual("foo.com", c.host)
  504. self.assertEqual(None, c.port)
  505. self.assertEqual(None, c.username)
  506. self.assertEqual("/bar/baz", path)
  507. def test_ssh_explicit(self) -> None:
  508. c, path = get_transport_and_path("ssh://foo.com/bar/baz")
  509. self.assertIsInstance(c, SSHGitClient)
  510. self.assertEqual("foo.com", c.host)
  511. self.assertEqual(None, c.port)
  512. self.assertEqual(None, c.username)
  513. self.assertEqual("/bar/baz", path)
  514. def test_ssh_port_explicit(self) -> None:
  515. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  516. self.assertIsInstance(c, SSHGitClient)
  517. self.assertEqual("foo.com", c.host)
  518. self.assertEqual(1234, c.port)
  519. self.assertEqual("/bar/baz", path)
  520. def test_username_and_port_explicit_unknown_scheme(self) -> None:
  521. c, path = get_transport_and_path("unknown://git@server:7999/dply/stuff.git")
  522. self.assertIsInstance(c, SSHGitClient)
  523. self.assertEqual("unknown", c.host)
  524. self.assertEqual("//git@server:7999/dply/stuff.git", path)
  525. def test_username_and_port_explicit(self) -> None:
  526. c, path = get_transport_and_path("ssh://git@server:7999/dply/stuff.git")
  527. self.assertIsInstance(c, SSHGitClient)
  528. self.assertEqual("git", c.username)
  529. self.assertEqual("server", c.host)
  530. self.assertEqual(7999, c.port)
  531. self.assertEqual("/dply/stuff.git", path)
  532. def test_ssh_abspath_doubleslash(self) -> None:
  533. c, path = get_transport_and_path("git+ssh://foo.com//bar/baz")
  534. self.assertIsInstance(c, SSHGitClient)
  535. self.assertEqual("foo.com", c.host)
  536. self.assertEqual(None, c.port)
  537. self.assertEqual(None, c.username)
  538. self.assertEqual("//bar/baz", path)
  539. def test_ssh_port(self) -> None:
  540. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  541. self.assertIsInstance(c, SSHGitClient)
  542. self.assertEqual("foo.com", c.host)
  543. self.assertEqual(1234, c.port)
  544. self.assertEqual("/bar/baz", path)
  545. def test_ssh_implicit(self) -> None:
  546. c, path = get_transport_and_path("foo:/bar/baz")
  547. self.assertIsInstance(c, SSHGitClient)
  548. self.assertEqual("foo", c.host)
  549. self.assertEqual(None, c.port)
  550. self.assertEqual(None, c.username)
  551. self.assertEqual("/bar/baz", path)
  552. def test_ssh_host(self) -> None:
  553. c, path = get_transport_and_path("foo.com:/bar/baz")
  554. self.assertIsInstance(c, SSHGitClient)
  555. self.assertEqual("foo.com", c.host)
  556. self.assertEqual(None, c.port)
  557. self.assertEqual(None, c.username)
  558. self.assertEqual("/bar/baz", path)
  559. def test_ssh_user_host(self) -> None:
  560. c, path = get_transport_and_path("user@foo.com:/bar/baz")
  561. self.assertIsInstance(c, SSHGitClient)
  562. self.assertEqual("foo.com", c.host)
  563. self.assertEqual(None, c.port)
  564. self.assertEqual("user", c.username)
  565. self.assertEqual("/bar/baz", path)
  566. def test_ssh_relpath(self) -> None:
  567. c, path = get_transport_and_path("foo:bar/baz")
  568. self.assertIsInstance(c, SSHGitClient)
  569. self.assertEqual("foo", c.host)
  570. self.assertEqual(None, c.port)
  571. self.assertEqual(None, c.username)
  572. self.assertEqual("bar/baz", path)
  573. def test_ssh_host_relpath(self) -> None:
  574. c, path = get_transport_and_path("foo.com:bar/baz")
  575. self.assertIsInstance(c, SSHGitClient)
  576. self.assertEqual("foo.com", c.host)
  577. self.assertEqual(None, c.port)
  578. self.assertEqual(None, c.username)
  579. self.assertEqual("bar/baz", path)
  580. def test_ssh_user_host_relpath(self) -> None:
  581. c, path = get_transport_and_path("user@foo.com:bar/baz")
  582. self.assertIsInstance(c, SSHGitClient)
  583. self.assertEqual("foo.com", c.host)
  584. self.assertEqual(None, c.port)
  585. self.assertEqual("user", c.username)
  586. self.assertEqual("bar/baz", path)
  587. def test_local(self) -> None:
  588. c, path = get_transport_and_path("foo.bar/baz")
  589. self.assertIsInstance(c, LocalGitClient)
  590. self.assertEqual("foo.bar/baz", path)
  591. def test_ssh_with_config(self) -> None:
  592. # Test that core.sshCommand from config is passed to SSHGitClient
  593. from dulwich.config import ConfigDict
  594. config = ConfigDict()
  595. c, _path = get_transport_and_path(
  596. "ssh://git@github.com/user/repo.git", config=config
  597. )
  598. self.assertIsInstance(c, SSHGitClient)
  599. self.assertEqual(c.ssh_command, "ssh") # Now defaults to "ssh"
  600. config.set((b"core",), b"sshCommand", b"custom-ssh -o CustomOption=yes")
  601. c, _path = get_transport_and_path(
  602. "ssh://git@github.com/user/repo.git", config=config
  603. )
  604. self.assertIsInstance(c, SSHGitClient)
  605. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  606. # Test rsync-style URL also gets the config
  607. c, _path = get_transport_and_path("git@github.com:user/repo.git", config=config)
  608. self.assertIsInstance(c, SSHGitClient)
  609. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  610. @skipIf(sys.platform != "win32", "Behaviour only happens on windows.")
  611. def test_local_abs_windows_path(self) -> None:
  612. c, path = get_transport_and_path("C:\\foo.bar\\baz")
  613. self.assertIsInstance(c, LocalGitClient)
  614. self.assertEqual("C:\\foo.bar\\baz", path)
  615. def test_error(self) -> None:
  616. # Need to use a known urlparse.uses_netloc URL scheme to get the
  617. # expected parsing of the URL on Python versions less than 2.6.5
  618. c, _path = get_transport_and_path("prospero://bar/baz")
  619. self.assertIsInstance(c, SSHGitClient)
  620. def test_http(self) -> None:
  621. url = "https://github.com/jelmer/dulwich"
  622. c, path = get_transport_and_path(url)
  623. self.assertIsInstance(c, HttpGitClient)
  624. self.assertEqual("/jelmer/dulwich", path)
  625. def test_http_auth(self) -> None:
  626. url = "https://user:passwd@github.com/jelmer/dulwich"
  627. c, path = get_transport_and_path(url)
  628. self.assertIsInstance(c, HttpGitClient)
  629. self.assertEqual("/jelmer/dulwich", path)
  630. self.assertEqual("user", c._username)
  631. self.assertEqual("passwd", c._password)
  632. def test_http_auth_with_username(self) -> None:
  633. url = "https://github.com/jelmer/dulwich"
  634. c, path = get_transport_and_path(url, username="user2", password="blah")
  635. self.assertIsInstance(c, HttpGitClient)
  636. self.assertEqual("/jelmer/dulwich", path)
  637. self.assertEqual("user2", c._username)
  638. self.assertEqual("blah", c._password)
  639. def test_http_auth_with_username_and_in_url(self) -> None:
  640. url = "https://user:passwd@github.com/jelmer/dulwich"
  641. c, path = get_transport_and_path(url, username="user2", password="blah")
  642. self.assertIsInstance(c, HttpGitClient)
  643. self.assertEqual("/jelmer/dulwich", path)
  644. # Explicitly provided credentials should override URL credentials
  645. self.assertEqual("user2", c._username)
  646. self.assertEqual("blah", c._password)
  647. def test_http_no_auth(self) -> None:
  648. url = "https://github.com/jelmer/dulwich"
  649. c, path = get_transport_and_path(url)
  650. self.assertIsInstance(c, HttpGitClient)
  651. self.assertEqual("/jelmer/dulwich", path)
  652. self.assertIs(None, c._username)
  653. self.assertIs(None, c._password)
  654. def test_ssh_with_key_filename_and_ssh_command(self) -> None:
  655. # Test that key_filename and ssh_command are passed through to SSHGitClient
  656. c, path = get_transport_and_path(
  657. "ssh://git@github.com/user/repo.git",
  658. key_filename="/path/to/id_rsa",
  659. ssh_command="custom-ssh -o StrictHostKeyChecking=no",
  660. )
  661. self.assertIsInstance(c, SSHGitClient)
  662. self.assertEqual("/user/repo.git", path)
  663. self.assertEqual("/path/to/id_rsa", c.key_filename)
  664. self.assertEqual("custom-ssh -o StrictHostKeyChecking=no", c.ssh_command)
  665. class TestGetTransportAndPathFromUrl(TestCase):
  666. def test_tcp(self) -> None:
  667. c, path = get_transport_and_path_from_url("git://foo.com/bar/baz")
  668. self.assertIsInstance(c, TCPGitClient)
  669. self.assertEqual("foo.com", c._host)
  670. self.assertEqual(TCP_GIT_PORT, c._port)
  671. self.assertEqual("/bar/baz", path)
  672. def test_tcp_port(self) -> None:
  673. c, path = get_transport_and_path_from_url("git://foo.com:1234/bar/baz")
  674. self.assertIsInstance(c, TCPGitClient)
  675. self.assertEqual("foo.com", c._host)
  676. self.assertEqual(1234, c._port)
  677. self.assertEqual("/bar/baz", path)
  678. def test_ssh_explicit(self) -> None:
  679. c, path = get_transport_and_path_from_url("git+ssh://foo.com/bar/baz")
  680. self.assertIsInstance(c, SSHGitClient)
  681. self.assertEqual("foo.com", c.host)
  682. self.assertEqual(None, c.port)
  683. self.assertEqual(None, c.username)
  684. self.assertEqual("/bar/baz", path)
  685. def test_ssh_port_explicit(self) -> None:
  686. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/bar/baz")
  687. self.assertIsInstance(c, SSHGitClient)
  688. self.assertEqual("foo.com", c.host)
  689. self.assertEqual(1234, c.port)
  690. self.assertEqual("/bar/baz", path)
  691. def test_ssh_homepath(self) -> None:
  692. c, path = get_transport_and_path_from_url("git+ssh://foo.com/~/bar/baz")
  693. self.assertIsInstance(c, SSHGitClient)
  694. self.assertEqual("foo.com", c.host)
  695. self.assertEqual(None, c.port)
  696. self.assertEqual(None, c.username)
  697. self.assertEqual("/~/bar/baz", path)
  698. def test_ssh_port_homepath(self) -> None:
  699. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/~/bar/baz")
  700. self.assertIsInstance(c, SSHGitClient)
  701. self.assertEqual("foo.com", c.host)
  702. self.assertEqual(1234, c.port)
  703. self.assertEqual("/~/bar/baz", path)
  704. def test_ssh_host_relpath(self) -> None:
  705. self.assertRaises(
  706. ValueError, get_transport_and_path_from_url, "foo.com:bar/baz"
  707. )
  708. def test_ssh_user_host_relpath(self) -> None:
  709. self.assertRaises(
  710. ValueError, get_transport_and_path_from_url, "user@foo.com:bar/baz"
  711. )
  712. def test_local_path(self) -> None:
  713. self.assertRaises(ValueError, get_transport_and_path_from_url, "foo.bar/baz")
  714. def test_error(self) -> None:
  715. # Need to use a known urlparse.uses_netloc URL scheme to get the
  716. # expected parsing of the URL on Python versions less than 2.6.5
  717. self.assertRaises(
  718. ValueError, get_transport_and_path_from_url, "prospero://bar/baz"
  719. )
  720. def test_http(self) -> None:
  721. url = "https://github.com/jelmer/dulwich"
  722. c, path = get_transport_and_path_from_url(url)
  723. self.assertIsInstance(c, HttpGitClient)
  724. self.assertEqual("https://github.com", c.get_url(b"/"))
  725. self.assertEqual("/jelmer/dulwich", path)
  726. def test_http_port(self) -> None:
  727. url = "https://github.com:9090/jelmer/dulwich"
  728. c, path = get_transport_and_path_from_url(url)
  729. self.assertEqual("https://github.com:9090", c.get_url(b"/"))
  730. self.assertIsInstance(c, HttpGitClient)
  731. self.assertEqual("/jelmer/dulwich", path)
  732. @patch("os.name", "posix")
  733. @patch("sys.platform", "linux")
  734. def test_file(self) -> None:
  735. c, path = get_transport_and_path_from_url("file:///home/jelmer/foo")
  736. self.assertIsInstance(c, LocalGitClient)
  737. self.assertEqual("/home/jelmer/foo", path)
  738. def test_win32_url_to_path(self):
  739. def check(url, expected):
  740. parsed = urlparse(url)
  741. self.assertEqual(_win32_url_to_path(parsed), expected)
  742. check("file:C:/foo.bar/baz", "C:\\foo.bar\\baz")
  743. check("file:/C:/foo.bar/baz", "C:\\foo.bar\\baz")
  744. check("file://C:/foo.bar/baz", "C:\\foo.bar\\baz")
  745. check("file:///C:/foo.bar/baz", "C:\\foo.bar\\baz")
  746. @patch("os.name", "nt")
  747. @patch("sys.platform", "win32")
  748. def test_file_win(self) -> None:
  749. expected = "C:\\foo.bar\\baz"
  750. for file_url in [
  751. "file:C:/foo.bar/baz",
  752. "file:/C:/foo.bar/baz",
  753. "file://C:/foo.bar/baz",
  754. "file:///C:/foo.bar/baz",
  755. ]:
  756. c, path = get_transport_and_path(file_url)
  757. self.assertIsInstance(c, LocalGitClient)
  758. self.assertEqual(path, expected)
  759. for remote_url in [
  760. "file://host.example.com/C:/foo.bar/baz"
  761. "file://host.example.com/C:/foo.bar/baz"
  762. "file:////host.example/foo.bar/baz",
  763. ]:
  764. with self.assertRaises(NotImplementedError):
  765. c, path = get_transport_and_path(remote_url)
  766. class TestSSHVendor:
  767. def __init__(self) -> None:
  768. self.host = None
  769. self.command = ""
  770. self.username = None
  771. self.port = None
  772. self.password = None
  773. self.key_filename = None
  774. def run_command(
  775. self,
  776. host,
  777. command,
  778. username=None,
  779. port=None,
  780. password=None,
  781. key_filename=None,
  782. ssh_command=None,
  783. protocol_version=None,
  784. ):
  785. self.host = host
  786. self.command = command
  787. self.username = username
  788. self.port = port
  789. self.password = password
  790. self.key_filename = key_filename
  791. self.ssh_command = ssh_command
  792. self.protocol_version = protocol_version
  793. class Subprocess:
  794. pass
  795. Subprocess.read = lambda: None
  796. Subprocess.write = lambda: None
  797. Subprocess.close = lambda: None
  798. Subprocess.can_read = lambda: None
  799. return Subprocess()
  800. class SSHGitClientTests(TestCase):
  801. def setUp(self) -> None:
  802. super().setUp()
  803. self.server = TestSSHVendor()
  804. self.real_vendor = client.get_ssh_vendor
  805. client.get_ssh_vendor = lambda: self.server
  806. self.client = SSHGitClient("git.samba.org")
  807. def tearDown(self) -> None:
  808. super().tearDown()
  809. client.get_ssh_vendor = self.real_vendor
  810. def test_get_url(self) -> None:
  811. path = "/tmp/repo.git"
  812. c = SSHGitClient("git.samba.org")
  813. url = c.get_url(path)
  814. self.assertEqual("ssh://git.samba.org/tmp/repo.git", url)
  815. def test_get_url_with_username_and_port(self) -> None:
  816. path = "/tmp/repo.git"
  817. c = SSHGitClient("git.samba.org", port=2222, username="user")
  818. url = c.get_url(path)
  819. self.assertEqual("ssh://user@git.samba.org:2222/tmp/repo.git", url)
  820. def test_default_command(self) -> None:
  821. self.assertEqual(b"git-upload-pack", self.client._get_cmd_path(b"upload-pack"))
  822. def test_alternative_command_path(self) -> None:
  823. self.client.alternative_paths[b"upload-pack"] = b"/usr/lib/git/git-upload-pack"
  824. self.assertEqual(
  825. b"/usr/lib/git/git-upload-pack",
  826. self.client._get_cmd_path(b"upload-pack"),
  827. )
  828. def test_alternative_command_path_spaces(self) -> None:
  829. self.client.alternative_paths[b"upload-pack"] = (
  830. b"/usr/lib/git/git-upload-pack -ibla"
  831. )
  832. self.assertEqual(
  833. b"/usr/lib/git/git-upload-pack -ibla",
  834. self.client._get_cmd_path(b"upload-pack"),
  835. )
  836. def test_connect(self) -> None:
  837. server = self.server
  838. client = self.client
  839. client.username = b"username"
  840. client.port = 1337
  841. client._connect(b"command", b"/path/to/repo")
  842. self.assertEqual(b"username", server.username)
  843. self.assertEqual(1337, server.port)
  844. self.assertEqual(b"git-command '/path/to/repo'", server.command)
  845. client._connect(b"relative-command", b"/~/path/to/repo")
  846. self.assertEqual(b"git-relative-command '~/path/to/repo'", server.command)
  847. def test_ssh_command_precedence(self) -> None:
  848. self.overrideEnv("GIT_SSH", "/path/to/ssh")
  849. test_client = SSHGitClient("git.samba.org")
  850. self.assertEqual(test_client.ssh_command, "/path/to/ssh")
  851. self.overrideEnv("GIT_SSH_COMMAND", "/path/to/ssh -o Option=Value")
  852. test_client = SSHGitClient("git.samba.org")
  853. self.assertEqual(test_client.ssh_command, "/path/to/ssh -o Option=Value")
  854. test_client = SSHGitClient("git.samba.org", ssh_command="ssh -o Option1=Value1")
  855. self.assertEqual(test_client.ssh_command, "ssh -o Option1=Value1")
  856. def test_ssh_command_config(self) -> None:
  857. # Test core.sshCommand config setting
  858. from dulwich.config import ConfigDict
  859. # No config, no environment - should default to "ssh"
  860. self.overrideEnv("GIT_SSH", None)
  861. self.overrideEnv("GIT_SSH_COMMAND", None)
  862. test_client = SSHGitClient("git.samba.org")
  863. self.assertEqual(test_client.ssh_command, "ssh")
  864. # Config with core.sshCommand
  865. config = ConfigDict()
  866. config.set((b"core",), b"sshCommand", b"ssh -o StrictHostKeyChecking=no")
  867. test_client = SSHGitClient("git.samba.org", config=config)
  868. self.assertEqual(test_client.ssh_command, "ssh -o StrictHostKeyChecking=no")
  869. # ssh_command parameter takes precedence over config
  870. test_client = SSHGitClient(
  871. "git.samba.org", config=config, ssh_command="custom-ssh"
  872. )
  873. self.assertEqual(test_client.ssh_command, "custom-ssh")
  874. # Environment variables take precedence over config when no ssh_command parameter
  875. self.overrideEnv("GIT_SSH_COMMAND", "/usr/bin/ssh -v")
  876. test_client = SSHGitClient("git.samba.org", config=config)
  877. self.assertEqual(test_client.ssh_command, "/usr/bin/ssh -v")
  878. def test_ssh_kwargs_passed_to_vendor(self) -> None:
  879. # Test that ssh_command and other kwargs are actually passed to the SSH vendor
  880. server = self.server
  881. client = self.client
  882. # Set custom ssh_command
  883. client.ssh_command = "custom-ssh-wrapper.sh -o Option=Value"
  884. client.password = "test-password"
  885. client.key_filename = "/path/to/key"
  886. # Connect and verify all kwargs are passed through
  887. client._connect(b"upload-pack", b"/path/to/repo")
  888. self.assertEqual(server.ssh_command, "custom-ssh-wrapper.sh -o Option=Value")
  889. self.assertEqual(server.password, "test-password")
  890. self.assertEqual(server.key_filename, "/path/to/key")
  891. class ReportStatusParserTests(TestCase):
  892. def test_invalid_pack(self) -> None:
  893. parser = ReportStatusParser()
  894. parser.handle_packet(b"unpack error - foo bar")
  895. parser.handle_packet(b"ok refs/foo/bar")
  896. parser.handle_packet(None)
  897. self.assertRaises(SendPackError, list, parser.check())
  898. def test_update_refs_error(self) -> None:
  899. parser = ReportStatusParser()
  900. parser.handle_packet(b"unpack ok")
  901. parser.handle_packet(b"ng refs/foo/bar need to pull")
  902. parser.handle_packet(None)
  903. self.assertEqual([(b"refs/foo/bar", "need to pull")], list(parser.check()))
  904. def test_ok(self) -> None:
  905. parser = ReportStatusParser()
  906. parser.handle_packet(b"unpack ok")
  907. parser.handle_packet(b"ok refs/foo/bar")
  908. parser.handle_packet(None)
  909. self.assertEqual([(b"refs/foo/bar", None)], list(parser.check()))
  910. class LocalGitClientTests(TestCase):
  911. def test_get_url(self) -> None:
  912. path = "/tmp/repo.git"
  913. c = LocalGitClient()
  914. url = c.get_url(path)
  915. self.assertEqual("file:///tmp/repo.git", url)
  916. def test_fetch_into_empty(self) -> None:
  917. c = LocalGitClient()
  918. target = tempfile.mkdtemp()
  919. self.addCleanup(shutil.rmtree, target)
  920. t = Repo.init_bare(target)
  921. self.addCleanup(t.close)
  922. s = open_repo("a.git")
  923. self.addCleanup(tear_down_repo, s)
  924. self.assertEqual(s.get_refs(), c.fetch(s.path, t).refs)
  925. def test_clone(self) -> None:
  926. c = LocalGitClient()
  927. s = open_repo("a.git")
  928. self.addCleanup(tear_down_repo, s)
  929. target = tempfile.mkdtemp()
  930. self.addCleanup(shutil.rmtree, target)
  931. result_repo = c.clone(s.path, target, mkdir=False)
  932. self.addCleanup(result_repo.close)
  933. expected = dict(s.get_refs())
  934. expected[b"refs/remotes/origin/HEAD"] = expected[b"HEAD"]
  935. expected[b"refs/remotes/origin/master"] = expected[b"refs/heads/master"]
  936. self.assertEqual(expected, result_repo.get_refs())
  937. def test_fetch_empty(self) -> None:
  938. c = LocalGitClient()
  939. s = open_repo("a.git")
  940. self.addCleanup(tear_down_repo, s)
  941. out = BytesIO()
  942. walker = {}
  943. ret = c.fetch_pack(
  944. s.path,
  945. lambda heads, depth=None: [],
  946. graph_walker=walker,
  947. pack_data=out.write,
  948. )
  949. self.assertEqual(
  950. {
  951. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  952. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  953. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  954. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  955. },
  956. ret.refs,
  957. )
  958. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  959. self.assertEqual(
  960. b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
  961. b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e",
  962. out.getvalue(),
  963. )
  964. def test_fetch_pack_none(self) -> None:
  965. c = LocalGitClient()
  966. s = open_repo("a.git")
  967. self.addCleanup(tear_down_repo, s)
  968. out = BytesIO()
  969. walker = MemoryRepo().get_graph_walker()
  970. ret = c.fetch_pack(
  971. s.path,
  972. lambda heads, depth=None: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
  973. graph_walker=walker,
  974. pack_data=out.write,
  975. )
  976. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  977. self.assertEqual(
  978. {
  979. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  980. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  981. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  982. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  983. },
  984. ret.refs,
  985. )
  986. # Hardcoding is not ideal, but we'll fix that some other day..
  987. self.assertTrue(
  988. out.getvalue().startswith(b"PACK\x00\x00\x00\x02\x00\x00\x00\x07")
  989. )
  990. def test_send_pack_without_changes(self) -> None:
  991. local = open_repo("a.git")
  992. self.addCleanup(tear_down_repo, local)
  993. target = open_repo("a.git")
  994. self.addCleanup(tear_down_repo, target)
  995. self.send_and_verify(b"master", local, target)
  996. def test_send_pack_with_changes(self) -> None:
  997. local = open_repo("a.git")
  998. self.addCleanup(tear_down_repo, local)
  999. target_path = tempfile.mkdtemp()
  1000. self.addCleanup(shutil.rmtree, target_path)
  1001. with Repo.init_bare(target_path) as target:
  1002. self.send_and_verify(b"master", local, target)
  1003. def test_get_refs(self) -> None:
  1004. local = open_repo("refs.git")
  1005. self.addCleanup(tear_down_repo, local)
  1006. client = LocalGitClient()
  1007. result = client.get_refs(local.path)
  1008. self.assertDictEqual(local.refs.as_dict(), result.refs)
  1009. # Check that symrefs are detected correctly
  1010. self.assertIn(b"HEAD", result.symrefs)
  1011. def send_and_verify(self, branch, local, target) -> None:
  1012. """Send branch from local to remote repository and verify it worked."""
  1013. client = LocalGitClient()
  1014. ref_name = b"refs/heads/" + branch
  1015. result = client.send_pack(
  1016. target.path,
  1017. lambda _: {ref_name: local.refs[ref_name]},
  1018. local.generate_pack_data,
  1019. )
  1020. self.assertEqual(local.refs[ref_name], result.refs[ref_name])
  1021. self.assertIs(None, result.agent)
  1022. self.assertEqual({}, result.ref_status)
  1023. obj_local = local.get_object(result.refs[ref_name])
  1024. obj_target = target.get_object(result.refs[ref_name])
  1025. self.assertEqual(obj_local, obj_target)
  1026. class BundleClientTests(TestCase):
  1027. def setUp(self) -> None:
  1028. super().setUp()
  1029. self.tempdir = tempfile.mkdtemp()
  1030. self.addCleanup(shutil.rmtree, self.tempdir)
  1031. def _create_test_bundle(self):
  1032. """Create a test bundle file and return its path."""
  1033. # Create a simple repository
  1034. repo = MemoryRepo()
  1035. # Create some objects
  1036. blob = Blob.from_string(b"Hello world")
  1037. repo.object_store.add_object(blob)
  1038. tree = Tree()
  1039. tree.add(b"hello.txt", 0o100644, blob.id)
  1040. repo.object_store.add_object(tree)
  1041. commit = Commit()
  1042. commit.tree = tree.id
  1043. commit.message = b"Initial commit"
  1044. commit.author = commit.committer = b"Test User <test@example.com>"
  1045. commit.commit_time = commit.author_time = 1234567890
  1046. commit.commit_timezone = commit.author_timezone = 0
  1047. repo.object_store.add_object(commit)
  1048. repo.refs[b"refs/heads/master"] = commit.id
  1049. # Create bundle
  1050. bundle = create_bundle_from_repo(repo)
  1051. # Write bundle to file
  1052. bundle_path = os.path.join(self.tempdir, "test.bundle")
  1053. with open(bundle_path, "wb") as f:
  1054. write_bundle(f, bundle)
  1055. return bundle_path, repo
  1056. def test_is_bundle_file(self) -> None:
  1057. """Test bundle file detection."""
  1058. bundle_path, _ = self._create_test_bundle()
  1059. # Test positive case
  1060. self.assertTrue(BundleClient._is_bundle_file(bundle_path))
  1061. # Test negative case - regular file
  1062. regular_file = os.path.join(self.tempdir, "regular.txt")
  1063. with open(regular_file, "w") as f:
  1064. f.write("not a bundle")
  1065. self.assertFalse(BundleClient._is_bundle_file(regular_file))
  1066. # Test negative case - non-existent file
  1067. self.assertFalse(BundleClient._is_bundle_file("/non/existent/file"))
  1068. def test_get_refs(self) -> None:
  1069. """Test getting refs from bundle."""
  1070. bundle_path, _ = self._create_test_bundle()
  1071. client = BundleClient()
  1072. result = client.get_refs(bundle_path)
  1073. self.assertIn(b"refs/heads/master", result.refs)
  1074. self.assertEqual(result.symrefs, {})
  1075. def test_fetch_pack(self) -> None:
  1076. """Test fetching pack from bundle."""
  1077. bundle_path, _source_repo = self._create_test_bundle()
  1078. client = BundleClient()
  1079. pack_data = BytesIO()
  1080. def determine_wants(refs):
  1081. return list(refs.values())
  1082. class MockGraphWalker:
  1083. def next(self):
  1084. return None
  1085. def ack(self, sha):
  1086. pass
  1087. result = client.fetch_pack(
  1088. bundle_path, determine_wants, MockGraphWalker(), pack_data.write
  1089. )
  1090. # Verify we got refs back
  1091. self.assertIn(b"refs/heads/master", result.refs)
  1092. # Verify pack data was written
  1093. self.assertGreater(len(pack_data.getvalue()), 0)
  1094. def test_fetch(self) -> None:
  1095. """Test fetching from bundle into target repo."""
  1096. bundle_path, _source_repo = self._create_test_bundle()
  1097. client = BundleClient()
  1098. target_repo = MemoryRepo()
  1099. result = client.fetch(bundle_path, target_repo)
  1100. # Verify refs were imported
  1101. self.assertIn(b"refs/heads/master", result.refs)
  1102. # Verify objects were imported
  1103. master_id = result.refs[b"refs/heads/master"]
  1104. self.assertIn(master_id, target_repo.object_store)
  1105. # Verify the commit object is correct
  1106. commit = target_repo.object_store[master_id]
  1107. self.assertEqual(commit.message, b"Initial commit")
  1108. def test_send_pack_not_supported(self) -> None:
  1109. """Test that send_pack raises NotImplementedError."""
  1110. bundle_path, _ = self._create_test_bundle()
  1111. client = BundleClient()
  1112. with self.assertRaises(NotImplementedError):
  1113. client.send_pack(bundle_path, None, None)
  1114. def test_get_transport_and_path_bundle(self) -> None:
  1115. """Test that get_transport_and_path detects bundle files."""
  1116. bundle_path, _ = self._create_test_bundle()
  1117. client, path = get_transport_and_path(bundle_path)
  1118. self.assertIsInstance(client, BundleClient)
  1119. self.assertEqual(path, bundle_path)
  1120. class HttpGitClientTests(TestCase):
  1121. def test_get_url(self) -> None:
  1122. base_url = "https://github.com/jelmer/dulwich"
  1123. path = "/jelmer/dulwich"
  1124. c = HttpGitClient(base_url)
  1125. url = c.get_url(path)
  1126. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1127. def test_get_url_bytes_path(self) -> None:
  1128. base_url = "https://github.com/jelmer/dulwich"
  1129. path_bytes = b"/jelmer/dulwich"
  1130. c = HttpGitClient(base_url)
  1131. url = c.get_url(path_bytes)
  1132. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1133. def test_get_url_with_username_and_passwd(self) -> None:
  1134. base_url = "https://github.com/jelmer/dulwich"
  1135. path = "/jelmer/dulwich"
  1136. c = HttpGitClient(base_url, username="USERNAME", password="PASSWD")
  1137. url = c.get_url(path)
  1138. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1139. def test_init_username_passwd_set(self) -> None:
  1140. url = "https://github.com/jelmer/dulwich"
  1141. c = HttpGitClient(url, config=None, username="user", password="passwd")
  1142. self.assertEqual("user", c._username)
  1143. self.assertEqual("passwd", c._password)
  1144. basic_auth = c.pool_manager.headers["authorization"]
  1145. auth_string = "{}:{}".format("user", "passwd")
  1146. b64_credentials = base64.b64encode(auth_string.encode("latin1"))
  1147. expected_basic_auth = "Basic {}".format(b64_credentials.decode("latin1"))
  1148. self.assertEqual(basic_auth, expected_basic_auth)
  1149. def test_init_username_set_no_password(self) -> None:
  1150. url = "https://github.com/jelmer/dulwich"
  1151. c = HttpGitClient(url, config=None, username="user")
  1152. self.assertEqual("user", c._username)
  1153. self.assertIsNone(c._password)
  1154. basic_auth = c.pool_manager.headers["authorization"]
  1155. auth_string = b"user:"
  1156. b64_credentials = base64.b64encode(auth_string)
  1157. expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
  1158. self.assertEqual(basic_auth, expected_basic_auth)
  1159. def test_init_no_username_passwd(self) -> None:
  1160. url = "https://github.com/jelmer/dulwich"
  1161. c = HttpGitClient(url, config=None)
  1162. self.assertIs(None, c._username)
  1163. self.assertIs(None, c._password)
  1164. self.assertNotIn("authorization", c.pool_manager.headers)
  1165. def test_from_parsedurl_username_only(self) -> None:
  1166. username = "user"
  1167. url = f"https://{username}@github.com/jelmer/dulwich"
  1168. c = HttpGitClient.from_parsedurl(urlparse(url))
  1169. self.assertEqual(c._username, username)
  1170. self.assertEqual(c._password, None)
  1171. basic_auth = c.pool_manager.headers["authorization"]
  1172. auth_string = username.encode("ascii") + b":"
  1173. b64_credentials = base64.b64encode(auth_string)
  1174. expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
  1175. self.assertEqual(basic_auth, expected_basic_auth)
  1176. def test_from_parsedurl_on_url_with_quoted_credentials(self) -> None:
  1177. original_username = "john|the|first"
  1178. quoted_username = urlquote(original_username)
  1179. original_password = "Ya#1$2%3"
  1180. quoted_password = urlquote(original_password)
  1181. url = f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
  1182. c = HttpGitClient.from_parsedurl(urlparse(url))
  1183. self.assertEqual(original_username, c._username)
  1184. self.assertEqual(original_password, c._password)
  1185. basic_auth = c.pool_manager.headers["authorization"]
  1186. auth_string = f"{original_username}:{original_password}"
  1187. b64_credentials = base64.b64encode(auth_string.encode("latin1"))
  1188. expected_basic_auth = "Basic {}".format(b64_credentials.decode("latin1"))
  1189. self.assertEqual(basic_auth, expected_basic_auth)
  1190. def test_url_redirect_location(self) -> None:
  1191. from urllib3.response import HTTPResponse
  1192. test_data = {
  1193. "https://gitlab.com/inkscape/inkscape/": {
  1194. "location": "https://gitlab.com/inkscape/inkscape.git/",
  1195. "redirect_url": "https://gitlab.com/inkscape/inkscape.git/",
  1196. "refs_data": (
  1197. b"001e# service=git-upload-pack\n00000032"
  1198. b"fb2bebf4919a011f0fd7cec085443d0031228e76 "
  1199. b"HEAD\n0000"
  1200. ),
  1201. },
  1202. "https://github.com/jelmer/dulwich/": {
  1203. "location": "https://github.com/jelmer/dulwich/",
  1204. "redirect_url": "https://github.com/jelmer/dulwich/",
  1205. "refs_data": (
  1206. b"001e# service=git-upload-pack\n00000032"
  1207. b"3ff25e09724aa4d86ea5bca7d5dd0399a3c8bfcf "
  1208. b"HEAD\n0000"
  1209. ),
  1210. },
  1211. # check for absolute-path URI reference as location
  1212. "https://codeberg.org/ashwinvis/radicale-sh.git/": {
  1213. "location": "/ashwinvis/radicale-auth-sh/",
  1214. "redirect_url": "https://codeberg.org/ashwinvis/radicale-auth-sh/",
  1215. "refs_data": (
  1216. b"001e# service=git-upload-pack\n00000032"
  1217. b"470f8603768b608fc988675de2fae8f963c21158 "
  1218. b"HEAD\n0000"
  1219. ),
  1220. },
  1221. }
  1222. tail = "info/refs?service=git-upload-pack"
  1223. # we need to mock urllib3.PoolManager as this test will fail
  1224. # otherwise without an active internet connection
  1225. class PoolManagerMock:
  1226. def __init__(self) -> None:
  1227. self.headers: dict[str, str] = {}
  1228. def request(
  1229. self,
  1230. method,
  1231. url,
  1232. fields=None,
  1233. headers=None,
  1234. redirect=True,
  1235. preload_content=True,
  1236. ):
  1237. base_url = url[: -len(tail)]
  1238. redirect_base_url = test_data[base_url]["location"]
  1239. redirect_url = redirect_base_url + tail
  1240. headers = {
  1241. "Content-Type": "application/x-git-upload-pack-advertisement"
  1242. }
  1243. body = test_data[base_url]["refs_data"]
  1244. # urllib3 handles automatic redirection by default
  1245. status = 200
  1246. request_url = redirect_url
  1247. # simulate urllib3 behavior when redirect parameter is False
  1248. if redirect is False:
  1249. request_url = url
  1250. if redirect_base_url != base_url:
  1251. body = b""
  1252. headers["location"] = test_data[base_url]["location"]
  1253. status = 301
  1254. return HTTPResponse(
  1255. body=BytesIO(body),
  1256. headers=headers,
  1257. request_method=method,
  1258. request_url=request_url,
  1259. preload_content=preload_content,
  1260. status=status,
  1261. )
  1262. pool_manager = PoolManagerMock()
  1263. for base_url in test_data.keys():
  1264. # instantiate HttpGitClient with mocked pool manager
  1265. c = HttpGitClient(base_url, pool_manager=pool_manager, config=None)
  1266. # call method that detects url redirection
  1267. _, _, processed_url, _, _ = c._discover_references(
  1268. b"git-upload-pack", base_url
  1269. )
  1270. # send the same request as the method above without redirection
  1271. resp = c.pool_manager.request("GET", base_url + tail, redirect=False)
  1272. # check expected behavior of urllib3
  1273. redirect_location = resp.get_redirect_location()
  1274. if resp.status == 200:
  1275. self.assertFalse(redirect_location)
  1276. if redirect_location:
  1277. # check that url redirection has been correctly detected
  1278. self.assertEqual(processed_url, test_data[base_url]["redirect_url"])
  1279. else:
  1280. # check also the no redirection case
  1281. self.assertEqual(processed_url, base_url)
  1282. def test_smart_request_content_type_with_directive_check(self) -> None:
  1283. from urllib3.response import HTTPResponse
  1284. # we need to mock urllib3.PoolManager as this test will fail
  1285. # otherwise without an active internet connection
  1286. class PoolManagerMock:
  1287. def __init__(self) -> None:
  1288. self.headers: dict[str, str] = {}
  1289. def request(
  1290. self,
  1291. method,
  1292. url,
  1293. fields=None,
  1294. headers=None,
  1295. redirect=True,
  1296. preload_content=True,
  1297. ):
  1298. return HTTPResponse(
  1299. headers={
  1300. "Content-Type": "application/x-git-upload-pack-result; charset=utf-8"
  1301. },
  1302. request_method=method,
  1303. request_url=url,
  1304. preload_content=preload_content,
  1305. status=200,
  1306. )
  1307. clone_url = "https://hacktivis.me/git/blog.git/"
  1308. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1309. self.assertTrue(client._smart_request("git-upload-pack", clone_url, data=None))
  1310. def test_urllib3_protocol_error(self) -> None:
  1311. from urllib3.exceptions import ProtocolError
  1312. from urllib3.response import HTTPResponse
  1313. error_msg = "protocol error"
  1314. # we need to mock urllib3.PoolManager as this test will fail
  1315. # otherwise without an active internet connection
  1316. class PoolManagerMock:
  1317. def __init__(self) -> None:
  1318. self.headers: dict[str, str] = {}
  1319. def request(
  1320. self,
  1321. method,
  1322. url,
  1323. fields=None,
  1324. headers=None,
  1325. redirect=True,
  1326. preload_content=True,
  1327. ):
  1328. response = HTTPResponse(
  1329. headers={"Content-Type": "application/x-git-upload-pack-result"},
  1330. request_method=method,
  1331. request_url=url,
  1332. preload_content=preload_content,
  1333. status=200,
  1334. )
  1335. def read(self) -> NoReturn:
  1336. raise ProtocolError(error_msg)
  1337. # override HTTPResponse.read to throw urllib3.exceptions.ProtocolError
  1338. response.read = read
  1339. return response
  1340. def check_heads(heads, **kwargs):
  1341. self.assertEqual(heads, {})
  1342. return []
  1343. clone_url = "https://git.example.org/user/project.git/"
  1344. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1345. with self.assertRaises(GitProtocolError, msg=error_msg):
  1346. client.fetch_pack(b"/", check_heads, None, None)
  1347. def test_fetch_pack_dumb_http(self) -> None:
  1348. import zlib
  1349. from urllib3.response import HTTPResponse
  1350. # Mock responses for dumb HTTP
  1351. info_refs_content = (
  1352. b"0123456789abcdef0123456789abcdef01234567\trefs/heads/master\n"
  1353. )
  1354. head_content = b"ref: refs/heads/master"
  1355. # Create a blob object for testing
  1356. blob_content = b"Hello, dumb HTTP!"
  1357. blob_sha = b"0123456789abcdef0123456789abcdef01234567"
  1358. blob_hex = blob_sha.decode("ascii")
  1359. blob_obj_data = (
  1360. b"blob " + str(len(blob_content)).encode() + b"\x00" + blob_content
  1361. )
  1362. blob_compressed = zlib.compress(blob_obj_data)
  1363. responses = {
  1364. "/HEAD": {
  1365. "status": 200,
  1366. "content": head_content,
  1367. "content_type": "text/plain",
  1368. },
  1369. "/git-upload-pack": {
  1370. "status": 404,
  1371. "content": b"Not Found",
  1372. "content_type": "text/plain",
  1373. },
  1374. "/info/refs": {
  1375. "status": 200,
  1376. "content": info_refs_content,
  1377. "content_type": "text/plain",
  1378. },
  1379. f"/objects/{blob_hex[:2]}/{blob_hex[2:]}": {
  1380. "status": 200,
  1381. "content": blob_compressed,
  1382. "content_type": "application/octet-stream",
  1383. },
  1384. }
  1385. class PoolManagerMock:
  1386. def __init__(self) -> None:
  1387. self.headers: dict[str, str] = {}
  1388. def request(
  1389. self,
  1390. method,
  1391. url,
  1392. fields=None,
  1393. headers=None,
  1394. redirect=True,
  1395. preload_content=True,
  1396. ):
  1397. # Extract path from URL
  1398. from urllib.parse import urlparse
  1399. parsed = urlparse(url)
  1400. path = parsed.path.rstrip("/")
  1401. # Find matching response
  1402. for pattern, resp_data in responses.items():
  1403. if path.endswith(pattern):
  1404. return HTTPResponse(
  1405. body=BytesIO(resp_data["content"]),
  1406. headers={
  1407. "Content-Type": resp_data.get(
  1408. "content_type", "text/plain"
  1409. )
  1410. },
  1411. request_method=method,
  1412. request_url=url,
  1413. preload_content=preload_content,
  1414. status=resp_data["status"],
  1415. )
  1416. # Default 404
  1417. return HTTPResponse(
  1418. body=BytesIO(b"Not Found"),
  1419. headers={"Content-Type": "text/plain"},
  1420. request_method=method,
  1421. request_url=url,
  1422. preload_content=preload_content,
  1423. status=404,
  1424. )
  1425. def determine_wants(heads, **kwargs):
  1426. # heads contains the refs with SHA values, just return the SHA we want
  1427. return [heads[b"refs/heads/master"]]
  1428. received_data = []
  1429. def pack_data_handler(data):
  1430. # Collect pack data
  1431. received_data.append(data)
  1432. clone_url = "https://git.example.org/repo.git/"
  1433. client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)
  1434. # Mock graph walker that says we don't have anything
  1435. class MockGraphWalker:
  1436. def ack(self, sha):
  1437. return []
  1438. graph_walker = MockGraphWalker()
  1439. result = client.fetch_pack(
  1440. b"/", determine_wants, graph_walker, pack_data_handler
  1441. )
  1442. # Verify we got the refs
  1443. expected_sha = blob_hex.encode("ascii")
  1444. self.assertEqual({b"refs/heads/master": expected_sha}, result.refs)
  1445. # Verify we received pack data
  1446. self.assertTrue(len(received_data) > 0)
  1447. pack_data = b"".join(received_data)
  1448. self.assertTrue(len(pack_data) > 0)
  1449. # The pack should be valid pack format
  1450. self.assertTrue(pack_data.startswith(b"PACK"))
  1451. # Pack header: PACK + version (4 bytes) + num objects (4 bytes)
  1452. self.assertEqual(pack_data[4:8], b"\x00\x00\x00\x02") # version 2
  1453. self.assertEqual(pack_data[8:12], b"\x00\x00\x00\x01") # 1 object
  1454. def test_timeout_configuration(self) -> None:
  1455. """Test that timeout parameter is properly configured."""
  1456. url = "https://github.com/jelmer/dulwich"
  1457. timeout = 30
  1458. c = HttpGitClient(url, timeout=timeout)
  1459. self.assertEqual(c._timeout, timeout)
  1460. def test_timeout_from_config(self) -> None:
  1461. """Test that timeout can be configured via git config."""
  1462. from dulwich.config import ConfigDict
  1463. url = "https://github.com/jelmer/dulwich"
  1464. config = ConfigDict()
  1465. config.set((b"http",), b"timeout", b"25")
  1466. c = HttpGitClient(url, config=config)
  1467. # The timeout should be set on the pool manager
  1468. # Since we can't easily access the timeout from the pool manager,
  1469. # we just verify the client was created successfully
  1470. self.assertIsNotNone(c.pool_manager)
  1471. def test_timeout_parameter_precedence(self) -> None:
  1472. """Test that explicit timeout parameter takes precedence over config."""
  1473. from dulwich.config import ConfigDict
  1474. url = "https://github.com/jelmer/dulwich"
  1475. config = ConfigDict()
  1476. config.set((b"http",), b"timeout", b"25")
  1477. c = HttpGitClient(url, config=config, timeout=15)
  1478. self.assertEqual(c._timeout, 15)
  1479. def test_http_extra_headers_from_config(self) -> None:
  1480. """Test that http.extraHeader config values are applied."""
  1481. from dulwich.config import ConfigDict
  1482. url = "https://github.com/jelmer/dulwich"
  1483. config = ConfigDict()
  1484. # Set a single extra header
  1485. config.set((b"http",), b"extraHeader", b"X-Custom-Header: test-value")
  1486. c = HttpGitClient(url, config=config)
  1487. # Check that the header was added to the pool manager
  1488. self.assertIn("X-Custom-Header", c.pool_manager.headers)
  1489. self.assertEqual(c.pool_manager.headers["X-Custom-Header"], "test-value")
  1490. def test_http_multiple_extra_headers_from_config(self) -> None:
  1491. """Test that multiple http.extraHeader config values are applied."""
  1492. from dulwich.config import ConfigDict
  1493. url = "https://github.com/jelmer/dulwich"
  1494. config = ConfigDict()
  1495. # Set multiple extra headers
  1496. config.set((b"http",), b"extraHeader", b"X-Header-1: value1")
  1497. config.add((b"http",), b"extraHeader", b"X-Header-2: value2")
  1498. config.add((b"http",), b"extraHeader", b"Authorization: Bearer token123")
  1499. c = HttpGitClient(url, config=config)
  1500. # Check that all headers were added to the pool manager
  1501. self.assertIn("X-Header-1", c.pool_manager.headers)
  1502. self.assertEqual(c.pool_manager.headers["X-Header-1"], "value1")
  1503. self.assertIn("X-Header-2", c.pool_manager.headers)
  1504. self.assertEqual(c.pool_manager.headers["X-Header-2"], "value2")
  1505. self.assertIn("Authorization", c.pool_manager.headers)
  1506. self.assertEqual(c.pool_manager.headers["Authorization"], "Bearer token123")
  1507. def test_http_extra_headers_per_url_config(self) -> None:
  1508. """Test that per-URL http.extraHeader config values are applied (issue #882)."""
  1509. from dulwich.config import ConfigDict
  1510. url = "https://github.com/jelmer/dulwich"
  1511. config = ConfigDict()
  1512. # Set URL-specific extra header
  1513. config.set(
  1514. (b"http", b"https://github.com/"),
  1515. b"extraHeader",
  1516. b"Authorization: basic token123",
  1517. )
  1518. c = HttpGitClient(url, config=config)
  1519. # Check that the header was added to the pool manager
  1520. self.assertIn("Authorization", c.pool_manager.headers)
  1521. self.assertEqual(c.pool_manager.headers["Authorization"], "basic token123")
  1522. def test_http_extra_headers_url_specificity(self) -> None:
  1523. """Test that more specific URL configs override less specific ones."""
  1524. from dulwich.config import ConfigDict
  1525. url = "https://github.com/jelmer/dulwich"
  1526. config = ConfigDict()
  1527. # Set global header
  1528. config.set((b"http",), b"extraHeader", b"X-Global: global-value")
  1529. # Set host-specific header (overrides global)
  1530. config.set(
  1531. (b"http", b"https://github.com/"), b"extraHeader", b"X-Global: github-value"
  1532. )
  1533. config.add(
  1534. (b"http", b"https://github.com/"),
  1535. b"extraHeader",
  1536. b"Authorization: Bearer token123",
  1537. )
  1538. c = HttpGitClient(url, config=config)
  1539. # More specific setting should win
  1540. self.assertEqual(c.pool_manager.headers["X-Global"], "github-value")
  1541. self.assertEqual(c.pool_manager.headers["Authorization"], "Bearer token123")
  1542. def test_http_extra_headers_multiple_url_configs(self) -> None:
  1543. """Test that different URLs can have different extra headers."""
  1544. from dulwich.config import ConfigDict
  1545. config = ConfigDict()
  1546. # Set different headers for different URLs
  1547. config.set(
  1548. (b"http", b"https://github.com/"),
  1549. b"extraHeader",
  1550. b"Authorization: Bearer github-token",
  1551. )
  1552. config.set(
  1553. (b"http", b"https://gitlab.com/"),
  1554. b"extraHeader",
  1555. b"Authorization: Bearer gitlab-token",
  1556. )
  1557. # Test GitHub URL
  1558. c1 = HttpGitClient("https://github.com/user/repo", config=config)
  1559. self.assertEqual(
  1560. c1.pool_manager.headers["Authorization"], "Bearer github-token"
  1561. )
  1562. # Test GitLab URL
  1563. c2 = HttpGitClient("https://gitlab.com/user/repo", config=config)
  1564. self.assertEqual(
  1565. c2.pool_manager.headers["Authorization"], "Bearer gitlab-token"
  1566. )
  1567. def test_http_extra_headers_no_match(self) -> None:
  1568. """Test that non-matching URL configs don't apply."""
  1569. from dulwich.config import ConfigDict
  1570. url = "https://example.com/repo"
  1571. config = ConfigDict()
  1572. # Set header only for GitHub
  1573. config.set(
  1574. (b"http", b"https://github.com/"),
  1575. b"extraHeader",
  1576. b"Authorization: Bearer token123",
  1577. )
  1578. c = HttpGitClient(url, config=config)
  1579. # Authorization header should not be present for example.com
  1580. self.assertNotIn("Authorization", c.pool_manager.headers)
  1581. def test_http_extra_headers_invalid_format(self) -> None:
  1582. """Test that invalid extra headers trigger warnings."""
  1583. import logging
  1584. from dulwich.config import ConfigDict
  1585. url = "https://github.com/jelmer/dulwich"
  1586. config = ConfigDict()
  1587. # Set valid header
  1588. config.set((b"http",), b"extraHeader", b"X-Valid: valid-value")
  1589. # Set invalid headers (no colon-space separator)
  1590. config.add((b"http",), b"extraHeader", b"X-Invalid-No-Separator")
  1591. # Set empty header
  1592. config.add((b"http",), b"extraHeader", b"")
  1593. # Set another valid header to verify we continue processing
  1594. config.add((b"http",), b"extraHeader", b"X-Another-Valid: another-value")
  1595. with self.assertLogs("dulwich.client", level=logging.WARNING) as cm:
  1596. c = HttpGitClient(url, config=config)
  1597. # Check that warnings were logged
  1598. self.assertEqual(len(cm.output), 2)
  1599. self.assertIn("missing ': ' separator", cm.output[0])
  1600. self.assertIn("empty http.extraHeader", cm.output[1])
  1601. # Valid headers should still be applied
  1602. self.assertIn("X-Valid", c.pool_manager.headers)
  1603. self.assertEqual(c.pool_manager.headers["X-Valid"], "valid-value")
  1604. self.assertIn("X-Another-Valid", c.pool_manager.headers)
  1605. self.assertEqual(c.pool_manager.headers["X-Another-Valid"], "another-value")
  1606. # Invalid header should not be present
  1607. self.assertNotIn("X-Invalid-No-Separator", c.pool_manager.headers)
  1608. def test_get_url_preserves_credentials_from_url(self) -> None:
  1609. """Test that credentials from URL are preserved in get_url() (issue #1925)."""
  1610. # When credentials come from the URL (not passed explicitly),
  1611. # they should be included in get_url() so they're saved to git config
  1612. username = "ghp_token123"
  1613. url = f"https://{username}@github.com/jelmer/dulwich"
  1614. path = "/jelmer/dulwich"
  1615. c = HttpGitClient.from_parsedurl(urlparse(url))
  1616. reconstructed_url = c.get_url(path)
  1617. # Credentials should be preserved in the URL
  1618. self.assertIn(username, reconstructed_url)
  1619. self.assertEqual(
  1620. f"https://{username}@github.com/jelmer/dulwich", reconstructed_url
  1621. )
  1622. def test_get_url_preserves_credentials_with_password_from_url(self) -> None:
  1623. """Test that username:password from URL are preserved in get_url()."""
  1624. username = "user"
  1625. password = "pass"
  1626. url = f"https://{username}:{password}@github.com/jelmer/dulwich"
  1627. path = "/jelmer/dulwich"
  1628. c = HttpGitClient.from_parsedurl(urlparse(url))
  1629. reconstructed_url = c.get_url(path)
  1630. # Both username and password should be preserved
  1631. self.assertIn(username, reconstructed_url)
  1632. self.assertIn(password, reconstructed_url)
  1633. self.assertEqual(
  1634. f"https://{username}:{password}@github.com/jelmer/dulwich",
  1635. reconstructed_url,
  1636. )
  1637. def test_get_url_preserves_special_chars_in_credentials(self) -> None:
  1638. """Test that special characters in credentials are properly escaped."""
  1639. # URL-encoded credentials with special characters
  1640. original_username = "user@domain"
  1641. original_password = "p@ss:word"
  1642. quoted_username = urlquote(original_username, safe="")
  1643. quoted_password = urlquote(original_password, safe="")
  1644. url = f"https://{quoted_username}:{quoted_password}@github.com/jelmer/dulwich"
  1645. path = "/jelmer/dulwich"
  1646. c = HttpGitClient.from_parsedurl(urlparse(url))
  1647. reconstructed_url = c.get_url(path)
  1648. # The reconstructed URL should have properly escaped credentials
  1649. self.assertIn(quoted_username, reconstructed_url)
  1650. self.assertIn(quoted_password, reconstructed_url)
  1651. # Verify the URL is valid by parsing it back
  1652. parsed = urlparse(reconstructed_url)
  1653. from urllib.parse import unquote
  1654. self.assertEqual(unquote(parsed.username), original_username)
  1655. self.assertEqual(unquote(parsed.password), original_password)
  1656. def test_get_url_explicit_credentials_not_in_url(self) -> None:
  1657. """Test that explicitly passed credentials are NOT included in get_url()."""
  1658. # When credentials are passed explicitly (not from URL),
  1659. # they should NOT appear in get_url() for security
  1660. base_url = "https://github.com/jelmer/dulwich"
  1661. path = "/jelmer/dulwich"
  1662. username = "explicit_user"
  1663. password = "explicit_pass"
  1664. c = HttpGitClient(base_url, username=username, password=password)
  1665. url = c.get_url(path)
  1666. # Credentials should NOT be in the URL
  1667. self.assertNotIn(username, url)
  1668. self.assertNotIn(password, url)
  1669. self.assertEqual("https://github.com/jelmer/dulwich", url)
  1670. def test_pool_manager_parameter(self) -> None:
  1671. """Test that pool_manager parameter is properly passed through."""
  1672. import urllib3
  1673. # Create a custom pool manager
  1674. custom_pool_manager = urllib3.PoolManager()
  1675. # Test with get_transport_and_path_from_url
  1676. url = "https://github.com/jelmer/dulwich"
  1677. client, _path = get_transport_and_path_from_url(
  1678. url, pool_manager=custom_pool_manager
  1679. )
  1680. # Verify the client is an HTTP client and has our custom pool manager
  1681. self.assertIsInstance(client, HttpGitClient)
  1682. self.assertIs(client.pool_manager, custom_pool_manager)
  1683. # Test with get_transport_and_path
  1684. client2, _path2 = get_transport_and_path(url, pool_manager=custom_pool_manager)
  1685. # Verify the client is an HTTP client and has our custom pool manager
  1686. self.assertIsInstance(client2, HttpGitClient)
  1687. self.assertIs(client2.pool_manager, custom_pool_manager)
  1688. def test_urllib3_subclass_support(self) -> None:
  1689. """Test that subclasses of Urllib3HttpGitClient are properly supported.
  1690. This test verifies that the bug fix for commit d1f41c5c works correctly.
  1691. Previously, the code used `cls is Urllib3HttpGitClient` which failed for
  1692. subclasses. Now it uses `issubclass(cls, Urllib3HttpGitClient)` which
  1693. correctly handles subclasses.
  1694. """
  1695. # Create a custom subclass of Urllib3HttpGitClient
  1696. class CustomUrllib3HttpGitClient(Urllib3HttpGitClient):
  1697. def __init__(self, *args, **kwargs):
  1698. super().__init__(*args, **kwargs)
  1699. self.custom_attribute = "custom_value"
  1700. # Test with AbstractHttpGitClient.from_parsedurl directly
  1701. # This is how subclasses use the client
  1702. from urllib.parse import urlparse
  1703. parsed = urlparse("https://github.com/jelmer/dulwich")
  1704. config = ConfigDict()
  1705. client = CustomUrllib3HttpGitClient.from_parsedurl(parsed, config=config)
  1706. # Verify the client is our custom subclass
  1707. self.assertIsInstance(client, CustomUrllib3HttpGitClient)
  1708. self.assertIsInstance(client, Urllib3HttpGitClient)
  1709. self.assertEqual("custom_value", client.custom_attribute)
  1710. # Verify the config was passed through (this was the bug - it wasn't passed to subclasses before)
  1711. self.assertIsNotNone(client.config)
  1712. class TCPGitClientTests(TestCase):
  1713. def test_get_url(self) -> None:
  1714. host = "github.com"
  1715. path = "/jelmer/dulwich"
  1716. c = TCPGitClient(host)
  1717. url = c.get_url(path)
  1718. self.assertEqual("git://github.com/jelmer/dulwich", url)
  1719. def test_get_url_with_port(self) -> None:
  1720. host = "github.com"
  1721. path = "/jelmer/dulwich"
  1722. port = 9090
  1723. c = TCPGitClient(host, port=port)
  1724. url = c.get_url(path)
  1725. self.assertEqual("git://github.com:9090/jelmer/dulwich", url)
  1726. def test_get_url_with_ipv6(self) -> None:
  1727. host = "::1"
  1728. path = "/jelmer/dulwich"
  1729. c = TCPGitClient(host)
  1730. url = c.get_url(path)
  1731. self.assertEqual("git://[::1]/jelmer/dulwich", url)
  1732. def test_get_url_with_ipv6_and_port(self) -> None:
  1733. host = "2001:db8::1"
  1734. path = "/jelmer/dulwich"
  1735. port = 9090
  1736. c = TCPGitClient(host, port=port)
  1737. url = c.get_url(path)
  1738. self.assertEqual("git://[2001:db8::1]:9090/jelmer/dulwich", url)
  1739. def test_get_url_with_ipv6_default_port(self) -> None:
  1740. host = "2001:db8::1"
  1741. path = "/jelmer/dulwich"
  1742. port = TCP_GIT_PORT # Default port should not be included in URL
  1743. c = TCPGitClient(host, port=port)
  1744. url = c.get_url(path)
  1745. self.assertEqual("git://[2001:db8::1]/jelmer/dulwich", url)
  1746. class DefaultUrllib3ManagerTest(TestCase):
  1747. def test_no_config(self) -> None:
  1748. manager = default_urllib3_manager(config=None)
  1749. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1750. def test_config_no_proxy(self) -> None:
  1751. import urllib3
  1752. manager = default_urllib3_manager(config=ConfigDict())
  1753. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1754. self.assertIsInstance(manager, urllib3.PoolManager)
  1755. def test_config_no_proxy_custom_cls(self) -> None:
  1756. import urllib3
  1757. class CustomPoolManager(urllib3.PoolManager):
  1758. pass
  1759. manager = default_urllib3_manager(
  1760. config=ConfigDict(), pool_manager_cls=CustomPoolManager
  1761. )
  1762. self.assertIsInstance(manager, CustomPoolManager)
  1763. def test_config_ssl(self) -> None:
  1764. config = ConfigDict()
  1765. config.set(b"http", b"sslVerify", b"true")
  1766. manager = default_urllib3_manager(config=config)
  1767. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1768. def test_config_no_ssl(self) -> None:
  1769. config = ConfigDict()
  1770. config.set(b"http", b"sslVerify", b"false")
  1771. manager = default_urllib3_manager(config=config)
  1772. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  1773. def test_config_proxy(self) -> None:
  1774. import urllib3
  1775. config = ConfigDict()
  1776. config.set(b"http", b"proxy", b"http://localhost:3128/")
  1777. manager = default_urllib3_manager(config=config)
  1778. self.assertIsInstance(manager, urllib3.ProxyManager)
  1779. self.assertTrue(hasattr(manager, "proxy"))
  1780. self.assertEqual(manager.proxy.scheme, "http")
  1781. self.assertEqual(manager.proxy.host, "localhost")
  1782. self.assertEqual(manager.proxy.port, 3128)
  1783. def test_environment_proxy(self) -> None:
  1784. import urllib3
  1785. config = ConfigDict()
  1786. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1787. manager = default_urllib3_manager(config=config)
  1788. self.assertIsInstance(manager, urllib3.ProxyManager)
  1789. self.assertTrue(hasattr(manager, "proxy"))
  1790. self.assertEqual(manager.proxy.scheme, "http")
  1791. self.assertEqual(manager.proxy.host, "myproxy")
  1792. self.assertEqual(manager.proxy.port, 8080)
  1793. def test_environment_empty_proxy(self) -> None:
  1794. import urllib3
  1795. config = ConfigDict()
  1796. self.overrideEnv("http_proxy", "")
  1797. manager = default_urllib3_manager(config=config)
  1798. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1799. self.assertIsInstance(manager, urllib3.PoolManager)
  1800. def test_environment_no_proxy_1(self) -> None:
  1801. import urllib3
  1802. config = ConfigDict()
  1803. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1804. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh")
  1805. base_url = "http://xyz.abc.def.gh:8080/path/port"
  1806. manager = default_urllib3_manager(config=config, base_url=base_url)
  1807. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1808. self.assertIsInstance(manager, urllib3.PoolManager)
  1809. def test_environment_no_proxy_2(self) -> None:
  1810. import urllib3
  1811. config = ConfigDict()
  1812. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1813. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1814. base_url = "http://ample.com/path/port"
  1815. manager = default_urllib3_manager(config=config, base_url=base_url)
  1816. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1817. self.assertIsInstance(manager, urllib3.PoolManager)
  1818. def test_environment_no_proxy_3(self) -> None:
  1819. import urllib3
  1820. config = ConfigDict()
  1821. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1822. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1823. base_url = "http://ample.com:80/path/port"
  1824. manager = default_urllib3_manager(config=config, base_url=base_url)
  1825. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1826. self.assertIsInstance(manager, urllib3.PoolManager)
  1827. def test_environment_no_proxy_4(self) -> None:
  1828. import urllib3
  1829. config = ConfigDict()
  1830. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1831. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1832. base_url = "http://www.ample.com/path/port"
  1833. manager = default_urllib3_manager(config=config, base_url=base_url)
  1834. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1835. self.assertIsInstance(manager, urllib3.PoolManager)
  1836. def test_environment_no_proxy_5(self) -> None:
  1837. import urllib3
  1838. config = ConfigDict()
  1839. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1840. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1841. base_url = "http://www.example.com/path/port"
  1842. manager = default_urllib3_manager(config=config, base_url=base_url)
  1843. self.assertIsInstance(manager, urllib3.ProxyManager)
  1844. self.assertTrue(hasattr(manager, "proxy"))
  1845. self.assertEqual(manager.proxy.scheme, "http")
  1846. self.assertEqual(manager.proxy.host, "myproxy")
  1847. self.assertEqual(manager.proxy.port, 8080)
  1848. def test_environment_no_proxy_6(self) -> None:
  1849. import urllib3
  1850. config = ConfigDict()
  1851. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1852. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  1853. base_url = "http://ample.com.org/path/port"
  1854. manager = default_urllib3_manager(config=config, base_url=base_url)
  1855. self.assertIsInstance(manager, urllib3.ProxyManager)
  1856. self.assertTrue(hasattr(manager, "proxy"))
  1857. self.assertEqual(manager.proxy.scheme, "http")
  1858. self.assertEqual(manager.proxy.host, "myproxy")
  1859. self.assertEqual(manager.proxy.port, 8080)
  1860. def test_environment_no_proxy_ipv4_address_1(self) -> None:
  1861. import urllib3
  1862. config = ConfigDict()
  1863. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1864. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  1865. base_url = "http://192.168.0.10/path/port"
  1866. manager = default_urllib3_manager(config=config, base_url=base_url)
  1867. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1868. self.assertIsInstance(manager, urllib3.PoolManager)
  1869. def test_environment_no_proxy_ipv4_address_2(self) -> None:
  1870. import urllib3
  1871. config = ConfigDict()
  1872. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1873. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  1874. base_url = "http://192.168.0.10:8888/path/port"
  1875. manager = default_urllib3_manager(config=config, base_url=base_url)
  1876. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1877. self.assertIsInstance(manager, urllib3.PoolManager)
  1878. def test_environment_no_proxy_ipv4_address_3(self) -> None:
  1879. import urllib3
  1880. config = ConfigDict()
  1881. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1882. self.overrideEnv(
  1883. "no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com"
  1884. )
  1885. base_url = "http://192.168.0.10/path/port"
  1886. manager = default_urllib3_manager(config=config, base_url=base_url)
  1887. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1888. self.assertIsInstance(manager, urllib3.PoolManager)
  1889. def test_environment_no_proxy_ipv6_address_1(self) -> None:
  1890. import urllib3
  1891. config = ConfigDict()
  1892. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1893. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  1894. base_url = "http://[ff80:1::affe]/path/port"
  1895. manager = default_urllib3_manager(config=config, base_url=base_url)
  1896. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1897. self.assertIsInstance(manager, urllib3.PoolManager)
  1898. def test_environment_no_proxy_ipv6_address_2(self) -> None:
  1899. import urllib3
  1900. config = ConfigDict()
  1901. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1902. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  1903. base_url = "http://[ff80:1::affe]:1234/path/port"
  1904. manager = default_urllib3_manager(config=config, base_url=base_url)
  1905. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1906. self.assertIsInstance(manager, urllib3.PoolManager)
  1907. def test_environment_no_proxy_ipv6_address_3(self) -> None:
  1908. import urllib3
  1909. config = ConfigDict()
  1910. self.overrideEnv("http_proxy", "http://myproxy:8080")
  1911. self.overrideEnv(
  1912. "no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com"
  1913. )
  1914. base_url = "http://[ff80:1::affe]/path/port"
  1915. manager = default_urllib3_manager(config=config, base_url=base_url)
  1916. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  1917. self.assertIsInstance(manager, urllib3.PoolManager)
  1918. def test_config_proxy_custom_cls(self) -> None:
  1919. import urllib3
  1920. class CustomProxyManager(urllib3.ProxyManager):
  1921. pass
  1922. config = ConfigDict()
  1923. config.set(b"http", b"proxy", b"http://localhost:3128/")
  1924. manager = default_urllib3_manager(
  1925. config=config, proxy_manager_cls=CustomProxyManager
  1926. )
  1927. self.assertIsInstance(manager, CustomProxyManager)
  1928. def test_config_proxy_creds(self) -> None:
  1929. import urllib3
  1930. config = ConfigDict()
  1931. config.set(b"http", b"proxy", b"http://jelmer:example@localhost:3128/")
  1932. manager = default_urllib3_manager(config=config)
  1933. assert isinstance(manager, urllib3.ProxyManager)
  1934. self.assertEqual(
  1935. manager.proxy_headers, {"proxy-authorization": "Basic amVsbWVyOmV4YW1wbGU="}
  1936. )
  1937. def test_config_no_verify_ssl(self) -> None:
  1938. manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE")
  1939. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  def test_timeout_parameter(self) -> None:
      """An explicit ``timeout`` argument lands in ``connection_pool_kw``."""
      pool_kw = default_urllib3_manager(config=None, timeout=30).connection_pool_kw
      self.assertEqual(pool_kw["timeout"], 30)
  def test_timeout_from_config(self) -> None:
      """The ``http.timeout`` config value becomes the manager's timeout."""
      from dulwich.config import ConfigDict

      git_config = ConfigDict()
      git_config.set((b"http",), b"timeout", b"25")
      pool_kw = default_urllib3_manager(config=git_config).connection_pool_kw
      # The config stores the bytes b"25"; the manager must report the number 25.
      self.assertEqual(pool_kw["timeout"], 25)
  def test_timeout_parameter_precedence(self) -> None:
      """An explicit ``timeout`` argument wins over ``http.timeout`` from config."""
      from dulwich.config import ConfigDict

      git_config = ConfigDict()
      git_config.set((b"http",), b"timeout", b"25")
      pool_kw = default_urllib3_manager(
          config=git_config, timeout=15
      ).connection_pool_kw
      # 15 is the argument value; 25 is what the config alone would have given.
      self.assertEqual(pool_kw["timeout"], 15)
  class SubprocessSSHVendorTests(TestCase):
      """Tests for ``SubprocessSSHVendor.run_command``."""

      def setUp(self) -> None:
          # Monkey-patch Popen as seen by dulwich.client with DummyPopen;
          # tearDown puts the saved original back.
          self._orig_popen = dulwich.client.subprocess.Popen
          dulwich.client.subprocess.Popen = DummyPopen

      def tearDown(self) -> None:
          dulwich.client.subprocess.Popen = self._orig_popen

      def test_run_command_dashes(self) -> None:
          """A host name beginning with dashes raises ``StrangeHostname``."""
          vendor = SubprocessSSHVendor()
          with self.assertRaises(StrangeHostname):
              vendor.run_command("--weird-host", "git-clone-url")

      def test_run_command_password(self) -> None:
          """Supplying a password raises ``NotImplementedError``."""
          vendor = SubprocessSSHVendor()
          with self.assertRaises(NotImplementedError):
              vendor.run_command("host", "git-clone-url", password="12345")

      def test_run_command_password_and_privkey(self) -> None:
          """A password still raises ``NotImplementedError`` alongside a key file."""
          vendor = SubprocessSSHVendor()
          with self.assertRaises(NotImplementedError):
              vendor.run_command(
                  "host",
                  "git-clone-url",
                  password="12345",
                  key_filename="/tmp/id_rsa",
              )

      def test_run_command_with_port_username_and_privkey(self) -> None:
          """Port, key file and user appear in the expected argv positions."""
          # The GIT_PROTOCOL option is only expected when a default fetch
          # protocol version is configured (truthy).
          if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
              protocol_opts = [
                  "-o",
                  f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
              ]
          else:
              protocol_opts = []
          expected = [
              "ssh",
              "-x",
              "-p",
              "2200",
              "-i",
              "/tmp/id_rsa",
              *protocol_opts,
              "user@host",
              "git-clone-url",
          ]

          command = SubprocessSSHVendor().run_command(
              "host",
              "git-clone-url",
              username="user",
              port="2200",
              key_filename="/tmp/id_rsa",
          )

          self.assertListEqual(expected, command.proc.args[0])

      def test_run_with_ssh_command(self) -> None:
          """A custom ``ssh_command`` is split into words that lead the argv."""
          # The GIT_PROTOCOL option is only expected when a default fetch
          # protocol version is configured (truthy).
          if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
              protocol_opts = [
                  "-o",
                  f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
              ]
          else:
              protocol_opts = []
          expected = [
              "/path/to/ssh",
              "-o",
              "Option=Value",
              "-x",
              *protocol_opts,
              "host",
              "git-clone-url",
          ]

          command = SubprocessSSHVendor().run_command(
              "host",
              "git-clone-url",
              ssh_command="/path/to/ssh -o Option=Value",
          )

          self.assertListEqual(expected, command.proc.args[0])
  class PLinkSSHVendorTests(TestCase):
      """Tests for ``PLinkSSHVendor.run_command``.

      The platform-specific binary prefix, the warning capture setup and the
      password-warning check are shared through private helpers so that each
      test states only its own inputs and expected argument list.
      """

      # Message of the UserWarning the password tests expect to be emitted.
      _PASSWORD_WARNING = (
          "Invoking PLink with a password exposes the password in the process list."
      )

      def setUp(self) -> None:
          # Monkey Patch client subprocess popen
          self._orig_popen = dulwich.client.subprocess.Popen
          dulwich.client.subprocess.Popen = DummyPopen

      def tearDown(self) -> None:
          dulwich.client.subprocess.Popen = self._orig_popen

      @staticmethod
      def _plink_binary() -> list:
          """Return the leading arguments expected for the default plink binary."""
          if sys.platform == "win32":
              return ["plink.exe", "-ssh"]
          return ["plink", "-ssh"]

      def _record_warnings(self) -> list:
          """Start capturing warnings and return the list they are collected in.

          Both the filter change and the catcher are undone via cleanups.
          """
          warnings.simplefilter("always", UserWarning)
          self.addCleanup(warnings.resetwarnings)
          warnings_list, restore_warnings = setup_warning_catcher()
          self.addCleanup(restore_warnings)
          return warnings_list

      def _assert_password_warning(self, warnings_list) -> None:
          """Fail unless the password-exposure ``UserWarning`` was captured."""
          expected_warning = UserWarning(self._PASSWORD_WARNING)
          for w in warnings_list:
              # Exact type match on purpose: subclasses of UserWarning do not count.
              if type(w) is type(expected_warning) and w.args == expected_warning.args:
                  return
          self.fail(f"Expected warning {expected_warning!r} not in {warnings_list!r}")

      def test_run_command_dashes(self) -> None:
          """A host name beginning with dashes raises ``StrangeHostname``."""
          vendor = PLinkSSHVendor()
          self.assertRaises(
              StrangeHostname,
              vendor.run_command,
              "--weird-host",
              "git-clone-url",
          )

      def test_run_command_password_and_privkey(self) -> None:
          """Password plus key file: warning emitted, both passed on the command line."""
          vendor = PLinkSSHVendor()
          warnings_list = self._record_warnings()

          command = vendor.run_command(
              "host",
              "git-clone-url",
              password="12345",
              key_filename="/tmp/id_rsa",
          )

          self._assert_password_warning(warnings_list)

          expected = [
              *self._plink_binary(),
              "-pw",
              "12345",
              "-i",
              "/tmp/id_rsa",
              "host",
              "git-clone-url",
          ]
          self.assertListEqual(expected, command.proc.args[0])

      def test_run_command_password(self) -> None:
          """Password only: warning emitted and ``-pw`` passed on the command line."""
          expected = [*self._plink_binary(), "-pw", "12345", "host", "git-clone-url"]

          vendor = PLinkSSHVendor()
          warnings_list = self._record_warnings()

          command = vendor.run_command("host", "git-clone-url", password="12345")

          self._assert_password_warning(warnings_list)
          self.assertListEqual(expected, command.proc.args[0])

      def test_run_command_with_port_username_and_privkey(self) -> None:
          """Port uses ``-P``, key file uses ``-i`` and the user prefixes the host."""
          expected = [
              *self._plink_binary(),
              "-P",
              "2200",
              "-i",
              "/tmp/id_rsa",
              "user@host",
              "git-clone-url",
          ]

          vendor = PLinkSSHVendor()
          command = vendor.run_command(
              "host",
              "git-clone-url",
              username="user",
              port="2200",
              key_filename="/tmp/id_rsa",
          )

          self.assertListEqual(expected, command.proc.args[0])

      def test_run_with_ssh_command(self) -> None:
          """A custom ``ssh_command`` replaces the default plink binary name."""
          expected = [
              "/path/to/plink",
              "-ssh",
              "host",
              "git-clone-url",
          ]

          vendor = PLinkSSHVendor()
          command = vendor.run_command(
              "host",
              "git-clone-url",
              ssh_command="/path/to/plink",
          )

          self.assertListEqual(expected, command.proc.args[0])
  class RsyncUrlTests(TestCase):
      """Tests for ``parse_rsync_url``."""

      def test_simple(self) -> None:
          """``[user@]host:path`` is split into a (user, host, path) triple."""
          cases = {
              "foo:bar/path": (None, "foo", "bar/path"),
              "user@foo:bar/path": ("user", "foo", "bar/path"),
          }
          for url, expected in cases.items():
              self.assertEqual(parse_rsync_url(url), expected)

      def test_path(self) -> None:
          """A plain filesystem path raises ``ValueError``."""
          with self.assertRaises(ValueError):
              parse_rsync_url("/path")
  class CheckWantsTests(TestCase):
      """Tests for ``check_wants``."""

      def test_fine(self) -> None:
          """A wanted SHA that a ref points at passes without raising."""
          wanted = b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"
          check_wants([wanted], {b"refs/heads/blah": wanted})

      def test_missing(self) -> None:
          """A wanted SHA that no ref points at raises ``InvalidWants``."""
          wanted = b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"
          refs = {b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262"}
          with self.assertRaises(InvalidWants):
              check_wants([wanted], refs)

      def test_annotated(self) -> None:
          """A SHA present only under a peeled ``^{}`` entry raises ``InvalidWants``."""
          wanted = b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"
          refs = {
              b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262",
              b"refs/heads/blah^{}": wanted,
          }
          with self.assertRaises(InvalidWants):
              check_wants([wanted], refs)
  class FetchPackResultTests(TestCase):
      """Tests for ``FetchPackResult``."""

      def test_eq(self) -> None:
          """Two results built from equal arguments compare equal."""

          def build():
              # Fresh dicts on every call so the two results share no objects.
              return FetchPackResult(
                  {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
                  {},
                  b"user/agent",
              )

          self.assertEqual(build(), build())
  class GitCredentialStoreTests(TestCase):
      """Tests for ``get_credentials_from_store`` against a one-entry store file."""

      @classmethod
      def setUpClass(cls) -> None:
          # delete=False keeps the file on disk after it is closed, so the tests
          # can hand its path to the lookup; tearDownClass removes it.
          store = tempfile.NamedTemporaryFile(delete=False)
          with store:
              store.write(b"https://user:pass@example.org\n")
          cls.fname = store.name

      @classmethod
      def tearDownClass(cls) -> None:
          os.unlink(cls.fname)

      def _lookup(self, *query):
          """Return the credentials found for ``query`` in the fixture file, as a list."""
          return list(get_credentials_from_store(*query, fnames=[self.fname]))

      def test_nonmatching_scheme(self) -> None:
          """The ``https`` entry is not returned for an ``http`` lookup."""
          self.assertEqual(self._lookup("http", "example.org"), [])

      def test_nonmatching_hostname(self) -> None:
          """A host without an entry yields nothing."""
          self.assertEqual(self._lookup("https", "noentry.org"), [])

      def test_match_without_username(self) -> None:
          """Scheme and host alone find the stored user and password."""
          self.assertEqual(self._lookup("https", "example.org"), [("user", "pass")])

      def test_match_with_matching_username(self) -> None:
          """Passing the stored user name still finds the entry."""
          self.assertEqual(
              self._lookup("https", "example.org", "user"), [("user", "pass")]
          )

      def test_no_match_with_nonmatching_username(self) -> None:
          """A different user name yields nothing."""
          self.assertEqual(self._lookup("https", "example.org", "otheruser"), [])
  class RemoteErrorFromStderrTests(TestCase):
      """Tests for ``_remote_error_from_stderr``."""

      def test_nothing(self) -> None:
          """No stderr stream at all gives a bare ``HangupException``."""
          self.assertEqual(_remote_error_from_stderr(None), HangupException())

      def test_error_line(self) -> None:
          """The text after ``ERROR: `` becomes a ``GitProtocolError``."""
          stderr = BytesIO(
              b"This is some random output.\n"
              b"ERROR: This is the actual error\n"
              b"with a tail\n"
          )
          error = _remote_error_from_stderr(stderr)
          self.assertEqual(error, GitProtocolError("This is the actual error"))

      def test_no_error_line(self) -> None:
          """Without an ``ERROR:`` line, the lines end up in a ``HangupException``."""
          stderr = BytesIO(
              b"This is output without an error line.\n"
              b"And this line is just random noise, too.\n"
          )
          # The expected lines carry no trailing newline.
          expected = HangupException(
              [
                  b"This is output without an error line.",
                  b"And this line is just random noise, too.",
              ]
          )
          self.assertEqual(_remote_error_from_stderr(stderr), expected)
  class TestExtractAgentAndSymrefs(TestCase):
      """Tests for ``_extract_symrefs_and_agent``."""

      def test_extract_agent_and_symrefs(self) -> None:
          """``agent=`` and ``symref=`` capabilities are split into their parts."""
          capabilities = [b"agent=git/2.31.1", b"symref=HEAD:refs/heads/master"]
          # The helper returns the symrefs first, then the agent.
          symrefs, agent = _extract_symrefs_and_agent(capabilities)
          self.assertEqual(agent, b"git/2.31.1")
          self.assertEqual(symrefs, {b"HEAD": b"refs/heads/master"})