test_client.py 111 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080
  1. # test_client.py -- Tests for the git protocol, client side
  2. # Copyright (C) 2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import base64
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. import warnings
  27. from io import BytesIO
  28. from typing import NoReturn
  29. from unittest.mock import MagicMock, Mock, patch
  30. from urllib.parse import quote as urlquote
  31. from urllib.parse import urlparse
  32. import dulwich
  33. from dulwich import client
  34. from dulwich.bundle import create_bundle_from_repo, write_bundle
  35. from dulwich.client import (
  36. AuthCallbackPoolManager,
  37. BundleClient,
  38. FetchPackResult,
  39. GitProtocolError,
  40. HangupException,
  41. HttpGitClient,
  42. InvalidWants,
  43. LocalGitClient,
  44. PLinkSSHVendor,
  45. ReportStatusParser,
  46. SendPackError,
  47. SSHGitClient,
  48. StrangeHostname,
  49. SubprocessSSHVendor,
  50. TCPGitClient,
  51. TraditionalGitClient,
  52. Urllib3HttpGitClient,
  53. _extract_symrefs_and_agent,
  54. _remote_error_from_stderr,
  55. _win32_url_to_path,
  56. check_wants,
  57. default_urllib3_manager,
  58. get_credentials_from_store,
  59. get_transport_and_path,
  60. get_transport_and_path_from_url,
  61. parse_rsync_url,
  62. )
  63. from dulwich.config import ConfigDict
  64. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  65. from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
  66. from dulwich.pack import pack_objects_to_data, write_pack_data, write_pack_objects
  67. from dulwich.protocol import DEFAULT_GIT_PROTOCOL_VERSION_FETCH, TCP_GIT_PORT, Protocol
  68. from dulwich.repo import MemoryRepo, Repo
  69. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  70. from . import TestCase, skipIf
  71. class DummyClient(TraditionalGitClient):
  72. def __init__(self, can_read, read, write) -> None:
  73. self.can_read = can_read
  74. self.read = read
  75. self.write = write
  76. TraditionalGitClient.__init__(self)
  77. def _connect(self, service, path, protocol_version=None):
  78. return Protocol(self.read, self.write), self.can_read, None
  79. class DummyPopen:
  80. def __init__(self, *args, **kwards) -> None:
  81. self.stdin = BytesIO(b"stdin")
  82. self.stdout = BytesIO(b"stdout")
  83. self.stderr = BytesIO(b"stderr")
  84. self.returncode = 0
  85. self.args = args
  86. self.kwargs = kwards
  87. def communicate(self, *args, **kwards):
  88. return ("Running", "")
  89. def wait(self, *args, **kwards) -> bool:
  90. return False
  91. # TODO(durin42): add unit-level tests of GitClient
  92. class GitClientTests(TestCase):
  93. def setUp(self) -> None:
  94. super().setUp()
  95. self.rout = BytesIO()
  96. self.rin = BytesIO()
  97. self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write)
  98. def test_caps(self) -> None:
  99. agent_cap = "agent=dulwich/{}.{}.{}".format(*dulwich.__version__).encode(
  100. "ascii"
  101. )
  102. self.assertEqual(
  103. {
  104. b"multi_ack",
  105. b"side-band-64k",
  106. b"ofs-delta",
  107. b"thin-pack",
  108. b"multi_ack_detailed",
  109. b"shallow",
  110. agent_cap,
  111. },
  112. set(self.client._fetch_capabilities),
  113. )
  114. self.assertEqual(
  115. {
  116. b"delete-refs",
  117. b"ofs-delta",
  118. b"report-status",
  119. b"side-band-64k",
  120. agent_cap,
  121. },
  122. set(self.client._send_capabilities),
  123. )
  124. def test_archive_ack(self) -> None:
  125. self.rin.write(b"0009NACK\n0000")
  126. self.rin.seek(0)
  127. self.client.archive(b"bla", b"HEAD", None, None)
  128. self.assertEqual(self.rout.getvalue(), b"0011argument HEAD0000")
  129. def test_fetch_empty(self) -> None:
  130. self.rin.write(b"0000")
  131. self.rin.seek(0)
  132. def check_heads(heads, **kwargs):
  133. self.assertEqual(heads, {})
  134. return []
  135. ret = self.client.fetch_pack(b"/", check_heads, None, None)
  136. self.assertEqual({}, ret.refs)
  137. self.assertEqual({}, ret.symrefs)
  138. def test_fetch_pack_ignores_magic_ref(self) -> None:
  139. self.rin.write(
  140. b"00000000000000000000000000000000000000000000 capabilities^{}"
  141. b"\x00 multi_ack "
  142. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  143. b"include-tag\n"
  144. b"0000"
  145. )
  146. self.rin.seek(0)
  147. def check_heads(heads, **kwargs):
  148. self.assertEqual({}, heads)
  149. return []
  150. ret = self.client.fetch_pack(b"bla", check_heads, None, None, None)
  151. self.assertEqual({}, ret.refs)
  152. self.assertEqual({}, ret.symrefs)
  153. self.assertEqual(self.rout.getvalue(), b"0000")
  154. def test_fetch_pack_none(self) -> None:
  155. self.rin.write(
  156. b"008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack "
  157. b"thin-pack side-band side-band-64k ofs-delta shallow no-progress "
  158. b"include-tag\n"
  159. b"0000"
  160. )
  161. self.rin.seek(0)
  162. ret = self.client.fetch_pack(
  163. b"bla", lambda heads, depth=None: [], None, None, None
  164. )
  165. self.assertEqual(
  166. {b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs
  167. )
  168. self.assertEqual({}, ret.symrefs)
  169. self.assertEqual(self.rout.getvalue(), b"0000")
  170. def test_handle_upload_pack_head_deepen_since(self) -> None:
  171. # Test that deepen-since command is properly sent
  172. from dulwich.client import _handle_upload_pack_head
  173. self.rin.write(b"0008NAK\n0000")
  174. self.rin.seek(0)
  175. class DummyGraphWalker:
  176. def __iter__(self):
  177. return self
  178. def __next__(self):
  179. return None
  180. proto = Protocol(self.rin.read, self.rout.write)
  181. capabilities = [b"shallow", b"deepen-since"]
  182. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  183. graph_walker = DummyGraphWalker()
  184. _handle_upload_pack_head(
  185. proto=proto,
  186. capabilities=capabilities,
  187. graph_walker=graph_walker,
  188. wants=wants,
  189. can_read=None,
  190. depth=None,
  191. protocol_version=0,
  192. shallow_since="2023-01-01T00:00:00Z",
  193. )
  194. # Verify the deepen-since command was sent
  195. output = self.rout.getvalue()
  196. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  197. def test_handle_upload_pack_head_deepen_not(self) -> None:
  198. # Test that deepen-not command is properly sent
  199. from dulwich.client import _handle_upload_pack_head
  200. self.rin.write(b"0008NAK\n0000")
  201. self.rin.seek(0)
  202. class DummyGraphWalker:
  203. def __iter__(self):
  204. return self
  205. def __next__(self):
  206. return None
  207. proto = Protocol(self.rin.read, self.rout.write)
  208. capabilities = [b"shallow", b"deepen-not"]
  209. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  210. graph_walker = DummyGraphWalker()
  211. _handle_upload_pack_head(
  212. proto=proto,
  213. capabilities=capabilities,
  214. graph_walker=graph_walker,
  215. wants=wants,
  216. can_read=None,
  217. depth=None,
  218. protocol_version=0,
  219. shallow_exclude=["refs/heads/excluded"],
  220. )
  221. # Verify the deepen-not command was sent
  222. output = self.rout.getvalue()
  223. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  224. def test_handle_upload_pack_head_deepen_not_multiple(self) -> None:
  225. # Test that multiple deepen-not commands are properly sent
  226. from dulwich.client import _handle_upload_pack_head
  227. self.rin.write(b"0008NAK\n0000")
  228. self.rin.seek(0)
  229. class DummyGraphWalker:
  230. def __iter__(self):
  231. return self
  232. def __next__(self):
  233. return None
  234. proto = Protocol(self.rin.read, self.rout.write)
  235. capabilities = [b"shallow", b"deepen-not"]
  236. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  237. graph_walker = DummyGraphWalker()
  238. _handle_upload_pack_head(
  239. proto=proto,
  240. capabilities=capabilities,
  241. graph_walker=graph_walker,
  242. wants=wants,
  243. can_read=None,
  244. depth=None,
  245. protocol_version=0,
  246. shallow_exclude=["refs/heads/excluded1", "refs/heads/excluded2"],
  247. )
  248. # Verify both deepen-not commands were sent
  249. output = self.rout.getvalue()
  250. self.assertIn(b"deepen-not refs/heads/excluded1\n", output)
  251. self.assertIn(b"deepen-not refs/heads/excluded2\n", output)
  252. def test_handle_upload_pack_head_deepen_since_and_not(self) -> None:
  253. # Test that deepen-since and deepen-not can be used together
  254. from dulwich.client import _handle_upload_pack_head
  255. self.rin.write(b"0008NAK\n0000")
  256. self.rin.seek(0)
  257. class DummyGraphWalker:
  258. def __iter__(self):
  259. return self
  260. def __next__(self):
  261. return None
  262. proto = Protocol(self.rin.read, self.rout.write)
  263. capabilities = [b"shallow", b"deepen-since", b"deepen-not"]
  264. wants = [b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"]
  265. graph_walker = DummyGraphWalker()
  266. _handle_upload_pack_head(
  267. proto=proto,
  268. capabilities=capabilities,
  269. graph_walker=graph_walker,
  270. wants=wants,
  271. can_read=None,
  272. depth=None,
  273. protocol_version=0,
  274. shallow_since="2023-01-01T00:00:00Z",
  275. shallow_exclude=["refs/heads/excluded"],
  276. )
  277. # Verify both deepen-since and deepen-not commands were sent
  278. output = self.rout.getvalue()
  279. self.assertIn(b"deepen-since 2023-01-01T00:00:00Z\n", output)
  280. self.assertIn(b"deepen-not refs/heads/excluded\n", output)
  281. def test_send_pack_no_sideband64k_with_update_ref_error(self) -> None:
  282. # No side-bank-64k reported by server shouldn't try to parse
  283. # side band data
  284. pkts = [
  285. b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}"
  286. b"\x00 report-status delete-refs ofs-delta\n",
  287. b"",
  288. b"unpack ok",
  289. b"ng refs/foo/bar pre-receive hook declined",
  290. b"",
  291. ]
  292. for pkt in pkts:
  293. if pkt == b"":
  294. self.rin.write(b"0000")
  295. else:
  296. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  297. self.rin.seek(0)
  298. tree = Tree()
  299. commit = Commit()
  300. commit.tree = tree
  301. commit.parents = []
  302. commit.author = commit.committer = b"test user"
  303. commit.commit_time = commit.author_time = 1174773719
  304. commit.commit_timezone = commit.author_timezone = 0
  305. commit.encoding = b"UTF-8"
  306. commit.message = b"test message"
  307. def update_refs(refs):
  308. return {
  309. b"refs/foo/bar": commit.id,
  310. }
  311. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  312. return pack_objects_to_data(
  313. [
  314. (commit, None),
  315. (tree, b""),
  316. ]
  317. )
  318. result = self.client.send_pack("blah", update_refs, generate_pack_data)
  319. self.assertEqual(
  320. {b"refs/foo/bar": "pre-receive hook declined"}, result.ref_status
  321. )
  322. self.assertEqual({b"refs/foo/bar": commit.id}, result.refs)
  323. def test_send_pack_none(self) -> None:
  324. # Set ref to current value
  325. self.rin.write(
  326. b"0078310ca9477129b8586fa2afc779c1f57cf64bba6c "
  327. b"refs/heads/master\x00 report-status delete-refs "
  328. b"side-band-64k quiet ofs-delta\n"
  329. b"0000"
  330. )
  331. self.rin.seek(0)
  332. def update_refs(refs):
  333. return {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"}
  334. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  335. return 0, []
  336. self.client.send_pack(b"/", update_refs, generate_pack_data)
  337. self.assertEqual(self.rout.getvalue(), b"0000")
  338. def test_send_pack_keep_and_delete(self) -> None:
  339. self.rin.write(
  340. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  341. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  342. b"003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n"
  343. b"0000000eunpack ok\n"
  344. b"0019ok refs/heads/master\n"
  345. b"0000"
  346. )
  347. self.rin.seek(0)
  348. def update_refs(refs):
  349. return {b"refs/heads/master": ZERO_SHA}
  350. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  351. return 0, []
  352. self.client.send_pack(b"/", update_refs, generate_pack_data)
  353. self.assertEqual(
  354. self.rout.getvalue(),
  355. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  356. b"0000000000000000000000000000000000000000 "
  357. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  358. )
  359. def test_send_pack_delete_only(self) -> None:
  360. self.rin.write(
  361. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  362. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  363. b"0000000eunpack ok\n"
  364. b"0019ok refs/heads/master\n"
  365. b"0000"
  366. )
  367. self.rin.seek(0)
  368. def update_refs(refs):
  369. return {b"refs/heads/master": ZERO_SHA}
  370. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  371. return 0, []
  372. self.client.send_pack(b"/", update_refs, generate_pack_data)
  373. self.assertEqual(
  374. self.rout.getvalue(),
  375. b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c "
  376. b"0000000000000000000000000000000000000000 "
  377. b"refs/heads/master\x00delete-refs ofs-delta report-status0000",
  378. )
  379. def test_send_pack_new_ref_only(self) -> None:
  380. self.rin.write(
  381. b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c "
  382. b"refs/heads/master\x00report-status delete-refs ofs-delta\n"
  383. b"0000000eunpack ok\n"
  384. b"0019ok refs/heads/blah12\n"
  385. b"0000"
  386. )
  387. self.rin.seek(0)
  388. def update_refs(refs):
  389. return {
  390. b"refs/heads/blah12": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  391. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  392. }
  393. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  394. return 0, []
  395. f = BytesIO()
  396. write_pack_objects(f.write, [], object_format=DEFAULT_OBJECT_FORMAT)
  397. self.client.send_pack("/", update_refs, generate_pack_data)
  398. self.assertEqual(
  399. self.rout.getvalue(),
  400. b"008b0000000000000000000000000000000000000000 "
  401. b"310ca9477129b8586fa2afc779c1f57cf64bba6c "
  402. b"refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  403. + f.getvalue(),
  404. )
  405. def test_send_pack_new_ref(self) -> None:
  406. self.rin.write(
  407. b"0064310ca9477129b8586fa2afc779c1f57cf64bba6c "
  408. b"refs/heads/master\x00 report-status delete-refs ofs-delta\n"
  409. b"0000000eunpack ok\n"
  410. b"0019ok refs/heads/blah12\n"
  411. b"0000"
  412. )
  413. self.rin.seek(0)
  414. tree = Tree()
  415. commit = Commit()
  416. commit.tree = tree
  417. commit.parents = []
  418. commit.author = commit.committer = b"test user"
  419. commit.commit_time = commit.author_time = 1174773719
  420. commit.commit_timezone = commit.author_timezone = 0
  421. commit.encoding = b"UTF-8"
  422. commit.message = b"test message"
  423. def update_refs(refs):
  424. return {
  425. b"refs/heads/blah12": commit.id,
  426. b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c",
  427. }
  428. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  429. return pack_objects_to_data(
  430. [
  431. (commit, None),
  432. (tree, b""),
  433. ]
  434. )
  435. f = BytesIO()
  436. count, records = generate_pack_data(None, None)
  437. from dulwich.object_format import DEFAULT_OBJECT_FORMAT
  438. write_pack_data(
  439. f.write, records, num_records=count, object_format=DEFAULT_OBJECT_FORMAT
  440. )
  441. self.client.send_pack(b"/", update_refs, generate_pack_data)
  442. self.assertEqual(
  443. self.rout.getvalue(),
  444. b"008b0000000000000000000000000000000000000000 "
  445. + commit.id
  446. + b" refs/heads/blah12\x00delete-refs ofs-delta report-status0000"
  447. + f.getvalue(),
  448. )
  449. def test_send_pack_no_deleteref_delete_only(self) -> None:
  450. pkts = [
  451. b"310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master"
  452. b"\x00 report-status ofs-delta\n",
  453. b"",
  454. b"",
  455. ]
  456. for pkt in pkts:
  457. if pkt == b"":
  458. self.rin.write(b"0000")
  459. else:
  460. self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt)
  461. self.rin.seek(0)
  462. def update_refs(refs):
  463. return {b"refs/heads/master": ZERO_SHA}
  464. def generate_pack_data(have, want, *, ofs_delta=False, progress=None):
  465. return 0, []
  466. result = self.client.send_pack(b"/", update_refs, generate_pack_data)
  467. self.assertEqual(
  468. result.ref_status,
  469. {b"refs/heads/master": "remote does not support deleting refs"},
  470. )
  471. self.assertEqual(
  472. result.refs,
  473. {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"},
  474. )
  475. self.assertEqual(self.rout.getvalue(), b"0000")
  476. class TestGetTransportAndPath(TestCase):
  477. def test_tcp(self) -> None:
  478. c, path = get_transport_and_path("git://foo.com/bar/baz")
  479. self.assertIsInstance(c, TCPGitClient)
  480. self.assertEqual("foo.com", c._host)
  481. self.assertEqual(TCP_GIT_PORT, c._port)
  482. self.assertEqual("/bar/baz", path)
  483. def test_tcp_port(self) -> None:
  484. c, path = get_transport_and_path("git://foo.com:1234/bar/baz")
  485. self.assertIsInstance(c, TCPGitClient)
  486. self.assertEqual("foo.com", c._host)
  487. self.assertEqual(1234, c._port)
  488. self.assertEqual("/bar/baz", path)
  489. def test_tcp_ipv6(self) -> None:
  490. c, path = get_transport_and_path("git://[::1]/bar/baz")
  491. self.assertIsInstance(c, TCPGitClient)
  492. self.assertEqual("::1", c._host)
  493. self.assertEqual(TCP_GIT_PORT, c._port)
  494. self.assertEqual("/bar/baz", path)
  495. def test_tcp_ipv6_port(self) -> None:
  496. c, path = get_transport_and_path("git://[2001:db8::1]:1234/bar/baz")
  497. self.assertIsInstance(c, TCPGitClient)
  498. self.assertEqual("2001:db8::1", c._host)
  499. self.assertEqual(1234, c._port)
  500. self.assertEqual("/bar/baz", path)
  501. def test_git_ssh_explicit(self) -> None:
  502. c, path = get_transport_and_path("git+ssh://foo.com/bar/baz")
  503. self.assertIsInstance(c, SSHGitClient)
  504. self.assertEqual("foo.com", c.host)
  505. self.assertEqual(None, c.port)
  506. self.assertEqual(None, c.username)
  507. self.assertEqual("/bar/baz", path)
  508. def test_ssh_explicit(self) -> None:
  509. c, path = get_transport_and_path("ssh://foo.com/bar/baz")
  510. self.assertIsInstance(c, SSHGitClient)
  511. self.assertEqual("foo.com", c.host)
  512. self.assertEqual(None, c.port)
  513. self.assertEqual(None, c.username)
  514. self.assertEqual("/bar/baz", path)
  515. def test_ssh_port_explicit(self) -> None:
  516. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  517. self.assertIsInstance(c, SSHGitClient)
  518. self.assertEqual("foo.com", c.host)
  519. self.assertEqual(1234, c.port)
  520. self.assertEqual("/bar/baz", path)
  521. def test_username_and_port_explicit_unknown_scheme(self) -> None:
  522. c, path = get_transport_and_path("unknown://git@server:7999/dply/stuff.git")
  523. self.assertIsInstance(c, SSHGitClient)
  524. self.assertEqual("unknown", c.host)
  525. self.assertEqual("//git@server:7999/dply/stuff.git", path)
  526. def test_username_and_port_explicit(self) -> None:
  527. c, path = get_transport_and_path("ssh://git@server:7999/dply/stuff.git")
  528. self.assertIsInstance(c, SSHGitClient)
  529. self.assertEqual("git", c.username)
  530. self.assertEqual("server", c.host)
  531. self.assertEqual(7999, c.port)
  532. self.assertEqual("/dply/stuff.git", path)
  533. def test_ssh_abspath_doubleslash(self) -> None:
  534. c, path = get_transport_and_path("git+ssh://foo.com//bar/baz")
  535. self.assertIsInstance(c, SSHGitClient)
  536. self.assertEqual("foo.com", c.host)
  537. self.assertEqual(None, c.port)
  538. self.assertEqual(None, c.username)
  539. self.assertEqual("//bar/baz", path)
  540. def test_ssh_port(self) -> None:
  541. c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz")
  542. self.assertIsInstance(c, SSHGitClient)
  543. self.assertEqual("foo.com", c.host)
  544. self.assertEqual(1234, c.port)
  545. self.assertEqual("/bar/baz", path)
  546. def test_ssh_implicit(self) -> None:
  547. c, path = get_transport_and_path("foo:/bar/baz")
  548. self.assertIsInstance(c, SSHGitClient)
  549. self.assertEqual("foo", c.host)
  550. self.assertEqual(None, c.port)
  551. self.assertEqual(None, c.username)
  552. self.assertEqual("/bar/baz", path)
  553. def test_ssh_host(self) -> None:
  554. c, path = get_transport_and_path("foo.com:/bar/baz")
  555. self.assertIsInstance(c, SSHGitClient)
  556. self.assertEqual("foo.com", c.host)
  557. self.assertEqual(None, c.port)
  558. self.assertEqual(None, c.username)
  559. self.assertEqual("/bar/baz", path)
  560. def test_ssh_user_host(self) -> None:
  561. c, path = get_transport_and_path("user@foo.com:/bar/baz")
  562. self.assertIsInstance(c, SSHGitClient)
  563. self.assertEqual("foo.com", c.host)
  564. self.assertEqual(None, c.port)
  565. self.assertEqual("user", c.username)
  566. self.assertEqual("/bar/baz", path)
  567. def test_ssh_relpath(self) -> None:
  568. c, path = get_transport_and_path("foo:bar/baz")
  569. self.assertIsInstance(c, SSHGitClient)
  570. self.assertEqual("foo", c.host)
  571. self.assertEqual(None, c.port)
  572. self.assertEqual(None, c.username)
  573. self.assertEqual("bar/baz", path)
  574. def test_ssh_host_relpath(self) -> None:
  575. c, path = get_transport_and_path("foo.com:bar/baz")
  576. self.assertIsInstance(c, SSHGitClient)
  577. self.assertEqual("foo.com", c.host)
  578. self.assertEqual(None, c.port)
  579. self.assertEqual(None, c.username)
  580. self.assertEqual("bar/baz", path)
  581. def test_ssh_user_host_relpath(self) -> None:
  582. c, path = get_transport_and_path("user@foo.com:bar/baz")
  583. self.assertIsInstance(c, SSHGitClient)
  584. self.assertEqual("foo.com", c.host)
  585. self.assertEqual(None, c.port)
  586. self.assertEqual("user", c.username)
  587. self.assertEqual("bar/baz", path)
  588. def test_local(self) -> None:
  589. c, path = get_transport_and_path("foo.bar/baz")
  590. self.assertIsInstance(c, LocalGitClient)
  591. self.assertEqual("foo.bar/baz", path)
  592. def test_ssh_with_config(self) -> None:
  593. # Test that core.sshCommand from config is passed to SSHGitClient
  594. config = ConfigDict()
  595. c, _path = get_transport_and_path(
  596. "ssh://git@github.com/user/repo.git", config=config
  597. )
  598. self.assertIsInstance(c, SSHGitClient)
  599. self.assertEqual(c.ssh_command, "ssh") # Now defaults to "ssh"
  600. config.set((b"core",), b"sshCommand", b"custom-ssh -o CustomOption=yes")
  601. c, _path = get_transport_and_path(
  602. "ssh://git@github.com/user/repo.git", config=config
  603. )
  604. self.assertIsInstance(c, SSHGitClient)
  605. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  606. # Test rsync-style URL also gets the config
  607. c, _path = get_transport_and_path("git@github.com:user/repo.git", config=config)
  608. self.assertIsInstance(c, SSHGitClient)
  609. self.assertEqual("custom-ssh -o CustomOption=yes", c.ssh_command)
  610. @skipIf(sys.platform != "win32", "Behaviour only happens on windows.")
  611. def test_local_abs_windows_path(self) -> None:
  612. c, path = get_transport_and_path("C:\\foo.bar\\baz")
  613. self.assertIsInstance(c, LocalGitClient)
  614. self.assertEqual("C:\\foo.bar\\baz", path)
  615. def test_error(self) -> None:
  616. # Need to use a known urlparse.uses_netloc URL scheme to get the
  617. # expected parsing of the URL on Python versions less than 2.6.5
  618. c, _path = get_transport_and_path("prospero://bar/baz")
  619. self.assertIsInstance(c, SSHGitClient)
  620. def test_http(self) -> None:
  621. url = "https://github.com/jelmer/dulwich"
  622. c, path = get_transport_and_path(url)
  623. self.assertIsInstance(c, HttpGitClient)
  624. self.assertEqual("/jelmer/dulwich", path)
  625. def test_http_auth(self) -> None:
  626. url = "https://user:passwd@github.com/jelmer/dulwich"
  627. c, path = get_transport_and_path(url)
  628. self.assertIsInstance(c, HttpGitClient)
  629. self.assertEqual("/jelmer/dulwich", path)
  630. self.assertEqual("user", c._username)
  631. self.assertEqual("passwd", c._password)
  632. def test_http_auth_with_username(self) -> None:
  633. url = "https://github.com/jelmer/dulwich"
  634. c, path = get_transport_and_path(url, username="user2", password="blah")
  635. self.assertIsInstance(c, HttpGitClient)
  636. self.assertEqual("/jelmer/dulwich", path)
  637. self.assertEqual("user2", c._username)
  638. self.assertEqual("blah", c._password)
  639. def test_http_auth_with_username_and_in_url(self) -> None:
  640. url = "https://user:passwd@github.com/jelmer/dulwich"
  641. c, path = get_transport_and_path(url, username="user2", password="blah")
  642. self.assertIsInstance(c, HttpGitClient)
  643. self.assertEqual("/jelmer/dulwich", path)
  644. # Explicitly provided credentials should override URL credentials
  645. self.assertEqual("user2", c._username)
  646. self.assertEqual("blah", c._password)
  647. def test_http_no_auth(self) -> None:
  648. url = "https://github.com/jelmer/dulwich"
  649. c, path = get_transport_and_path(url)
  650. self.assertIsInstance(c, HttpGitClient)
  651. self.assertEqual("/jelmer/dulwich", path)
  652. self.assertIs(None, c._username)
  653. self.assertIs(None, c._password)
  654. def test_ssh_with_key_filename_and_ssh_command(self) -> None:
  655. # Test that key_filename and ssh_command are passed through to SSHGitClient
  656. c, path = get_transport_and_path(
  657. "ssh://git@github.com/user/repo.git",
  658. key_filename="/path/to/id_rsa",
  659. ssh_command="custom-ssh -o StrictHostKeyChecking=no",
  660. )
  661. self.assertIsInstance(c, SSHGitClient)
  662. self.assertEqual("/user/repo.git", path)
  663. self.assertEqual("/path/to/id_rsa", c.key_filename)
  664. self.assertEqual("custom-ssh -o StrictHostKeyChecking=no", c.ssh_command)
  665. class TestGetTransportAndPathFromUrl(TestCase):
  666. def test_tcp(self) -> None:
  667. c, path = get_transport_and_path_from_url("git://foo.com/bar/baz")
  668. self.assertIsInstance(c, TCPGitClient)
  669. self.assertEqual("foo.com", c._host)
  670. self.assertEqual(TCP_GIT_PORT, c._port)
  671. self.assertEqual("/bar/baz", path)
  672. def test_tcp_port(self) -> None:
  673. c, path = get_transport_and_path_from_url("git://foo.com:1234/bar/baz")
  674. self.assertIsInstance(c, TCPGitClient)
  675. self.assertEqual("foo.com", c._host)
  676. self.assertEqual(1234, c._port)
  677. self.assertEqual("/bar/baz", path)
  678. def test_ssh_explicit(self) -> None:
  679. c, path = get_transport_and_path_from_url("git+ssh://foo.com/bar/baz")
  680. self.assertIsInstance(c, SSHGitClient)
  681. self.assertEqual("foo.com", c.host)
  682. self.assertEqual(None, c.port)
  683. self.assertEqual(None, c.username)
  684. self.assertEqual("/bar/baz", path)
  685. def test_ssh_port_explicit(self) -> None:
  686. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/bar/baz")
  687. self.assertIsInstance(c, SSHGitClient)
  688. self.assertEqual("foo.com", c.host)
  689. self.assertEqual(1234, c.port)
  690. self.assertEqual("/bar/baz", path)
  691. def test_ssh_homepath(self) -> None:
  692. c, path = get_transport_and_path_from_url("git+ssh://foo.com/~/bar/baz")
  693. self.assertIsInstance(c, SSHGitClient)
  694. self.assertEqual("foo.com", c.host)
  695. self.assertEqual(None, c.port)
  696. self.assertEqual(None, c.username)
  697. self.assertEqual("/~/bar/baz", path)
  698. def test_ssh_port_homepath(self) -> None:
  699. c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/~/bar/baz")
  700. self.assertIsInstance(c, SSHGitClient)
  701. self.assertEqual("foo.com", c.host)
  702. self.assertEqual(1234, c.port)
  703. self.assertEqual("/~/bar/baz", path)
  704. def test_ssh_host_relpath(self) -> None:
  705. self.assertRaises(
  706. ValueError, get_transport_and_path_from_url, "foo.com:bar/baz"
  707. )
  708. def test_ssh_user_host_relpath(self) -> None:
  709. self.assertRaises(
  710. ValueError, get_transport_and_path_from_url, "user@foo.com:bar/baz"
  711. )
  712. def test_local_path(self) -> None:
  713. self.assertRaises(ValueError, get_transport_and_path_from_url, "foo.bar/baz")
  714. def test_error(self) -> None:
  715. # Need to use a known urlparse.uses_netloc URL scheme to get the
  716. # expected parsing of the URL on Python versions less than 2.6.5
  717. self.assertRaises(
  718. ValueError, get_transport_and_path_from_url, "prospero://bar/baz"
  719. )
  720. def test_http(self) -> None:
  721. url = "https://github.com/jelmer/dulwich"
  722. c, path = get_transport_and_path_from_url(url)
  723. self.assertIsInstance(c, HttpGitClient)
  724. self.assertEqual("https://github.com", c.get_url(b"/"))
  725. self.assertEqual("/jelmer/dulwich", path)
  726. def test_http_port(self) -> None:
  727. url = "https://github.com:9090/jelmer/dulwich"
  728. c, path = get_transport_and_path_from_url(url)
  729. self.assertEqual("https://github.com:9090", c.get_url(b"/"))
  730. self.assertIsInstance(c, HttpGitClient)
  731. self.assertEqual("/jelmer/dulwich", path)
  732. @patch("os.name", "posix")
  733. @patch("sys.platform", "linux")
  734. def test_file(self) -> None:
  735. c, path = get_transport_and_path_from_url("file:///home/jelmer/foo")
  736. self.assertIsInstance(c, LocalGitClient)
  737. self.assertEqual("/home/jelmer/foo", path)
  738. def test_win32_url_to_path(self):
  739. def check(url, expected):
  740. parsed = urlparse(url)
  741. self.assertEqual(_win32_url_to_path(parsed), expected)
  742. check("file:C:/foo.bar/baz", "C:\\foo.bar\\baz")
  743. check("file:/C:/foo.bar/baz", "C:\\foo.bar\\baz")
  744. check("file://C:/foo.bar/baz", "C:\\foo.bar\\baz")
  745. check("file:///C:/foo.bar/baz", "C:\\foo.bar\\baz")
  746. @patch("os.name", "nt")
  747. @patch("sys.platform", "win32")
  748. def test_file_win(self) -> None:
  749. expected = "C:\\foo.bar\\baz"
  750. for file_url in [
  751. "file:C:/foo.bar/baz",
  752. "file:/C:/foo.bar/baz",
  753. "file://C:/foo.bar/baz",
  754. "file:///C:/foo.bar/baz",
  755. ]:
  756. c, path = get_transport_and_path(file_url)
  757. self.assertIsInstance(c, LocalGitClient)
  758. self.assertEqual(path, expected)
  759. for remote_url in [
  760. "file://host.example.com/C:/foo.bar/baz"
  761. "file://host.example.com/C:/foo.bar/baz"
  762. "file:////host.example/foo.bar/baz",
  763. ]:
  764. with self.assertRaises(NotImplementedError):
  765. c, path = get_transport_and_path(remote_url)
  766. class TestSSHVendor:
  767. def __init__(self) -> None:
  768. self.host = None
  769. self.command = ""
  770. self.username = None
  771. self.port = None
  772. self.password = None
  773. self.key_filename = None
  774. def run_command(
  775. self,
  776. host,
  777. command,
  778. username=None,
  779. port=None,
  780. password=None,
  781. key_filename=None,
  782. ssh_command=None,
  783. protocol_version=None,
  784. ):
  785. self.host = host
  786. self.command = command
  787. self.username = username
  788. self.port = port
  789. self.password = password
  790. self.key_filename = key_filename
  791. self.ssh_command = ssh_command
  792. self.protocol_version = protocol_version
  793. class Subprocess:
  794. pass
  795. Subprocess.read = lambda: None
  796. Subprocess.write = lambda: None
  797. Subprocess.close = lambda: None
  798. Subprocess.can_read = lambda: None
  799. return Subprocess()
  800. class SSHGitClientTests(TestCase):
  801. def setUp(self) -> None:
  802. super().setUp()
  803. self.server = TestSSHVendor()
  804. self.real_vendor = client.get_ssh_vendor
  805. client.get_ssh_vendor = lambda: self.server
  806. self.client = SSHGitClient("git.samba.org")
  807. def tearDown(self) -> None:
  808. super().tearDown()
  809. client.get_ssh_vendor = self.real_vendor
  810. def test_get_url(self) -> None:
  811. path = "/tmp/repo.git"
  812. c = SSHGitClient("git.samba.org")
  813. url = c.get_url(path)
  814. self.assertEqual("ssh://git.samba.org/tmp/repo.git", url)
  815. def test_get_url_with_username_and_port(self) -> None:
  816. path = "/tmp/repo.git"
  817. c = SSHGitClient("git.samba.org", port=2222, username="user")
  818. url = c.get_url(path)
  819. self.assertEqual("ssh://user@git.samba.org:2222/tmp/repo.git", url)
  820. def test_default_command(self) -> None:
  821. self.assertEqual(b"git-upload-pack", self.client._get_cmd_path(b"upload-pack"))
  822. def test_alternative_command_path(self) -> None:
  823. self.client.alternative_paths[b"upload-pack"] = b"/usr/lib/git/git-upload-pack"
  824. self.assertEqual(
  825. b"/usr/lib/git/git-upload-pack",
  826. self.client._get_cmd_path(b"upload-pack"),
  827. )
  828. def test_alternative_command_path_spaces(self) -> None:
  829. self.client.alternative_paths[b"upload-pack"] = (
  830. b"/usr/lib/git/git-upload-pack -ibla"
  831. )
  832. self.assertEqual(
  833. b"/usr/lib/git/git-upload-pack -ibla",
  834. self.client._get_cmd_path(b"upload-pack"),
  835. )
  836. def test_connect(self) -> None:
  837. server = self.server
  838. client = self.client
  839. client.username = b"username"
  840. client.port = 1337
  841. client._connect(b"command", b"/path/to/repo")
  842. self.assertEqual(b"username", server.username)
  843. self.assertEqual(1337, server.port)
  844. self.assertEqual(b"git-command '/path/to/repo'", server.command)
  845. client._connect(b"relative-command", b"/~/path/to/repo")
  846. self.assertEqual(b"git-relative-command '~/path/to/repo'", server.command)
  847. def test_ssh_command_precedence(self) -> None:
  848. self.overrideEnv("GIT_SSH", "/path/to/ssh")
  849. test_client = SSHGitClient("git.samba.org")
  850. self.assertEqual(test_client.ssh_command, "/path/to/ssh")
  851. self.overrideEnv("GIT_SSH_COMMAND", "/path/to/ssh -o Option=Value")
  852. test_client = SSHGitClient("git.samba.org")
  853. self.assertEqual(test_client.ssh_command, "/path/to/ssh -o Option=Value")
  854. test_client = SSHGitClient("git.samba.org", ssh_command="ssh -o Option1=Value1")
  855. self.assertEqual(test_client.ssh_command, "ssh -o Option1=Value1")
  856. def test_ssh_command_config(self) -> None:
  857. # Test core.sshCommand config setting
  858. # No config, no environment - should default to "ssh"
  859. self.overrideEnv("GIT_SSH", None)
  860. self.overrideEnv("GIT_SSH_COMMAND", None)
  861. test_client = SSHGitClient("git.samba.org")
  862. self.assertEqual(test_client.ssh_command, "ssh")
  863. # Config with core.sshCommand
  864. config = ConfigDict()
  865. config.set((b"core",), b"sshCommand", b"ssh -o StrictHostKeyChecking=no")
  866. test_client = SSHGitClient("git.samba.org", config=config)
  867. self.assertEqual(test_client.ssh_command, "ssh -o StrictHostKeyChecking=no")
  868. # ssh_command parameter takes precedence over config
  869. test_client = SSHGitClient(
  870. "git.samba.org", config=config, ssh_command="custom-ssh"
  871. )
  872. self.assertEqual(test_client.ssh_command, "custom-ssh")
  873. # Environment variables take precedence over config when no ssh_command parameter
  874. self.overrideEnv("GIT_SSH_COMMAND", "/usr/bin/ssh -v")
  875. test_client = SSHGitClient("git.samba.org", config=config)
  876. self.assertEqual(test_client.ssh_command, "/usr/bin/ssh -v")
  877. def test_ssh_kwargs_passed_to_vendor(self) -> None:
  878. # Test that ssh_command and other kwargs are actually passed to the SSH vendor
  879. server = self.server
  880. client = self.client
  881. # Set custom ssh_command
  882. client.ssh_command = "custom-ssh-wrapper.sh -o Option=Value"
  883. client.password = "test-password"
  884. client.key_filename = "/path/to/key"
  885. # Connect and verify all kwargs are passed through
  886. client._connect(b"upload-pack", b"/path/to/repo")
  887. self.assertEqual(server.ssh_command, "custom-ssh-wrapper.sh -o Option=Value")
  888. self.assertEqual(server.password, "test-password")
  889. self.assertEqual(server.key_filename, "/path/to/key")
  890. class ReportStatusParserTests(TestCase):
  891. def test_invalid_pack(self) -> None:
  892. parser = ReportStatusParser()
  893. parser.handle_packet(b"unpack error - foo bar")
  894. parser.handle_packet(b"ok refs/foo/bar")
  895. parser.handle_packet(None)
  896. self.assertRaises(SendPackError, list, parser.check())
  897. def test_update_refs_error(self) -> None:
  898. parser = ReportStatusParser()
  899. parser.handle_packet(b"unpack ok")
  900. parser.handle_packet(b"ng refs/foo/bar need to pull")
  901. parser.handle_packet(None)
  902. self.assertEqual([(b"refs/foo/bar", "need to pull")], list(parser.check()))
  903. def test_ok(self) -> None:
  904. parser = ReportStatusParser()
  905. parser.handle_packet(b"unpack ok")
  906. parser.handle_packet(b"ok refs/foo/bar")
  907. parser.handle_packet(None)
  908. self.assertEqual([(b"refs/foo/bar", None)], list(parser.check()))
  909. class LocalGitClientTests(TestCase):
  910. def test_get_url(self) -> None:
  911. path = "/tmp/repo.git"
  912. c = LocalGitClient()
  913. url = c.get_url(path)
  914. self.assertEqual("file:///tmp/repo.git", url)
  915. def test_fetch_into_empty(self) -> None:
  916. c = LocalGitClient()
  917. target = tempfile.mkdtemp()
  918. self.addCleanup(shutil.rmtree, target)
  919. t = Repo.init_bare(target)
  920. self.addCleanup(t.close)
  921. s = open_repo("a.git")
  922. self.addCleanup(tear_down_repo, s)
  923. self.assertEqual(s.get_refs(), c.fetch(s.path, t).refs)
  924. def test_clone(self) -> None:
  925. c = LocalGitClient()
  926. s = open_repo("a.git")
  927. self.addCleanup(tear_down_repo, s)
  928. target = tempfile.mkdtemp()
  929. self.addCleanup(shutil.rmtree, target)
  930. result_repo = c.clone(s.path, target, mkdir=False)
  931. self.addCleanup(result_repo.close)
  932. expected = dict(s.get_refs())
  933. expected[b"refs/remotes/origin/HEAD"] = expected[b"HEAD"]
  934. expected[b"refs/remotes/origin/master"] = expected[b"refs/heads/master"]
  935. self.assertEqual(expected, result_repo.get_refs())
  936. def test_clone_sha256_local(self) -> None:
  937. """Test that cloning a SHA-256 local repo creates a SHA-256 clone."""
  938. client = LocalGitClient()
  939. # Create a SHA-256 source repository
  940. source_path = tempfile.mkdtemp()
  941. self.addCleanup(shutil.rmtree, source_path)
  942. source_repo = Repo.init(source_path, object_format="sha256")
  943. # Verify source is SHA-256
  944. self.assertEqual("sha256", source_repo.object_format.name)
  945. source_repo.close()
  946. # Clone the repository
  947. target_path = tempfile.mkdtemp()
  948. self.addCleanup(shutil.rmtree, target_path)
  949. cloned_repo = client.clone(source_path, target_path, mkdir=False)
  950. self.addCleanup(cloned_repo.close)
  951. # Verify the clone uses SHA-256
  952. self.assertEqual("sha256", cloned_repo.object_format.name)
  953. # Verify the config has the correct objectformat extension
  954. config = cloned_repo.get_config()
  955. self.assertEqual(b"sha256", config.get((b"extensions",), b"objectformat"))
  956. def test_clone_sha1_local(self) -> None:
  957. """Test that cloning a SHA-1 local repo creates a SHA-1 clone."""
  958. client = LocalGitClient()
  959. # Create a SHA-1 source repository
  960. source_path = tempfile.mkdtemp()
  961. self.addCleanup(shutil.rmtree, source_path)
  962. source_repo = Repo.init(source_path, object_format="sha1")
  963. # Verify source is SHA-1
  964. self.assertEqual("sha1", source_repo.object_format.name)
  965. source_repo.close()
  966. # Clone the repository
  967. target_path = tempfile.mkdtemp()
  968. self.addCleanup(shutil.rmtree, target_path)
  969. cloned_repo = client.clone(source_path, target_path, mkdir=False)
  970. self.addCleanup(cloned_repo.close)
  971. # Verify the clone uses SHA-1
  972. self.assertEqual("sha1", cloned_repo.object_format.name)
  973. def test_fetch_empty(self) -> None:
  974. c = LocalGitClient()
  975. s = open_repo("a.git")
  976. self.addCleanup(tear_down_repo, s)
  977. out = BytesIO()
  978. walker = {}
  979. ret = c.fetch_pack(
  980. s.path,
  981. lambda heads, depth=None: [],
  982. graph_walker=walker,
  983. pack_data=out.write,
  984. )
  985. self.assertEqual(
  986. {
  987. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  988. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  989. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  990. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  991. },
  992. ret.refs,
  993. )
  994. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  995. self.assertEqual(
  996. b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
  997. b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e",
  998. out.getvalue(),
  999. )
  1000. def test_fetch_pack_none(self) -> None:
  1001. c = LocalGitClient()
  1002. s = open_repo("a.git")
  1003. self.addCleanup(tear_down_repo, s)
  1004. out = BytesIO()
  1005. walker = MemoryRepo().get_graph_walker()
  1006. ret = c.fetch_pack(
  1007. s.path,
  1008. lambda heads, depth=None: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
  1009. graph_walker=walker,
  1010. pack_data=out.write,
  1011. )
  1012. self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs)
  1013. self.assertEqual(
  1014. {
  1015. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  1016. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  1017. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  1018. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  1019. },
  1020. ret.refs,
  1021. )
  1022. # Hardcoding is not ideal, but we'll fix that some other day..
  1023. self.assertTrue(
  1024. out.getvalue().startswith(b"PACK\x00\x00\x00\x02\x00\x00\x00\x07")
  1025. )
  1026. def test_send_pack_without_changes(self) -> None:
  1027. local = open_repo("a.git")
  1028. self.addCleanup(tear_down_repo, local)
  1029. target = open_repo("a.git")
  1030. self.addCleanup(tear_down_repo, target)
  1031. self.send_and_verify(b"master", local, target)
  1032. def test_send_pack_with_changes(self) -> None:
  1033. local = open_repo("a.git")
  1034. self.addCleanup(tear_down_repo, local)
  1035. target_path = tempfile.mkdtemp()
  1036. self.addCleanup(shutil.rmtree, target_path)
  1037. with Repo.init_bare(target_path) as target:
  1038. self.send_and_verify(b"master", local, target)
  1039. def test_get_refs(self) -> None:
  1040. local = open_repo("refs.git")
  1041. self.addCleanup(tear_down_repo, local)
  1042. client = LocalGitClient()
  1043. result = client.get_refs(local.path)
  1044. self.assertDictEqual(local.refs.as_dict(), result.refs)
  1045. # Check that symrefs are detected correctly
  1046. self.assertIn(b"HEAD", result.symrefs)
  1047. def test_fetch_object_format_mismatch_sha256_to_sha1(self) -> None:
  1048. """Test that fetching from SHA-256 to non-empty SHA-1 repository fails."""
  1049. from dulwich.objects import Blob
  1050. client = LocalGitClient()
  1051. # Create SHA-256 source repository
  1052. sha256_path = tempfile.mkdtemp()
  1053. self.addCleanup(shutil.rmtree, sha256_path)
  1054. sha256_repo = Repo.init(sha256_path, object_format="sha256")
  1055. self.addCleanup(sha256_repo.close)
  1056. # Create SHA-1 target repository with an object (so it can't be auto-changed)
  1057. sha1_path = tempfile.mkdtemp()
  1058. self.addCleanup(shutil.rmtree, sha1_path)
  1059. sha1_repo = Repo.init(sha1_path, object_format="sha1")
  1060. self.addCleanup(sha1_repo.close)
  1061. # Add an object to make the repo non-empty
  1062. blob = Blob.from_string(b"test content")
  1063. sha1_repo.object_store.add_object(blob)
  1064. # Attempt to fetch should raise AssertionError (repo not empty)
  1065. with self.assertRaises(AssertionError) as cm:
  1066. client.fetch(sha256_path, sha1_repo)
  1067. self.assertIn("Cannot change object format", str(cm.exception))
  1068. self.assertIn("already contains objects", str(cm.exception))
  1069. def test_fetch_object_format_mismatch_sha1_to_sha256(self) -> None:
  1070. """Test that fetching from SHA-1 to non-empty SHA-256 repository fails."""
  1071. from dulwich.objects import Blob
  1072. client = LocalGitClient()
  1073. # Create SHA-1 source repository
  1074. sha1_path = tempfile.mkdtemp()
  1075. self.addCleanup(shutil.rmtree, sha1_path)
  1076. sha1_repo = Repo.init(sha1_path, object_format="sha1")
  1077. self.addCleanup(sha1_repo.close)
  1078. # Create SHA-256 target repository with an object (so it can't be auto-changed)
  1079. sha256_path = tempfile.mkdtemp()
  1080. self.addCleanup(shutil.rmtree, sha256_path)
  1081. sha256_repo = Repo.init(sha256_path, object_format="sha256")
  1082. self.addCleanup(sha256_repo.close)
  1083. # Add an object to make the repo non-empty
  1084. blob = Blob.from_string(b"test content")
  1085. sha256_repo.object_store.add_object(blob)
  1086. # Attempt to fetch should raise AssertionError (repo not empty)
  1087. with self.assertRaises(AssertionError) as cm:
  1088. client.fetch(sha1_path, sha256_repo)
  1089. self.assertIn("Cannot change object format", str(cm.exception))
  1090. self.assertIn("already contains objects", str(cm.exception))
  1091. def test_fetch_object_format_same(self) -> None:
  1092. """Test that fetching between repositories with same object format works."""
  1093. client = LocalGitClient()
  1094. # Create SHA-256 source repository
  1095. sha256_src = tempfile.mkdtemp()
  1096. self.addCleanup(shutil.rmtree, sha256_src)
  1097. src_repo = Repo.init(sha256_src, object_format="sha256")
  1098. self.addCleanup(src_repo.close)
  1099. # Create SHA-256 target repository
  1100. sha256_dst = tempfile.mkdtemp()
  1101. self.addCleanup(shutil.rmtree, sha256_dst)
  1102. dst_repo = Repo.init(sha256_dst, object_format="sha256")
  1103. self.addCleanup(dst_repo.close)
  1104. # Fetch should succeed without error
  1105. result = client.fetch(sha256_src, dst_repo)
  1106. self.assertIsNotNone(result)
  1107. def send_and_verify(self, branch, local, target) -> None:
  1108. """Send branch from local to remote repository and verify it worked."""
  1109. client = LocalGitClient()
  1110. ref_name = b"refs/heads/" + branch
  1111. result = client.send_pack(
  1112. target.path,
  1113. lambda _: {ref_name: local.refs[ref_name]},
  1114. local.generate_pack_data,
  1115. )
  1116. self.assertEqual(local.refs[ref_name], result.refs[ref_name])
  1117. self.assertIs(None, result.agent)
  1118. self.assertEqual({}, result.ref_status)
  1119. obj_local = local.get_object(result.refs[ref_name])
  1120. obj_target = target.get_object(result.refs[ref_name])
  1121. self.assertEqual(obj_local, obj_target)
  1122. class BundleClientTests(TestCase):
  1123. def setUp(self) -> None:
  1124. super().setUp()
  1125. self.tempdir = tempfile.mkdtemp()
  1126. self.addCleanup(shutil.rmtree, self.tempdir)
  1127. def _create_test_bundle(self):
  1128. """Create a test bundle file and return its path."""
  1129. # Create a simple repository
  1130. repo = MemoryRepo()
  1131. # Create some objects
  1132. blob = Blob.from_string(b"Hello world")
  1133. repo.object_store.add_object(blob)
  1134. tree = Tree()
  1135. tree.add(b"hello.txt", 0o100644, blob.id)
  1136. repo.object_store.add_object(tree)
  1137. commit = Commit()
  1138. commit.tree = tree.id
  1139. commit.message = b"Initial commit"
  1140. commit.author = commit.committer = b"Test User <test@example.com>"
  1141. commit.commit_time = commit.author_time = 1234567890
  1142. commit.commit_timezone = commit.author_timezone = 0
  1143. repo.object_store.add_object(commit)
  1144. repo.refs[b"refs/heads/master"] = commit.id
  1145. # Create bundle
  1146. bundle = create_bundle_from_repo(repo)
  1147. # Write bundle to file
  1148. bundle_path = os.path.join(self.tempdir, "test.bundle")
  1149. with open(bundle_path, "wb") as f:
  1150. write_bundle(f, bundle)
  1151. return bundle_path, repo
  1152. def test_is_bundle_file(self) -> None:
  1153. """Test bundle file detection."""
  1154. bundle_path, _ = self._create_test_bundle()
  1155. # Test positive case
  1156. self.assertTrue(BundleClient._is_bundle_file(bundle_path))
  1157. # Test negative case - regular file
  1158. regular_file = os.path.join(self.tempdir, "regular.txt")
  1159. with open(regular_file, "w") as f:
  1160. f.write("not a bundle")
  1161. self.assertFalse(BundleClient._is_bundle_file(regular_file))
  1162. # Test negative case - non-existent file
  1163. self.assertFalse(BundleClient._is_bundle_file("/non/existent/file"))
  1164. def test_get_refs(self) -> None:
  1165. """Test getting refs from bundle."""
  1166. bundle_path, _ = self._create_test_bundle()
  1167. client = BundleClient()
  1168. result = client.get_refs(bundle_path)
  1169. self.assertIn(b"refs/heads/master", result.refs)
  1170. self.assertEqual(result.symrefs, {})
  1171. def test_fetch_pack(self) -> None:
  1172. """Test fetching pack from bundle."""
  1173. bundle_path, _source_repo = self._create_test_bundle()
  1174. client = BundleClient()
  1175. pack_data = BytesIO()
  1176. def determine_wants(refs):
  1177. return list(refs.values())
  1178. class MockGraphWalker:
  1179. def next(self):
  1180. return None
  1181. def ack(self, sha):
  1182. pass
  1183. result = client.fetch_pack(
  1184. bundle_path, determine_wants, MockGraphWalker(), pack_data.write
  1185. )
  1186. # Verify we got refs back
  1187. self.assertIn(b"refs/heads/master", result.refs)
  1188. # Verify pack data was written
  1189. self.assertGreater(len(pack_data.getvalue()), 0)
  1190. def test_fetch(self) -> None:
  1191. """Test fetching from bundle into target repo."""
  1192. bundle_path, _source_repo = self._create_test_bundle()
  1193. client = BundleClient()
  1194. target_repo = MemoryRepo()
  1195. result = client.fetch(bundle_path, target_repo)
  1196. # Verify refs were imported
  1197. self.assertIn(b"refs/heads/master", result.refs)
  1198. # Verify objects were imported
  1199. master_id = result.refs[b"refs/heads/master"]
  1200. self.assertIn(master_id, target_repo.object_store)
  1201. # Verify the commit object is correct
  1202. commit = target_repo.object_store[master_id]
  1203. self.assertEqual(commit.message, b"Initial commit")
  1204. def test_send_pack_not_supported(self) -> None:
  1205. """Test that send_pack raises NotImplementedError."""
  1206. bundle_path, _ = self._create_test_bundle()
  1207. client = BundleClient()
  1208. with self.assertRaises(NotImplementedError):
  1209. client.send_pack(bundle_path, None, None)
  1210. def test_get_transport_and_path_bundle(self) -> None:
  1211. """Test that get_transport_and_path detects bundle files."""
  1212. bundle_path, _ = self._create_test_bundle()
  1213. client, path = get_transport_and_path(bundle_path)
  1214. self.assertIsInstance(client, BundleClient)
  1215. self.assertEqual(path, bundle_path)
  1216. class HttpGitClientTests(TestCase):
  1217. def test_get_url(self) -> None:
  1218. base_url = "https://github.com/jelmer/dulwich"
  1219. path = "/jelmer/dulwich"
  1220. c = HttpGitClient(base_url)
  1221. url = c.get_url(path)
  1222. self.assertEqual("https://github.com/jelmer/dulwich", url)
  def test_get_url_bytes_path(self) -> None:
      """get_url() with a bytes path returns the same str URL as a str path."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich")
      self.assertEqual(
          "https://github.com/jelmer/dulwich",
          http_client.get_url(b"/jelmer/dulwich"),
      )
  def test_get_url_with_username_and_passwd(self) -> None:
      """Credentials passed as keyword arguments do not show up in get_url()."""
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          username="USERNAME",
          password="PASSWD",
      )
      self.assertEqual(
          "https://github.com/jelmer/dulwich",
          http_client.get_url("/jelmer/dulwich"),
      )
  def test_init_username_passwd_set(self) -> None:
      """Username and password are stored and turned into a Basic auth header."""
      user, secret = "user", "passwd"
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          config=None,
          username=user,
          password=secret,
      )
      self.assertEqual("user", http_client._username)
      self.assertEqual("passwd", http_client._password)

      # Expected header value: "Basic " + base64("user:passwd").
      token = base64.b64encode(f"{user}:{secret}".encode("latin1"))
      expected_header = f"Basic {token.decode('latin1')}"
      self.assertEqual(
          http_client.pool_manager.headers["authorization"], expected_header
      )
  def test_init_username_set_no_password(self) -> None:
      """A username without a password still yields a Basic auth header."""
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich", config=None, username="user"
      )
      self.assertEqual("user", http_client._username)
      self.assertIsNone(http_client._password)

      # The password part after the colon is empty.
      token = base64.b64encode(b"user:").decode("ascii")
      self.assertEqual(
          http_client.pool_manager.headers["authorization"], f"Basic {token}"
      )
  def test_init_no_username_passwd(self) -> None:
      """Without credentials nothing is stored and no auth header is set."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=None)
      self.assertIsNone(http_client._username)
      self.assertIsNone(http_client._password)
      self.assertNotIn("authorization", http_client.pool_manager.headers)
  def test_from_parsedurl_username_only(self) -> None:
      """from_parsedurl() with only a username in the URL.

      The username is taken from the URL, the password stays ``None``, and
      the pool manager carries a Basic auth header built from ``"<user>:"``.
      """
      username = "user"
      url = f"https://{username}@github.com/jelmer/dulwich"

      c = HttpGitClient.from_parsedurl(urlparse(url))

      self.assertEqual(c._username, username)
      # Identity check: a missing password must be None itself, not merely
      # something that compares equal to None.
      self.assertIsNone(c._password)

      basic_auth = c.pool_manager.headers["authorization"]
      b64_credentials = base64.b64encode(username.encode("ascii") + b":")
      expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}"
      self.assertEqual(basic_auth, expected_basic_auth)
  def test_from_parsedurl_on_url_with_quoted_credentials(self) -> None:
      """Percent-encoded credentials in the URL are decoded by from_parsedurl()."""
      plain_user = "john|the|first"
      plain_password = "Ya#1$2%3"
      # Build the URL from the quoted forms of both credentials.
      userinfo = f"{urlquote(plain_user)}:{urlquote(plain_password)}"
      http_client = HttpGitClient.from_parsedurl(
          urlparse(f"https://{userinfo}@github.com/jelmer/dulwich")
      )

      # The client stores the unquoted values ...
      self.assertEqual(plain_user, http_client._username)
      self.assertEqual(plain_password, http_client._password)

      # ... and the Basic header is computed from the unquoted values too.
      raw_credentials = f"{plain_user}:{plain_password}".encode("latin1")
      expected_header = f"Basic {base64.b64encode(raw_credentials).decode('latin1')}"
      self.assertEqual(
          http_client.pool_manager.headers["authorization"], expected_header
      )
  def test_url_redirect_location(self) -> None:
      """Check the URL reported by _discover_references() across redirects.

      Each entry of ``test_data`` maps a request base URL to the ``location``
      the mocked server redirects to, the ``redirect_url`` the client is
      expected to report, and the ref advertisement body to serve.  Every
      entry runs in its own ``subTest`` so a failure names the offending URL
      and does not hide the remaining cases.
      """
      from urllib3.response import HTTPResponse

      test_data = {
          "https://gitlab.com/inkscape/inkscape/": {
              "location": "https://gitlab.com/inkscape/inkscape.git/",
              "redirect_url": "https://gitlab.com/inkscape/inkscape.git/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"fb2bebf4919a011f0fd7cec085443d0031228e76 "
                  b"HEAD\n0000"
              ),
          },
          "https://github.com/jelmer/dulwich/": {
              "location": "https://github.com/jelmer/dulwich/",
              "redirect_url": "https://github.com/jelmer/dulwich/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"3ff25e09724aa4d86ea5bca7d5dd0399a3c8bfcf "
                  b"HEAD\n0000"
              ),
          },
          # check for absolute-path URI reference as location
          "https://codeberg.org/ashwinvis/radicale-sh.git/": {
              "location": "/ashwinvis/radicale-auth-sh/",
              "redirect_url": "https://codeberg.org/ashwinvis/radicale-auth-sh/",
              "refs_data": (
                  b"001e# service=git-upload-pack\n00000032"
                  b"470f8603768b608fc988675de2fae8f963c21158 "
                  b"HEAD\n0000"
              ),
          },
      }

      tail = "info/refs?service=git-upload-pack"

      # we need to mock urllib3.PoolManager as this test will fail
      # otherwise without an active internet connection
      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              base_url = url[: -len(tail)]
              case = test_data[base_url]
              redirect_base_url = case["location"]
              # Separate name so the ``headers`` argument is not rebound.
              response_headers = {
                  "Content-Type": "application/x-git-upload-pack-advertisement"
              }
              body = case["refs_data"]
              # urllib3 handles automatic redirection by default
              status = 200
              request_url = redirect_base_url + tail
              # simulate urllib3 behavior when redirect parameter is False
              if redirect is False:
                  request_url = url
                  if redirect_base_url != base_url:
                      body = b""
                      response_headers["location"] = redirect_base_url
                      status = 301

              return HTTPResponse(
                  body=BytesIO(body),
                  headers=response_headers,
                  request_method=method,
                  request_url=request_url,
                  preload_content=preload_content,
                  status=status,
              )

      pool_manager = PoolManagerMock()

      for base_url, case in test_data.items():
          with self.subTest(base_url=base_url):
              # instantiate HttpGitClient with mocked pool manager
              c = HttpGitClient(base_url, pool_manager=pool_manager, config=None)
              # call method that detects url redirection
              _, _, processed_url, _, _ = c._discover_references(
                  b"git-upload-pack", base_url
              )

              # send the same request as the method above without redirection
              resp = c.pool_manager.request("GET", base_url + tail, redirect=False)

              # check expected behavior of urllib3
              redirect_location = resp.get_redirect_location()

              if resp.status == 200:
                  self.assertFalse(redirect_location)

              if redirect_location:
                  # check that url redirection has been correctly detected
                  self.assertEqual(processed_url, case["redirect_url"])
              else:
                  # check also the no redirection case
                  self.assertEqual(processed_url, base_url)
  def test_smart_request_content_type_with_directive_check(self) -> None:
      """_smart_request() accepts a Content-Type that carries a charset directive."""
      from urllib3.response import HTTPResponse

      # Media type followed by a parameter, as returned by the mocked server.
      content_type = "application/x-git-upload-pack-result; charset=utf-8"

      # Stand-in for urllib3.PoolManager so the test needs no network access.
      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              reply_headers = {"Content-Type": content_type}
              return HTTPResponse(
                  headers=reply_headers,
                  request_method=method,
                  request_url=url,
                  preload_content=preload_content,
                  status=200,
              )

      clone_url = "https://hacktivis.me/git/blog.git/"
      http_client = HttpGitClient(
          clone_url, pool_manager=PoolManagerMock(), config=None
      )

      outcome = http_client._smart_request("git-upload-pack", clone_url, data=None)
      self.assertTrue(outcome)
  def test_urllib3_protocol_error(self) -> None:
      """A urllib3 ProtocolError while reading surfaces as GitProtocolError.

      The mocked pool manager returns a response whose ``read`` always raises
      ``ProtocolError(error_msg)``.  ``fetch_pack`` must raise
      ``GitProtocolError`` and the raised error must mention ``error_msg``.
      """
      from urllib3.exceptions import ProtocolError
      from urllib3.response import HTTPResponse

      error_msg = "protocol error"

      # we need to mock urllib3.PoolManager as this test will fail
      # otherwise without an active internet connection
      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              response = HTTPResponse(
                  headers={"Content-Type": "application/x-git-upload-pack-result"},
                  request_method=method,
                  request_url=url,
                  preload_content=preload_content,
                  status=200,
              )

              # NOTE(review): this function is stored on the instance, so it
              # is not bound; its single parameter receives whatever positional
              # argument the caller passes (presumably a byte count).
              def read(self) -> NoReturn:
                  raise ProtocolError(error_msg)

              # override HTTPResponse.read to throw urllib3.exceptions.ProtocolError
              response.read = read
              return response

      def check_heads(heads, **kwargs):
          self.assertEqual(heads, {})
          return []

      clone_url = "https://git.example.org/user/project.git/"
      client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)

      # assertRaises(..., msg=...) would only set the failure message;
      # assertRaisesRegex actually checks the text of the raised exception.
      with self.assertRaisesRegex(GitProtocolError, error_msg):
          client.fetch_pack(b"/", check_heads, None, None)
  def test_fetch_pack_dumb_http(self) -> None:
      """Run fetch_pack() against a mocked server that only serves plain files.

      The mocked pool manager answers ``/HEAD``, ``/info/refs`` and one
      zlib-compressed blob object path with 200, and ``/git-upload-pack`` (as
      well as any unknown path) with 404.  The test checks the refs returned
      by ``fetch_pack`` and the 12-byte header of the pack data passed to the
      pack-data callback.
      """
      import zlib
      from urllib.parse import urlparse

      from urllib3.response import HTTPResponse

      # Mock responses for dumb HTTP
      info_refs_content = (
          b"0123456789abcdef0123456789abcdef01234567\trefs/heads/master\n"
      )
      head_content = b"ref: refs/heads/master"

      # Create a blob object for testing
      blob_content = b"Hello, dumb HTTP!"
      blob_sha = b"0123456789abcdef0123456789abcdef01234567"
      blob_hex = blob_sha.decode("ascii")
      blob_obj_data = (
          b"blob " + str(len(blob_content)).encode() + b"\x00" + blob_content
      )
      blob_compressed = zlib.compress(blob_obj_data)

      # Path suffix -> canned reply; matched in insertion order by request().
      responses = {
          "/HEAD": {
              "status": 200,
              "content": head_content,
              "content_type": "text/plain",
          },
          "/git-upload-pack": {
              "status": 404,
              "content": b"Not Found",
              "content_type": "text/plain",
          },
          "/info/refs": {
              "status": 200,
              "content": info_refs_content,
              "content_type": "text/plain",
          },
          f"/objects/{blob_hex[:2]}/{blob_hex[2:]}": {
              "status": 200,
              "content": blob_compressed,
              "content_type": "application/octet-stream",
          },
      }

      class PoolManagerMock:
          def __init__(self) -> None:
              self.headers: dict[str, str] = {}

          def request(
              self,
              method,
              url,
              fields=None,
              headers=None,
              redirect=True,
              preload_content=True,
          ):
              # Extract path from URL
              path = urlparse(url).path.rstrip("/")

              # Find matching response
              for pattern, resp_data in responses.items():
                  if path.endswith(pattern):
                      return HTTPResponse(
                          body=BytesIO(resp_data["content"]),
                          headers={
                              "Content-Type": resp_data.get(
                                  "content_type", "text/plain"
                              )
                          },
                          request_method=method,
                          request_url=url,
                          preload_content=preload_content,
                          status=resp_data["status"],
                      )

              # Default 404
              return HTTPResponse(
                  body=BytesIO(b"Not Found"),
                  headers={"Content-Type": "text/plain"},
                  request_method=method,
                  request_url=url,
                  preload_content=preload_content,
                  status=404,
              )

      def determine_wants(heads, **kwargs):
          # heads contains the refs with SHA values, just return the SHA we want
          return [heads[b"refs/heads/master"]]

      received_data = []

      def pack_data_handler(data):
          # Collect pack data
          received_data.append(data)

      clone_url = "https://git.example.org/repo.git/"
      client = HttpGitClient(clone_url, pool_manager=PoolManagerMock(), config=None)

      # Mock graph walker that says we don't have anything
      class MockGraphWalker:
          def ack(self, sha):
              return []

      graph_walker = MockGraphWalker()

      result = client.fetch_pack(
          b"/", determine_wants, graph_walker, pack_data_handler
      )

      # Verify we got the refs
      expected_sha = blob_hex.encode("ascii")
      self.assertEqual({b"refs/heads/master": expected_sha}, result.refs)

      # Verify we received pack data; assertGreater reports the actual
      # length on failure instead of a bare "False is not true".
      self.assertGreater(len(received_data), 0)
      pack_data = b"".join(received_data)
      self.assertGreater(len(pack_data), 0)

      # The pack should be valid pack format
      self.assertEqual(pack_data[:4], b"PACK")
      # Pack header: PACK + version (4 bytes) + num objects (4 bytes)
      self.assertEqual(pack_data[4:8], b"\x00\x00\x00\x02")  # version 2
      self.assertEqual(pack_data[8:12], b"\x00\x00\x00\x01")  # 1 object
  def test_timeout_configuration(self) -> None:
      """The timeout keyword argument is stored on the client as ``_timeout``."""
      http_client = HttpGitClient("https://github.com/jelmer/dulwich", timeout=30)
      self.assertEqual(http_client._timeout, 30)
  def test_timeout_from_config(self) -> None:
      """A client can be constructed when http.timeout is set in the config."""
      cfg = ConfigDict()
      cfg.set((b"http",), b"timeout", b"25")

      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=cfg)

      # Smoke check only: the timeout is not easily readable back from the
      # pool manager, so this just verifies that construction succeeded.
      self.assertIsNotNone(http_client.pool_manager)
  def test_timeout_parameter_precedence(self) -> None:
      """An explicit timeout argument wins over http.timeout from the config."""
      cfg = ConfigDict()
      cfg.set((b"http",), b"timeout", b"25")

      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich", config=cfg, timeout=15
      )

      # 15 is the keyword argument; 25 is the configured value.
      self.assertEqual(http_client._timeout, 15)
  def test_http_extra_headers_from_config(self) -> None:
      """A single http.extraHeader value ends up in the pool manager headers."""
      from dulwich.config import ConfigDict

      cfg = ConfigDict()
      cfg.set((b"http",), b"extraHeader", b"X-Custom-Header: test-value")

      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=cfg)
      sent_headers = http_client.pool_manager.headers

      self.assertIn("X-Custom-Header", sent_headers)
      self.assertEqual(sent_headers["X-Custom-Header"], "test-value")
  def test_http_multiple_extra_headers_from_config(self) -> None:
      """Several http.extraHeader values are all applied to the pool manager."""
      from dulwich.config import ConfigDict

      section = (b"http",)
      cfg = ConfigDict()
      # The first value is set; the remaining ones are appended to the same key.
      cfg.set(section, b"extraHeader", b"X-Header-1: value1")
      cfg.add(section, b"extraHeader", b"X-Header-2: value2")
      cfg.add(section, b"extraHeader", b"Authorization: Bearer token123")

      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=cfg)

      expected_headers = (
          ("X-Header-1", "value1"),
          ("X-Header-2", "value2"),
          ("Authorization", "Bearer token123"),
      )
      for header_name, header_value in expected_headers:
          self.assertIn(header_name, http_client.pool_manager.headers)
          self.assertEqual(
              http_client.pool_manager.headers[header_name], header_value
          )
  def test_http_extra_headers_per_url_config(self) -> None:
      """A URL-scoped http.extraHeader applies to a matching client URL (issue #882)."""
      from dulwich.config import ConfigDict

      cfg = ConfigDict()
      # Header configured for the https://github.com/ subsection only.
      url_section = (b"http", b"https://github.com/")
      cfg.set(url_section, b"extraHeader", b"Authorization: basic token123")

      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=cfg)
      sent_headers = http_client.pool_manager.headers

      self.assertIn("Authorization", sent_headers)
      self.assertEqual(sent_headers["Authorization"], "basic token123")
  def test_http_extra_headers_url_specificity(self) -> None:
      """A URL-scoped extraHeader overrides the global one with the same name."""
      from dulwich.config import ConfigDict

      global_section = (b"http",)
      github_section = (b"http", b"https://github.com/")

      cfg = ConfigDict()
      # Same header name at both levels, with different values.
      cfg.set(global_section, b"extraHeader", b"X-Global: global-value")
      cfg.set(github_section, b"extraHeader", b"X-Global: github-value")
      # A second header that exists only at the URL-scoped level.
      cfg.add(github_section, b"extraHeader", b"Authorization: Bearer token123")

      http_client = HttpGitClient("https://github.com/jelmer/dulwich", config=cfg)

      # The URL-scoped value is the one that must be in effect.
      self.assertEqual(http_client.pool_manager.headers["X-Global"], "github-value")
      self.assertEqual(
          http_client.pool_manager.headers["Authorization"], "Bearer token123"
      )
  def test_http_extra_headers_multiple_url_configs(self) -> None:
      """Clients for different hosts pick up their own URL-scoped extraHeader."""
      from dulwich.config import ConfigDict

      # (config subsection URL, configured header, client URL, expected value)
      cases = (
          (
              b"https://github.com/",
              b"Authorization: Bearer github-token",
              "https://github.com/user/repo",
              "Bearer github-token",
          ),
          (
              b"https://gitlab.com/",
              b"Authorization: Bearer gitlab-token",
              "https://gitlab.com/user/repo",
              "Bearer gitlab-token",
          ),
      )

      cfg = ConfigDict()
      # Register every host's header first so both are present in one config.
      for scope_url, raw_header, _client_url, _expected in cases:
          cfg.set((b"http", scope_url), b"extraHeader", raw_header)

      # Then build one client per host and check which header it received.
      for _scope_url, _raw_header, client_url, expected in cases:
          http_client = HttpGitClient(client_url, config=cfg)
          self.assertEqual(http_client.pool_manager.headers["Authorization"], expected)
  def test_http_extra_headers_no_match(self) -> None:
      """A URL-scoped extraHeader is not applied to a client for another host."""
      from dulwich.config import ConfigDict

      cfg = ConfigDict()
      # The header is configured for github.com only ...
      cfg.set(
          (b"http", b"https://github.com/"),
          b"extraHeader",
          b"Authorization: Bearer token123",
      )

      # ... so a client pointed at example.com must not carry it.
      http_client = HttpGitClient("https://example.com/repo", config=cfg)
      self.assertNotIn("Authorization", http_client.pool_manager.headers)
  def test_http_extra_headers_invalid_format(self) -> None:
      """Malformed extraHeader values log warnings; well-formed ones still apply."""
      import logging

      from dulwich.config import ConfigDict

      section = (b"http",)
      cfg = ConfigDict()
      cfg.set(section, b"extraHeader", b"X-Valid: valid-value")
      # Appended in this order: one value without a ": " separator, one empty
      # value, then another well-formed header after the bad ones.
      for raw_header in (
          b"X-Invalid-No-Separator",
          b"",
          b"X-Another-Valid: another-value",
      ):
          cfg.add(section, b"extraHeader", raw_header)

      with self.assertLogs("dulwich.client", level=logging.WARNING) as captured:
          http_client = HttpGitClient(
              "https://github.com/jelmer/dulwich", config=cfg
          )

      # Exactly one warning per malformed value, in configuration order.
      self.assertEqual(len(captured.output), 2)
      self.assertIn("missing ': ' separator", captured.output[0])
      self.assertIn("empty http.extraHeader", captured.output[1])

      # The well-formed headers are present with their values.
      for header_name, header_value in (
          ("X-Valid", "valid-value"),
          ("X-Another-Valid", "another-value"),
      ):
          self.assertIn(header_name, http_client.pool_manager.headers)
          self.assertEqual(
              http_client.pool_manager.headers[header_name], header_value
          )

      # The malformed value must not have been turned into a header.
      self.assertNotIn("X-Invalid-No-Separator", http_client.pool_manager.headers)
  def test_get_url_preserves_credentials_from_url(self) -> None:
      """A username embedded in the source URL reappears in get_url() (issue #1925)."""
      token_user = "ghp_token123"
      source_url = f"https://{token_user}@github.com/jelmer/dulwich"

      # The credentials reach the client through the URL, not keyword arguments.
      http_client = HttpGitClient.from_parsedurl(urlparse(source_url))
      rebuilt = http_client.get_url("/jelmer/dulwich")

      self.assertIn(token_user, rebuilt)
      self.assertEqual(f"https://{token_user}@github.com/jelmer/dulwich", rebuilt)
  def test_get_url_preserves_credentials_with_password_from_url(self) -> None:
      """A username:password pair embedded in the source URL reappears in get_url()."""
      user, secret = "user", "pass"
      source_url = f"https://{user}:{secret}@github.com/jelmer/dulwich"

      http_client = HttpGitClient.from_parsedurl(urlparse(source_url))
      rebuilt = http_client.get_url("/jelmer/dulwich")

      # Both parts must survive, and the full URL must round-trip exactly.
      self.assertIn(user, rebuilt)
      self.assertIn(secret, rebuilt)
      self.assertEqual(
          f"https://{user}:{secret}@github.com/jelmer/dulwich",
          rebuilt,
      )
  def test_get_url_preserves_special_chars_in_credentials(self) -> None:
      """Credentials containing '@' and ':' stay percent-encoded in get_url()."""
      from urllib.parse import unquote

      plain_user = "user@domain"
      plain_password = "p@ss:word"
      # safe="" makes urlquote escape every reserved character.
      escaped_user = urlquote(plain_user, safe="")
      escaped_password = urlquote(plain_password, safe="")
      source_url = (
          f"https://{escaped_user}:{escaped_password}@github.com/jelmer/dulwich"
      )

      http_client = HttpGitClient.from_parsedurl(urlparse(source_url))
      rebuilt = http_client.get_url("/jelmer/dulwich")

      # The escaped forms must appear verbatim in the rebuilt URL.
      self.assertIn(escaped_user, rebuilt)
      self.assertIn(escaped_password, rebuilt)

      # Parsing the rebuilt URL and unquoting recovers the plain credentials.
      reparsed = urlparse(rebuilt)
      self.assertEqual(unquote(reparsed.username), plain_user)
      self.assertEqual(unquote(reparsed.password), plain_password)
  def test_get_url_explicit_credentials_not_in_url(self) -> None:
      """Credentials passed as keyword arguments must not leak into get_url()."""
      explicit_user = "explicit_user"
      explicit_secret = "explicit_pass"
      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          username=explicit_user,
          password=explicit_secret,
      )

      rebuilt = http_client.get_url("/jelmer/dulwich")

      # Neither credential may appear; the URL is the bare repository URL.
      for credential in (explicit_user, explicit_secret):
          self.assertNotIn(credential, rebuilt)
      self.assertEqual("https://github.com/jelmer/dulwich", rebuilt)
  def test_pool_manager_parameter(self) -> None:
      """Both transport factory functions forward the pool_manager argument."""
      import urllib3

      shared_manager = urllib3.PoolManager()
      url = "https://github.com/jelmer/dulwich"

      # Same expectations for get_transport_and_path_from_url and
      # get_transport_and_path, checked in that order.
      for factory in (get_transport_and_path_from_url, get_transport_and_path):
          transport, _path = factory(url, pool_manager=shared_manager)
          # The result is an HTTP client holding the very object passed in.
          self.assertIsInstance(transport, HttpGitClient)
          self.assertIs(transport.pool_manager, shared_manager)
  def test_urllib3_subclass_support(self) -> None:
      """from_parsedurl() works on subclasses of Urllib3HttpGitClient.

      Regression test for the fix in commit d1f41c5c: the code used to test
      ``cls is Urllib3HttpGitClient``, which is false for subclasses; it now
      uses ``issubclass(cls, Urllib3HttpGitClient)``.
      """
      from urllib.parse import urlparse

      # Subclass that only adds a marker attribute after normal construction.
      class CustomUrllib3HttpGitClient(Urllib3HttpGitClient):
          def __init__(self, *args, **kwargs):
              super().__init__(*args, **kwargs)
              self.custom_attribute = "custom_value"

      # Invoke the classmethod on the subclass itself, the way subclasses
      # would use it.
      built = CustomUrllib3HttpGitClient.from_parsedurl(
          urlparse("https://github.com/jelmer/dulwich"), config=ConfigDict()
      )

      # The instance is of the subclass (and therefore of the base class too).
      self.assertIsInstance(built, CustomUrllib3HttpGitClient)
      self.assertIsInstance(built, Urllib3HttpGitClient)
      self.assertEqual("custom_value", built.custom_attribute)

      # The config must have been passed through to the subclass instance;
      # before the fix it was not.
      self.assertIsNotNone(built.config)
  def test_auth_callbacks(self) -> None:
      """Passing auth callbacks wraps the pool manager and stores both callbacks."""

      def auth_callback(url, www_authenticate, attempt):
          return {"username": "user", "password": "pass"}

      def proxy_auth_callback(url, proxy_authenticate, attempt):
          return {"username": "proxy_user", "password": "proxy_pass"}

      http_client = HttpGitClient(
          "https://github.com/jelmer/dulwich",
          auth_callback=auth_callback,
          proxy_auth_callback=proxy_auth_callback,
      )

      # The pool manager is an AuthCallbackPoolManager ...
      self.assertIsInstance(http_client.pool_manager, AuthCallbackPoolManager)
      # ... and the client keeps the callbacks it was given.
      self.assertEqual(http_client._auth_callback, auth_callback)
      self.assertEqual(http_client._proxy_auth_callback, proxy_auth_callback)
  class TCPGitClientTests(TestCase):
      """Checks of the git:// URL produced by TCPGitClient.get_url()."""

      def test_get_url(self) -> None:
          """Host without an explicit port: no port in the URL."""
          client = TCPGitClient("github.com")
          self.assertEqual(
              "git://github.com/jelmer/dulwich", client.get_url("/jelmer/dulwich")
          )

      def test_get_url_with_port(self) -> None:
          """An explicit port 9090 is appended to the host."""
          client = TCPGitClient("github.com", port=9090)
          self.assertEqual(
              "git://github.com:9090/jelmer/dulwich",
              client.get_url("/jelmer/dulwich"),
          )

      def test_get_url_with_ipv6(self) -> None:
          """An IPv6 literal host is wrapped in square brackets."""
          client = TCPGitClient("::1")
          self.assertEqual(
              "git://[::1]/jelmer/dulwich", client.get_url("/jelmer/dulwich")
          )

      def test_get_url_with_ipv6_and_port(self) -> None:
          """An IPv6 literal with port 9090 renders as [host]:port."""
          client = TCPGitClient("2001:db8::1", port=9090)
          self.assertEqual(
              "git://[2001:db8::1]:9090/jelmer/dulwich",
              client.get_url("/jelmer/dulwich"),
          )

      def test_get_url_with_ipv6_default_port(self) -> None:
          """Passing TCP_GIT_PORT explicitly leaves the port out of the URL."""
          client = TCPGitClient("2001:db8::1", port=TCP_GIT_PORT)
          self.assertEqual(
              "git://[2001:db8::1]/jelmer/dulwich",
              client.get_url("/jelmer/dulwich"),
          )
  class AuthCallbackPoolManagerTest(TestCase):
      """Tests of AuthCallbackPoolManager.request() against a mocked pool manager."""

      @staticmethod
      def _canned_response(status, headers):
          """Return a Mock with ``status`` and ``headers`` set to the given values."""
          response = Mock()
          response.status = status
          response.headers = headers
          return response

      def test_http_auth_callback(self) -> None:
          """A 401 followed by a 200: the callback's credentials trigger one retry."""
          challenge = self._canned_response(
              401, {"WWW-Authenticate": 'Basic realm="test"'}
          )
          success = self._canned_response(200, {})
          inner_manager = Mock()
          # First call yields the 401, second call the 200.
          inner_manager.request = MagicMock(side_effect=[challenge, success])

          def auth_callback(url, www_authenticate, attempt):
              # Credentials are offered on the first attempt only.
              if attempt == 1:
                  return {"username": "testuser", "password": "testpass"}
              return None

          wrapper = AuthCallbackPoolManager(inner_manager, auth_callback=auth_callback)
          result = wrapper.request("GET", "https://example.com/test")

          # Exactly two requests reached the wrapped manager.
          self.assertEqual(inner_manager.request.call_count, 2)

          # The retry carried an (all-lowercase) authorization header.
          retry_kwargs = inner_manager.request.call_args_list[1][1]
          self.assertIn("headers", retry_kwargs)
          self.assertIn("authorization", retry_kwargs["headers"])

          # The caller receives the successful response.
          self.assertEqual(result, success)

      def test_proxy_auth_callback(self) -> None:
          """A 407 followed by a 200: the proxy callback triggers one retry."""
          challenge = self._canned_response(
              407, {"Proxy-Authenticate": 'Basic realm="proxy"'}
          )
          success = self._canned_response(200, {})
          inner_manager = Mock()
          # First call yields the 407, second call the 200.
          inner_manager.request = MagicMock(side_effect=[challenge, success])

          def proxy_auth_callback(url, proxy_authenticate, attempt):
              # Credentials are offered on the first attempt only.
              if attempt == 1:
                  return {"username": "proxyuser", "password": "proxypass"}
              return None

          wrapper = AuthCallbackPoolManager(
              inner_manager, proxy_auth_callback=proxy_auth_callback
          )
          result = wrapper.request("GET", "https://example.com/test")

          # Exactly two requests reached the wrapped manager.
          self.assertEqual(inner_manager.request.call_count, 2)

          # The retry carried an (all-lowercase) proxy-authorization header.
          retry_kwargs = inner_manager.request.call_args_list[1][1]
          self.assertIn("headers", retry_kwargs)
          self.assertIn("proxy-authorization", retry_kwargs["headers"])

          # The caller receives the successful response.
          self.assertEqual(result, success)

      def test_max_attempts(self) -> None:
          """With a server that always answers 401, three requests are made in total."""
          always_unauthorized = self._canned_response(
              401, {"WWW-Authenticate": 'Basic realm="test"'}
          )
          inner_manager = Mock()
          inner_manager.request.return_value = always_unauthorized

          def auth_callback(url, www_authenticate, attempt):
              # Keeps offering credentials on every attempt.
              return {"username": "user", "password": "pass"}

          wrapper = AuthCallbackPoolManager(inner_manager, auth_callback=auth_callback)
          result = wrapper.request("GET", "https://example.com/test")

          # Initial request plus two retries.
          self.assertEqual(inner_manager.request.call_count, 3)
          # The final 401 is what the caller gets back.
          self.assertEqual(result.status, 401)
  1942. class DefaultUrllib3ManagerTest(TestCase):
  1943. def test_no_config(self) -> None:
  1944. manager = default_urllib3_manager(config=None)
  1945. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  1946. def test_auth_callbacks(self) -> None:
  1947. def auth_callback(url, www_authenticate, attempt):
  1948. return {"username": "user", "password": "pass"}
  1949. def proxy_auth_callback(url, proxy_authenticate, attempt):
  1950. return {"username": "proxy_user", "password": "proxy_pass"}
  1951. manager = default_urllib3_manager(
  1952. config=None,
  1953. auth_callback=auth_callback,
  1954. proxy_auth_callback=proxy_auth_callback,
  1955. )
  1956. self.assertIsInstance(manager, AuthCallbackPoolManager)
  1957. self.assertEqual(manager._auth_callback, auth_callback)
  1958. self.assertEqual(manager._proxy_auth_callback, proxy_auth_callback)
  1959. def test_proxy_auth_method_unsupported(self) -> None:
  1960. import os
  1961. # Test with config
  1962. config = ConfigDict()
  1963. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1964. config.set((b"http",), b"proxyAuthMethod", b"digest")
  1965. with self.assertRaises(NotImplementedError) as cm:
  1966. default_urllib3_manager(config=config)
  1967. self.assertIn("digest", str(cm.exception))
  1968. self.assertIn("not supported", str(cm.exception))
  1969. # Test with environment variable
  1970. config = ConfigDict()
  1971. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1972. old_env = os.environ.get("GIT_HTTP_PROXY_AUTHMETHOD")
  1973. try:
  1974. os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = "ntlm"
  1975. with self.assertRaises(NotImplementedError) as cm:
  1976. default_urllib3_manager(config=config)
  1977. self.assertIn("ntlm", str(cm.exception))
  1978. self.assertIn("not supported", str(cm.exception))
  1979. finally:
  1980. if old_env is None:
  1981. os.environ.pop("GIT_HTTP_PROXY_AUTHMETHOD", None)
  1982. else:
  1983. os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = old_env
  1984. def test_proxy_auth_method_supported(self) -> None:
  1985. # Test basic auth method
  1986. config = ConfigDict()
  1987. config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
  1988. config.set((b"http",), b"proxyAuthMethod", b"basic")
  1989. # Should not raise
  1990. manager = default_urllib3_manager(config=config)
  1991. self.assertIsNotNone(manager)
  1992. # Test anyauth (default)
  1993. config.set((b"http",), b"proxyAuthMethod", b"anyauth")
  1994. manager = default_urllib3_manager(config=config)
  1995. self.assertIsNotNone(manager)
  1996. def test_config_no_proxy(self) -> None:
  1997. import urllib3
  1998. manager = default_urllib3_manager(config=ConfigDict())
  1999. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2000. self.assertIsInstance(manager, urllib3.PoolManager)
  2001. def test_config_no_proxy_custom_cls(self) -> None:
  2002. import urllib3
  2003. class CustomPoolManager(urllib3.PoolManager):
  2004. pass
  2005. manager = default_urllib3_manager(
  2006. config=ConfigDict(), pool_manager_cls=CustomPoolManager
  2007. )
  2008. self.assertIsInstance(manager, CustomPoolManager)
  2009. def test_config_ssl(self) -> None:
  2010. config = ConfigDict()
  2011. config.set(b"http", b"sslVerify", b"true")
  2012. manager = default_urllib3_manager(config=config)
  2013. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
  2014. def test_config_no_ssl(self) -> None:
  2015. config = ConfigDict()
  2016. config.set(b"http", b"sslVerify", b"false")
  2017. manager = default_urllib3_manager(config=config)
  2018. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  2019. def test_config_proxy(self) -> None:
  2020. import urllib3
  2021. config = ConfigDict()
  2022. config.set(b"http", b"proxy", b"http://localhost:3128/")
  2023. manager = default_urllib3_manager(config=config)
  2024. self.assertIsInstance(manager, urllib3.ProxyManager)
  2025. self.assertTrue(hasattr(manager, "proxy"))
  2026. self.assertEqual(manager.proxy.scheme, "http")
  2027. self.assertEqual(manager.proxy.host, "localhost")
  2028. self.assertEqual(manager.proxy.port, 3128)
  2029. def test_environment_proxy(self) -> None:
  2030. import urllib3
  2031. config = ConfigDict()
  2032. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2033. manager = default_urllib3_manager(config=config)
  2034. self.assertIsInstance(manager, urllib3.ProxyManager)
  2035. self.assertTrue(hasattr(manager, "proxy"))
  2036. self.assertEqual(manager.proxy.scheme, "http")
  2037. self.assertEqual(manager.proxy.host, "myproxy")
  2038. self.assertEqual(manager.proxy.port, 8080)
  2039. def test_environment_empty_proxy(self) -> None:
  2040. import urllib3
  2041. config = ConfigDict()
  2042. self.overrideEnv("http_proxy", "")
  2043. manager = default_urllib3_manager(config=config)
  2044. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2045. self.assertIsInstance(manager, urllib3.PoolManager)
  2046. def test_environment_no_proxy_1(self) -> None:
  2047. import urllib3
  2048. config = ConfigDict()
  2049. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2050. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh")
  2051. base_url = "http://xyz.abc.def.gh:8080/path/port"
  2052. manager = default_urllib3_manager(config=config, base_url=base_url)
  2053. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2054. self.assertIsInstance(manager, urllib3.PoolManager)
  2055. def test_environment_no_proxy_2(self) -> None:
  2056. import urllib3
  2057. config = ConfigDict()
  2058. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2059. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2060. base_url = "http://ample.com/path/port"
  2061. manager = default_urllib3_manager(config=config, base_url=base_url)
  2062. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2063. self.assertIsInstance(manager, urllib3.PoolManager)
  2064. def test_environment_no_proxy_3(self) -> None:
  2065. import urllib3
  2066. config = ConfigDict()
  2067. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2068. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2069. base_url = "http://ample.com:80/path/port"
  2070. manager = default_urllib3_manager(config=config, base_url=base_url)
  2071. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2072. self.assertIsInstance(manager, urllib3.PoolManager)
  2073. def test_environment_no_proxy_4(self) -> None:
  2074. import urllib3
  2075. config = ConfigDict()
  2076. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2077. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2078. base_url = "http://www.ample.com/path/port"
  2079. manager = default_urllib3_manager(config=config, base_url=base_url)
  2080. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2081. self.assertIsInstance(manager, urllib3.PoolManager)
  2082. def test_environment_no_proxy_5(self) -> None:
  2083. import urllib3
  2084. config = ConfigDict()
  2085. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2086. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2087. base_url = "http://www.example.com/path/port"
  2088. manager = default_urllib3_manager(config=config, base_url=base_url)
  2089. self.assertIsInstance(manager, urllib3.ProxyManager)
  2090. self.assertTrue(hasattr(manager, "proxy"))
  2091. self.assertEqual(manager.proxy.scheme, "http")
  2092. self.assertEqual(manager.proxy.host, "myproxy")
  2093. self.assertEqual(manager.proxy.port, 8080)
  2094. def test_environment_no_proxy_6(self) -> None:
  2095. import urllib3
  2096. config = ConfigDict()
  2097. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2098. self.overrideEnv("no_proxy", "xyz,abc.def.gh,abc.gh,ample.com")
  2099. base_url = "http://ample.com.org/path/port"
  2100. manager = default_urllib3_manager(config=config, base_url=base_url)
  2101. self.assertIsInstance(manager, urllib3.ProxyManager)
  2102. self.assertTrue(hasattr(manager, "proxy"))
  2103. self.assertEqual(manager.proxy.scheme, "http")
  2104. self.assertEqual(manager.proxy.host, "myproxy")
  2105. self.assertEqual(manager.proxy.port, 8080)
  2106. def test_environment_no_proxy_ipv4_address_1(self) -> None:
  2107. import urllib3
  2108. config = ConfigDict()
  2109. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2110. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  2111. base_url = "http://192.168.0.10/path/port"
  2112. manager = default_urllib3_manager(config=config, base_url=base_url)
  2113. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2114. self.assertIsInstance(manager, urllib3.PoolManager)
  2115. def test_environment_no_proxy_ipv4_address_2(self) -> None:
  2116. import urllib3
  2117. config = ConfigDict()
  2118. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2119. self.overrideEnv("no_proxy", "xyz,abc.def.gh,192.168.0.10,ample.com")
  2120. base_url = "http://192.168.0.10:8888/path/port"
  2121. manager = default_urllib3_manager(config=config, base_url=base_url)
  2122. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2123. self.assertIsInstance(manager, urllib3.PoolManager)
  2124. def test_environment_no_proxy_ipv4_address_3(self) -> None:
  2125. import urllib3
  2126. config = ConfigDict()
  2127. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2128. self.overrideEnv(
  2129. "no_proxy", "xyz,abc.def.gh,ff80:1::/64,192.168.0.0/24,ample.com"
  2130. )
  2131. base_url = "http://192.168.0.10/path/port"
  2132. manager = default_urllib3_manager(config=config, base_url=base_url)
  2133. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2134. self.assertIsInstance(manager, urllib3.PoolManager)
  2135. def test_environment_no_proxy_ipv6_address_1(self) -> None:
  2136. import urllib3
  2137. config = ConfigDict()
  2138. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2139. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  2140. base_url = "http://[ff80:1::affe]/path/port"
  2141. manager = default_urllib3_manager(config=config, base_url=base_url)
  2142. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2143. self.assertIsInstance(manager, urllib3.PoolManager)
  2144. def test_environment_no_proxy_ipv6_address_2(self) -> None:
  2145. import urllib3
  2146. config = ConfigDict()
  2147. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2148. self.overrideEnv("no_proxy", "xyz,abc.def.gh,ff80:1::affe,ample.com")
  2149. base_url = "http://[ff80:1::affe]:1234/path/port"
  2150. manager = default_urllib3_manager(config=config, base_url=base_url)
  2151. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2152. self.assertIsInstance(manager, urllib3.PoolManager)
  2153. def test_environment_no_proxy_ipv6_address_3(self) -> None:
  2154. import urllib3
  2155. config = ConfigDict()
  2156. self.overrideEnv("http_proxy", "http://myproxy:8080")
  2157. self.overrideEnv(
  2158. "no_proxy", "xyz,abc.def.gh,192.168.0.0/24,ff80:1::/64,ample.com"
  2159. )
  2160. base_url = "http://[ff80:1::affe]/path/port"
  2161. manager = default_urllib3_manager(config=config, base_url=base_url)
  2162. self.assertNotIsInstance(manager, urllib3.ProxyManager)
  2163. self.assertIsInstance(manager, urllib3.PoolManager)
  2164. def test_config_proxy_custom_cls(self) -> None:
  2165. import urllib3
  2166. class CustomProxyManager(urllib3.ProxyManager):
  2167. pass
  2168. config = ConfigDict()
  2169. config.set(b"http", b"proxy", b"http://localhost:3128/")
  2170. manager = default_urllib3_manager(
  2171. config=config, proxy_manager_cls=CustomProxyManager
  2172. )
  2173. self.assertIsInstance(manager, CustomProxyManager)
  2174. def test_config_proxy_creds(self) -> None:
  2175. import urllib3
  2176. config = ConfigDict()
  2177. config.set(b"http", b"proxy", b"http://jelmer:example@localhost:3128/")
  2178. manager = default_urllib3_manager(config=config)
  2179. assert isinstance(manager, urllib3.ProxyManager)
  2180. self.assertEqual(
  2181. manager.proxy_headers, {"proxy-authorization": "Basic amVsbWVyOmV4YW1wbGU="}
  2182. )
  2183. def test_config_no_verify_ssl(self) -> None:
  2184. manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE")
  2185. self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE")
  2186. def test_timeout_parameter(self) -> None:
  2187. """Test that timeout parameter is passed to urllib3 manager."""
  2188. timeout = 30
  2189. manager = default_urllib3_manager(config=None, timeout=timeout)
  2190. self.assertEqual(manager.connection_pool_kw["timeout"], timeout)
  2191. def test_timeout_from_config(self) -> None:
  2192. """Test that timeout can be configured via git config."""
  2193. config = ConfigDict()
  2194. config.set((b"http",), b"timeout", b"25")
  2195. manager = default_urllib3_manager(config=config)
  2196. self.assertEqual(manager.connection_pool_kw["timeout"], 25)
  2197. def test_timeout_parameter_precedence(self) -> None:
  2198. """Test that explicit timeout parameter takes precedence over config."""
  2199. config = ConfigDict()
  2200. config.set((b"http",), b"timeout", b"25")
  2201. manager = default_urllib3_manager(config=config, timeout=15)
  2202. self.assertEqual(manager.connection_pool_kw["timeout"], 15)
  2203. class SubprocessSSHVendorTests(TestCase):
  2204. def setUp(self) -> None:
  2205. # Monkey Patch client subprocess popen
  2206. self._orig_popen = dulwich.client.subprocess.Popen
  2207. dulwich.client.subprocess.Popen = DummyPopen
  2208. def tearDown(self) -> None:
  2209. dulwich.client.subprocess.Popen = self._orig_popen
  2210. def test_run_command_dashes(self) -> None:
  2211. vendor = SubprocessSSHVendor()
  2212. self.assertRaises(
  2213. StrangeHostname,
  2214. vendor.run_command,
  2215. "--weird-host",
  2216. "git-clone-url",
  2217. )
  2218. def test_run_command_password(self) -> None:
  2219. vendor = SubprocessSSHVendor()
  2220. self.assertRaises(
  2221. NotImplementedError,
  2222. vendor.run_command,
  2223. "host",
  2224. "git-clone-url",
  2225. password="12345",
  2226. )
  2227. def test_run_command_password_and_privkey(self) -> None:
  2228. vendor = SubprocessSSHVendor()
  2229. self.assertRaises(
  2230. NotImplementedError,
  2231. vendor.run_command,
  2232. "host",
  2233. "git-clone-url",
  2234. password="12345",
  2235. key_filename="/tmp/id_rsa",
  2236. )
  2237. def test_run_command_with_port_username_and_privkey(self) -> None:
  2238. expected = [
  2239. "ssh",
  2240. "-x",
  2241. "-p",
  2242. "2200",
  2243. "-i",
  2244. "/tmp/id_rsa",
  2245. ]
  2246. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  2247. expected += [
  2248. "-o",
  2249. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  2250. ]
  2251. expected += [
  2252. "user@host",
  2253. "git-clone-url",
  2254. ]
  2255. vendor = SubprocessSSHVendor()
  2256. command = vendor.run_command(
  2257. "host",
  2258. "git-clone-url",
  2259. username="user",
  2260. port="2200",
  2261. key_filename="/tmp/id_rsa",
  2262. )
  2263. args = command.proc.args
  2264. self.assertListEqual(expected, args[0])
  2265. def test_run_with_ssh_command(self) -> None:
  2266. expected = [
  2267. "/path/to/ssh",
  2268. "-o",
  2269. "Option=Value",
  2270. "-x",
  2271. ]
  2272. if DEFAULT_GIT_PROTOCOL_VERSION_FETCH:
  2273. expected += [
  2274. "-o",
  2275. f"SetEnv GIT_PROTOCOL=version={DEFAULT_GIT_PROTOCOL_VERSION_FETCH}",
  2276. ]
  2277. expected += [
  2278. "host",
  2279. "git-clone-url",
  2280. ]
  2281. vendor = SubprocessSSHVendor()
  2282. command = vendor.run_command(
  2283. "host",
  2284. "git-clone-url",
  2285. ssh_command="/path/to/ssh -o Option=Value",
  2286. )
  2287. args = command.proc.args
  2288. self.assertListEqual(expected, args[0])
  2289. class PLinkSSHVendorTests(TestCase):
  2290. def setUp(self) -> None:
  2291. # Monkey Patch client subprocess popen
  2292. self._orig_popen = dulwich.client.subprocess.Popen
  2293. dulwich.client.subprocess.Popen = DummyPopen
  2294. def tearDown(self) -> None:
  2295. dulwich.client.subprocess.Popen = self._orig_popen
  2296. def test_run_command_dashes(self) -> None:
  2297. vendor = PLinkSSHVendor()
  2298. self.assertRaises(
  2299. StrangeHostname,
  2300. vendor.run_command,
  2301. "--weird-host",
  2302. "git-clone-url",
  2303. )
  2304. def test_run_command_password_and_privkey(self) -> None:
  2305. vendor = PLinkSSHVendor()
  2306. warnings.simplefilter("always", UserWarning)
  2307. self.addCleanup(warnings.resetwarnings)
  2308. warnings_list, restore_warnings = setup_warning_catcher()
  2309. self.addCleanup(restore_warnings)
  2310. command = vendor.run_command(
  2311. "host",
  2312. "git-clone-url",
  2313. password="12345",
  2314. key_filename="/tmp/id_rsa",
  2315. )
  2316. expected_warning = UserWarning(
  2317. "Invoking PLink with a password exposes the password in the process list."
  2318. )
  2319. for w in warnings_list:
  2320. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  2321. break
  2322. else:
  2323. raise AssertionError(
  2324. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  2325. )
  2326. args = command.proc.args
  2327. if sys.platform == "win32":
  2328. binary = ["plink.exe", "-ssh"]
  2329. else:
  2330. binary = ["plink", "-ssh"]
  2331. expected = [
  2332. *binary,
  2333. "-pw",
  2334. "12345",
  2335. "-i",
  2336. "/tmp/id_rsa",
  2337. "host",
  2338. "git-clone-url",
  2339. ]
  2340. self.assertListEqual(expected, args[0])
  2341. def test_run_command_password(self) -> None:
  2342. if sys.platform == "win32":
  2343. binary = ["plink.exe", "-ssh"]
  2344. else:
  2345. binary = ["plink", "-ssh"]
  2346. expected = [*binary, "-pw", "12345", "host", "git-clone-url"]
  2347. vendor = PLinkSSHVendor()
  2348. warnings.simplefilter("always", UserWarning)
  2349. self.addCleanup(warnings.resetwarnings)
  2350. warnings_list, restore_warnings = setup_warning_catcher()
  2351. self.addCleanup(restore_warnings)
  2352. command = vendor.run_command("host", "git-clone-url", password="12345")
  2353. expected_warning = UserWarning(
  2354. "Invoking PLink with a password exposes the password in the process list."
  2355. )
  2356. for w in warnings_list:
  2357. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  2358. break
  2359. else:
  2360. raise AssertionError(
  2361. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  2362. )
  2363. args = command.proc.args
  2364. self.assertListEqual(expected, args[0])
  2365. def test_run_command_with_port_username_and_privkey(self) -> None:
  2366. if sys.platform == "win32":
  2367. binary = ["plink.exe", "-ssh"]
  2368. else:
  2369. binary = ["plink", "-ssh"]
  2370. expected = [
  2371. *binary,
  2372. "-P",
  2373. "2200",
  2374. "-i",
  2375. "/tmp/id_rsa",
  2376. "user@host",
  2377. "git-clone-url",
  2378. ]
  2379. vendor = PLinkSSHVendor()
  2380. command = vendor.run_command(
  2381. "host",
  2382. "git-clone-url",
  2383. username="user",
  2384. port="2200",
  2385. key_filename="/tmp/id_rsa",
  2386. )
  2387. args = command.proc.args
  2388. self.assertListEqual(expected, args[0])
  2389. def test_run_with_ssh_command(self) -> None:
  2390. expected = [
  2391. "/path/to/plink",
  2392. "-ssh",
  2393. "host",
  2394. "git-clone-url",
  2395. ]
  2396. vendor = PLinkSSHVendor()
  2397. command = vendor.run_command(
  2398. "host",
  2399. "git-clone-url",
  2400. ssh_command="/path/to/plink",
  2401. )
  2402. args = command.proc.args
  2403. self.assertListEqual(expected, args[0])
  2404. class RsyncUrlTests(TestCase):
  2405. def test_simple(self) -> None:
  2406. self.assertEqual(parse_rsync_url("foo:bar/path"), (None, "foo", "bar/path"))
  2407. self.assertEqual(
  2408. parse_rsync_url("user@foo:bar/path"), ("user", "foo", "bar/path")
  2409. )
  2410. def test_path(self) -> None:
  2411. self.assertRaises(ValueError, parse_rsync_url, "/path")
  2412. class CheckWantsTests(TestCase):
  2413. def test_fine(self) -> None:
  2414. check_wants(
  2415. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2416. {b"refs/heads/blah": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2417. )
  2418. def test_missing(self) -> None:
  2419. self.assertRaises(
  2420. InvalidWants,
  2421. check_wants,
  2422. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2423. {b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2424. )
  2425. def test_annotated(self) -> None:
  2426. self.assertRaises(
  2427. InvalidWants,
  2428. check_wants,
  2429. [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"],
  2430. {
  2431. b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2432. b"refs/heads/blah^{}": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262",
  2433. },
  2434. )
  2435. class FetchPackResultTests(TestCase):
  2436. def test_eq(self) -> None:
  2437. self.assertEqual(
  2438. FetchPackResult(
  2439. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2440. {},
  2441. b"user/agent",
  2442. ),
  2443. FetchPackResult(
  2444. {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"},
  2445. {},
  2446. b"user/agent",
  2447. ),
  2448. )
  2449. class GitCredentialStoreTests(TestCase):
  2450. @classmethod
  2451. def setUpClass(cls) -> None:
  2452. with tempfile.NamedTemporaryFile(delete=False) as f:
  2453. f.write(b"https://user:pass@example.org\n")
  2454. cls.fname = f.name
  2455. @classmethod
  2456. def tearDownClass(cls) -> None:
  2457. os.unlink(cls.fname)
  2458. def test_nonmatching_scheme(self) -> None:
  2459. result = list(
  2460. get_credentials_from_store("http", "example.org", fnames=[self.fname])
  2461. )
  2462. self.assertEqual(result, [])
  2463. def test_nonmatching_hostname(self) -> None:
  2464. result = list(
  2465. get_credentials_from_store("https", "noentry.org", fnames=[self.fname])
  2466. )
  2467. self.assertEqual(result, [])
  2468. def test_match_without_username(self) -> None:
  2469. result = list(
  2470. get_credentials_from_store("https", "example.org", fnames=[self.fname])
  2471. )
  2472. self.assertEqual(result, [("user", "pass")])
  2473. def test_match_with_matching_username(self) -> None:
  2474. result = list(
  2475. get_credentials_from_store(
  2476. "https", "example.org", "user", fnames=[self.fname]
  2477. )
  2478. )
  2479. self.assertEqual(result, [("user", "pass")])
  2480. def test_no_match_with_nonmatching_username(self) -> None:
  2481. result = list(
  2482. get_credentials_from_store(
  2483. "https", "example.org", "otheruser", fnames=[self.fname]
  2484. )
  2485. )
  2486. self.assertEqual(result, [])
  2487. class RemoteErrorFromStderrTests(TestCase):
  2488. def test_nothing(self) -> None:
  2489. self.assertEqual(_remote_error_from_stderr(None), HangupException())
  2490. def test_error_line(self) -> None:
  2491. b = BytesIO(
  2492. b"""\
  2493. This is some random output.
  2494. ERROR: This is the actual error
  2495. with a tail
  2496. """
  2497. )
  2498. self.assertEqual(
  2499. _remote_error_from_stderr(b),
  2500. GitProtocolError("This is the actual error"),
  2501. )
  2502. def test_no_error_line(self) -> None:
  2503. b = BytesIO(
  2504. b"""\
  2505. This is output without an error line.
  2506. And this line is just random noise, too.
  2507. """
  2508. )
  2509. self.assertEqual(
  2510. _remote_error_from_stderr(b),
  2511. HangupException(
  2512. [
  2513. b"This is output without an error line.",
  2514. b"And this line is just random noise, too.",
  2515. ]
  2516. ),
  2517. )
  2518. class TestExtractAgentAndSymrefs(TestCase):
  2519. def test_extract_agent_and_symrefs(self) -> None:
  2520. (symrefs, agent) = _extract_symrefs_and_agent(
  2521. [b"agent=git/2.31.1", b"symref=HEAD:refs/heads/master"]
  2522. )
  2523. self.assertEqual(agent, b"git/2.31.1")
  2524. self.assertEqual(symrefs, {b"HEAD": b"refs/heads/master"})