| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764
2774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761 |
- # client.py -- Implementation of the client side git protocols
- # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Client side support for the Git protocol.
- The Dulwich client supports the following capabilities:
- * thin-pack
- * multi_ack_detailed
- * multi_ack
- * side-band-64k
- * ofs-delta
- * quiet
- * report-status
- * delete-refs
- * shallow
- Known capabilities that are not supported:
- * no-progress
- * include-tag
- """
- __all__ = [
- "COMMON_CAPABILITIES",
- "DEFAULT_GIT_CREDENTIALS_PATHS",
- "DEFAULT_REF_PREFIX",
- "RECEIVE_CAPABILITIES",
- "UPLOAD_CAPABILITIES",
- "AbstractHttpGitClient",
- "BundleClient",
- "FetchPackResult",
- "GitClient",
- "HTTPProxyUnauthorized",
- "HTTPUnauthorized",
- "InvalidWants",
- "LocalGitClient",
- "LsRemoteResult",
- "PLinkSSHVendor",
- "ReportStatusParser",
- "SSHGitClient",
- "SSHVendor",
- "SendPackResult",
- "StrangeHostname",
- "SubprocessGitClient",
- "SubprocessSSHVendor",
- "SubprocessWrapper",
- "TCPGitClient",
- "TraditionalGitClient",
- "Urllib3HttpGitClient",
- "check_for_proxy_bypass",
- "check_wants",
- "default_urllib3_manager",
- "default_user_agent_string",
- "find_capability",
- "find_git_command",
- "get_credentials_from_store",
- "get_transport_and_path",
- "get_transport_and_path_from_url",
- "negotiate_protocol_version",
- "parse_rsync_url",
- "read_pkt_refs_v1",
- "read_pkt_refs_v2",
- "read_server_capabilities",
- ]
- import copy
- import functools
- import logging
- import os
- import select
- import socket
- import subprocess
- import sys
- from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set
- from contextlib import closing
- from io import BufferedReader, BytesIO
- from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- ClassVar,
- )
- from urllib.parse import ParseResult, urljoin, urlparse, urlunparse, urlunsplit
- from urllib.parse import quote as urlquote
- if TYPE_CHECKING:
- import urllib3
- import dulwich
if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only; none of
    # it is executed at runtime.
    from typing import Protocol as TypingProtocol

    from .objects import ObjectID
    from .pack import UnpackedObject
    from .refs import Ref

    class HTTPResponse(TypingProtocol):
        """Structural type describing the HTTP response objects used here."""

        redirect_location: str | None
        content_type: str | None

        def close(self) -> None:
            """Close the response."""
            ...

    class GeneratePackDataFunc(TypingProtocol):
        """Structural type for ``generate_pack_data`` callables."""

        def __call__(
            self,
            have: Set[ObjectID],
            want: Set[ObjectID],
            *,
            ofs_delta: bool = False,
            progress: Callable[[bytes], None] | None = None,
        ) -> tuple[int, Iterator[UnpackedObject]]:
            """Produce pack data for the given ``have`` and ``want`` sets.

            Returns:
              Tuple of an int and an iterator over unpacked objects
            """
            ...

    class DetermineWantsFunc(TypingProtocol):
        """Structural type for ``determine_wants`` callables."""

        def __call__(
            self,
            refs: Mapping[Ref, ObjectID],
            depth: int | None = None,
        ) -> list[ObjectID]:
            """Pick the object ids to fetch, given the advertised refs."""
            ...
- from .bundle import Bundle
- from .config import Config, apply_instead_of, get_xdg_config_home_path
- from .credentials import match_partial_url, match_urls
- from .errors import GitProtocolError, HangupException, NotGitRepository, SendPackError
- from .object_format import DEFAULT_OBJECT_FORMAT
- from .object_store import GraphWalker
- from .objects import ObjectID
- from .pack import (
- PACK_SPOOL_FILE_MAX_SIZE,
- PackChunkGenerator,
- PackData,
- write_pack_from_container,
- )
- from .protocol import (
- _RBUFSIZE,
- CAPABILITIES_REF,
- CAPABILITY_AGENT,
- CAPABILITY_DELETE_REFS,
- CAPABILITY_FETCH,
- CAPABILITY_FILTER,
- CAPABILITY_INCLUDE_TAG,
- CAPABILITY_MULTI_ACK,
- CAPABILITY_MULTI_ACK_DETAILED,
- CAPABILITY_OFS_DELTA,
- CAPABILITY_QUIET,
- CAPABILITY_REPORT_STATUS,
- CAPABILITY_SHALLOW,
- CAPABILITY_SIDE_BAND_64K,
- CAPABILITY_SYMREF,
- CAPABILITY_THIN_PACK,
- COMMAND_DEEPEN,
- COMMAND_DEEPEN_NOT,
- COMMAND_DEEPEN_SINCE,
- COMMAND_DONE,
- COMMAND_HAVE,
- COMMAND_SHALLOW,
- COMMAND_UNSHALLOW,
- COMMAND_WANT,
- DEFAULT_GIT_PROTOCOL_VERSION_FETCH,
- DEFAULT_GIT_PROTOCOL_VERSION_SEND,
- GIT_PROTOCOL_VERSIONS,
- KNOWN_RECEIVE_CAPABILITIES,
- KNOWN_UPLOAD_CAPABILITIES,
- PEELED_TAG_SUFFIX,
- SIDE_BAND_CHANNEL_DATA,
- SIDE_BAND_CHANNEL_FATAL,
- SIDE_BAND_CHANNEL_PROGRESS,
- TCP_GIT_PORT,
- ZERO_SHA,
- PktLineParser,
- Protocol,
- agent_string,
- capability_agent,
- extract_capabilities,
- extract_capability_names,
- parse_capability,
- pkt_line,
- pkt_seq,
- split_peeled_refs,
- )
- from .refs import (
- HEADREF,
- SYMREF,
- Ref,
- _import_remote_refs,
- _set_default_branch,
- _set_head,
- _set_origin_head,
- filter_ref_prefix,
- read_info_refs,
- )
- from .repo import BaseRepo, Repo
- # Default ref prefix, used if none is specified.
- # GitHub defaults to just sending HEAD if no ref-prefix is
- # specified, so explicitly request all refs to match
- # behaviour with v1 when no ref-prefix is specified.
- DEFAULT_REF_PREFIX = [b"HEAD", b"refs/"]
- logger = logging.getLogger(__name__)
class InvalidWants(Exception):
    """Raised when requested wants are not among the server provided refs."""

    def __init__(self, wants: Set[bytes]) -> None:
        """Build the exception from the offending wants.

        Args:
          wants: The invalid wants; their repr is embedded in the message
        """
        message = f"requested wants not in server provided refs: {wants!r}"
        super().__init__(message)
- class HTTPUnauthorized(Exception):
- """Raised when authentication fails."""
- def __init__(self, www_authenticate: str | None, url: str) -> None:
- """Initialize HTTPUnauthorized exception.
- Args:
- www_authenticate: WWW-Authenticate header value
- url: URL that requires authentication
- """
- Exception.__init__(self, "No valid credentials provided")
- self.www_authenticate = www_authenticate
- self.url = url
def _to_optional_dict(refs: Mapping[Ref, ObjectID]) -> dict[Ref, ObjectID | None]:
    """Return a fresh dict holding the same items as ``refs``.

    The contents are unchanged; only the static value type is widened to
    ``ObjectID | None`` for result types that expect optional values.
    """
    return dict(refs.items())
- class HTTPProxyUnauthorized(Exception):
- """Raised when proxy authentication fails."""
- def __init__(self, proxy_authenticate: str | None, url: str) -> None:
- """Initialize HTTPProxyUnauthorized exception.
- Args:
- proxy_authenticate: Proxy-Authenticate header value
- url: URL that requires proxy authentication
- """
- Exception.__init__(self, "No valid proxy credentials provided")
- self.proxy_authenticate = proxy_authenticate
- self.url = url
def _fileno_can_read(fileno: int) -> bool:
    """Report whether ``fileno`` is readable right now.

    ``select`` is called with a zero timeout, so this polls and returns
    immediately instead of waiting for data.
    """
    readable, _writable, _exceptional = select.select([fileno], [], [], 0)
    return len(readable) > 0
def _win32_peek_avail(handle: int) -> int:
    """Ask PeekNamedPipe how many bytes are available on ``handle``.

    Raises:
      OSError: if PeekNamedPipe reports failure; carries the GetLastError() code
    """
    from ctypes import (  # type: ignore[attr-defined,unused-ignore]
        byref,
        windll,
        wintypes,
    )

    total_avail = wintypes.DWORD()
    left_in_message = wintypes.DWORD()
    # No buffer is passed (None / size 0): only the two counters are filled in.
    ok = windll.kernel32.PeekNamedPipe(
        handle, None, 0, None, byref(total_avail), byref(left_in_message)
    )
    if ok:
        return total_avail.value

    from ctypes import GetLastError  # type: ignore[attr-defined,unused-ignore]

    raise OSError(GetLastError())
# Capabilities that appear in both the upload and the receive lists below.
COMMON_CAPABILITIES = [CAPABILITY_OFS_DELTA, CAPABILITY_SIDE_BAND_64K]

UPLOAD_CAPABILITIES = [
    CAPABILITY_THIN_PACK,
    CAPABILITY_MULTI_ACK,
    CAPABILITY_MULTI_ACK_DETAILED,
    CAPABILITY_SHALLOW,
] + COMMON_CAPABILITIES

RECEIVE_CAPABILITIES = [
    CAPABILITY_REPORT_STATUS,
    CAPABILITY_DELETE_REFS,
] + COMMON_CAPABILITIES
- class ReportStatusParser:
- """Handle status as reported by servers with 'report-status' capability."""
- def __init__(self) -> None:
- """Initialize ReportStatusParser."""
- self._done = False
- self._pack_status: bytes | None = None
- self._ref_statuses: list[bytes] = []
- def check(self) -> Iterator[tuple[bytes, str | None]]:
- """Check if there were any errors and, if so, raise exceptions.
- Raises:
- SendPackError: Raised when the server could not unpack
- Returns:
- iterator over refs
- """
- if self._pack_status not in (b"unpack ok", None):
- raise SendPackError(self._pack_status)
- for status in self._ref_statuses:
- try:
- status, rest = status.split(b" ", 1)
- except ValueError:
- # malformed response, move on to the next one
- continue
- if status == b"ng":
- ref, error = rest.split(b" ", 1)
- yield ref, error.decode("utf-8")
- elif status == b"ok":
- yield rest, None
- else:
- raise GitProtocolError(f"invalid ref status {status!r}")
- def handle_packet(self, pkt: bytes | None) -> None:
- """Handle a packet.
- Raises:
- GitProtocolError: Raised when packets are received after a flush
- packet.
- """
- if self._done:
- raise GitProtocolError("received more data after status report")
- if pkt is None:
- self._done = True
- return
- if self._pack_status is None:
- self._pack_status = pkt.strip()
- else:
- ref_status = pkt.strip()
- self._ref_statuses.append(ref_status)
def negotiate_protocol_version(proto: Protocol) -> int:
    """Negotiate protocol version with the server.

    Reads a single pkt-line. If it is the ``version 2`` announcement, 2 is
    returned. Otherwise the packet is handed back to ``proto`` via
    ``unread_pkt_line`` and 0 is returned.
    """
    first = proto.read_pkt_line()
    if first is None or first.strip() != b"version 2":
        proto.unread_pkt_line(first)
        return 0
    return 2
def read_server_capabilities(pkt_seq: Iterable[bytes]) -> set[bytes]:
    """Collect every packet of ``pkt_seq`` into a set of capabilities."""
    return set(pkt_seq)
- def extract_object_format_from_capabilities(
- capabilities: set[bytes],
- ) -> str | None:
- """Extract object format from server capabilities.
- Args:
- capabilities: Server capabilities
- Returns:
- Object format name as string (e.g., "sha1", "sha256"), or None if not specified
- """
- for capability in capabilities:
- k, v = parse_capability(capability)
- if k == b"object-format" and v is not None:
- return v.decode("ascii").strip()
- return None
def read_pkt_refs_v2(
    pkt_seq: Iterable[bytes],
) -> tuple[dict[Ref, ObjectID | None], dict[Ref, Ref], dict[Ref, ObjectID]]:
    """Read references using protocol version 2.

    Each packet is a space-separated line: an object id (or the literal
    ``unborn``), a ref name, then optional ``peeled:<id>`` and
    ``symref-target:<ref>`` attributes. Unrecognised attributes are logged
    and ignored.

    Args:
      pkt_seq: Ref lines sent by the server
    Returns:
      Tuple of (refs, symrefs, peeled). ``refs`` maps each ref to its object
      id, or to None for an ``unborn`` ref; ``symrefs`` maps refs to their
      symref target; ``peeled`` maps refs to their peeled object id.
    Raises:
      GitProtocolError: if a line does not contain a ref name
    """
    refs: dict[Ref, ObjectID | None] = {}
    symrefs: dict[Ref, Ref] = {}
    peeled: dict[Ref, ObjectID] = {}
    peeled_prefix = b"peeled:"
    symref_prefix = b"symref-target:"
    # Receive refs from server
    for pkt in pkt_seq:
        parts = pkt.rstrip(b"\n").split(b" ")
        if len(parts) < 2:
            # Report a protocol problem rather than leaking a bare IndexError.
            raise GitProtocolError(f"malformed ref line: {pkt!r}")
        sha_bytes = parts[0]
        sha: ObjectID | None
        if sha_bytes == b"unborn":
            sha = None
        else:
            sha = ObjectID(sha_bytes)
        ref = Ref(parts[1])
        for part in parts[2:]:
            if part.startswith(peeled_prefix):
                peeled[ref] = ObjectID(part[len(peeled_prefix) :])
            elif part.startswith(symref_prefix):
                symrefs[ref] = Ref(part[len(symref_prefix) :])
            else:
                # Use the module logger, not the root logger, so applications
                # embedding this library keep control over its output.
                logger.warning("unknown part in pkt-ref: %s", part)
        refs[ref] = sha
    return refs, symrefs, peeled
def read_pkt_refs_v1(
    pkt_seq: Iterable[bytes],
) -> tuple[dict[Ref, ObjectID], set[bytes]]:
    """Read references using protocol version 1.

    The capabilities are extracted from the first ref line. A line whose
    first field is ``ERR`` aborts the read.

    Args:
      pkt_seq: Ref advertisement lines sent by the server
    Returns:
      Tuple of (refs, server capabilities); both are empty when no lines
      were received
    Raises:
      GitProtocolError: if the server sent an ``ERR`` line
    """
    capabilities = None
    advertised: dict[Ref, ObjectID] = {}
    # Receive refs from server
    for line in pkt_seq:
        sha, name = line.rstrip(b"\n").split(None, 1)
        if sha == b"ERR":
            raise GitProtocolError(name.decode("utf-8", "replace"))
        if capabilities is None:
            # Only the first line has the capabilities appended to the name.
            name, capabilities = extract_capabilities(name)
        advertised[Ref(name)] = ObjectID(sha)

    if not advertised:
        return {}, set()
    if advertised == {CAPABILITIES_REF: ZERO_SHA}:
        # A lone capabilities placeholder means there are no actual refs.
        advertised = {}
    assert capabilities is not None
    return advertised, set(capabilities)
class _DeprecatedDictProxy:
    """Mixin giving result objects a deprecated dict-like view of ``refs``.

    Every dict-style access (item lookup, iteration, ``len``, ``in`` and the
    forwarded dict methods) emits a DeprecationWarning and is then answered
    by ``self.refs``.
    """

    refs: dict[Ref, ObjectID | None]  # To be overridden by subclasses
    # Attribute names that are looked up on ``refs`` instead of on the object.
    _FORWARDED_ATTRS: ClassVar[set[str]] = {
        "clear",
        "copy",
        "fromkeys",
        "get",
        "items",
        "keys",
        "pop",
        "popitem",
        "setdefault",
        "update",
        "values",
        "viewitems",
        "viewkeys",
        "viewvalues",
    }

    def _warn_deprecated(self) -> None:
        """Emit the DeprecationWarning pointing users at ``.refs``."""
        import warnings

        # stacklevel=3 skips this helper and the calling dunder/method.
        warnings.warn(
            f"Use {self.__class__.__name__}.refs instead.",
            DeprecationWarning,
            stacklevel=3,
        )

    def __contains__(self, name: Ref) -> bool:
        """Deprecated: membership test against ``refs``."""
        self._warn_deprecated()
        return name in self.refs

    def __getitem__(self, name: Ref) -> ObjectID | None:
        """Deprecated: item lookup in ``refs``."""
        self._warn_deprecated()
        return self.refs[name]

    def __len__(self) -> int:
        """Deprecated: number of entries in ``refs``."""
        self._warn_deprecated()
        return len(self.refs)

    def __iter__(self) -> Iterator[Ref]:
        """Deprecated: iterate over the keys of ``refs``."""
        self._warn_deprecated()
        return iter(self.refs)

    def __getattribute__(self, name: str) -> object:
        """Forward the names in ``_FORWARDED_ATTRS`` to ``refs``, with a warning."""
        # Read the set from the class so this lookup does not re-enter
        # __getattribute__ on the instance.
        if name == "_FORWARDED_ATTRS" or name not in type(self)._FORWARDED_ATTRS:
            return super().__getattribute__(name)
        self._warn_deprecated()
        # Direct attribute access to avoid recursion
        target = object.__getattribute__(self, "refs")
        return getattr(target, name)
class FetchPackResult(_DeprecatedDictProxy):
    """Result of a fetch-pack operation.

    Attributes:
      refs: Dictionary with all remote refs
      symrefs: Dictionary with remote symrefs
      agent: User agent string
      object_format: Object format name (e.g., "sha1", "sha256") used by the remote, or None if not specified
    """

    refs: dict[Ref, ObjectID | None]
    symrefs: dict[Ref, Ref]
    agent: bytes | None
    object_format: str | None

    def __init__(
        self,
        refs: dict[Ref, ObjectID | None],
        symrefs: dict[Ref, Ref],
        agent: bytes | None,
        new_shallow: set[ObjectID] | None = None,
        new_unshallow: set[ObjectID] | None = None,
        object_format: str | None = None,
    ) -> None:
        """Store the outcome of a fetch.

        Args:
          refs: Dictionary with all remote refs
          symrefs: Dictionary with remote symrefs
          agent: User agent string
          new_shallow: New shallow commits
          new_unshallow: New unshallow commits
          object_format: Object format name (e.g., "sha1", "sha256") used by the remote
        """
        self.refs = refs
        self.symrefs = symrefs
        self.agent = agent
        self.object_format = object_format
        self.new_shallow = new_shallow
        self.new_unshallow = new_unshallow

    def __eq__(self, other: object) -> bool:
        """Compare with another result, or (deprecated) with a plain dict.

        Against a dict only ``refs`` is compared and a deprecation warning
        is emitted. Against another FetchPackResult, ``refs``, ``symrefs``
        and ``agent`` are compared.
        """
        if isinstance(other, dict):
            self._warn_deprecated()
            return self.refs == other
        if isinstance(other, FetchPackResult):
            if self.refs != other.refs:
                return False
            if self.symrefs != other.symrefs:
                return False
            return self.agent == other.agent
        return False

    def __repr__(self) -> str:
        """Return string representation of FetchPackResult."""
        cls_name = self.__class__.__name__
        return f"{cls_name}({self.refs!r}, {self.symrefs!r}, {self.agent!r})"
class LsRemoteResult(_DeprecatedDictProxy):
    """Outcome of an ls-remote operation.

    Comparing with a plain ``dict`` is still accepted (with a deprecation
    warning) and only looks at ``refs``; comparing two results looks at
    ``refs`` and ``symrefs``.

    Attributes:
      refs: Dictionary with all remote refs
      symrefs: Dictionary with remote symrefs
      object_format: Object format name (e.g., "sha1", "sha256") used by the
        remote, or None if not specified
    """

    symrefs: dict[Ref, Ref]
    object_format: str | None

    def __init__(
        self,
        refs: dict[Ref, ObjectID | None],
        symrefs: dict[Ref, Ref],
        object_format: str | None = None,
    ) -> None:
        """Store the pieces of an ls-remote result.

        Args:
          refs: Dictionary with all remote refs
          symrefs: Dictionary with remote symrefs
          object_format: Object format name (e.g., "sha1", "sha256") used by
            the remote
        """
        self.refs, self.symrefs = refs, symrefs
        self.object_format = object_format

    def _warn_deprecated(self) -> None:
        """Emit the LsRemoteResult-specific deprecation warning."""
        import warnings

        message = (
            "Treating LsRemoteResult as a dictionary is deprecated. "
            "Use result.refs instead."
        )
        # stacklevel=3 skips this helper and the proxy method that called it.
        warnings.warn(message, DeprecationWarning, stacklevel=3)

    def __eq__(self, other: object) -> bool:
        """Compare with another result, or (deprecated) with a refs dict."""
        if isinstance(other, dict):
            self._warn_deprecated()
            return self.refs == other
        if not isinstance(other, LsRemoteResult):
            return False
        compared = ("refs", "symrefs")
        return all(getattr(self, attr) == getattr(other, attr) for attr in compared)

    def __repr__(self) -> str:
        """Return ``ClassName(refs, symrefs)``."""
        return "{}({!r}, {!r})".format(
            self.__class__.__name__, self.refs, self.symrefs
        )
class SendPackResult(_DeprecatedDictProxy):
    """Outcome of a send-pack operation.

    Comparing with a plain ``dict`` is still accepted (with a deprecation
    warning) and only looks at ``refs``; comparing two results looks at
    ``refs`` and ``agent``.

    Attributes:
      refs: Dictionary with all remote refs
      agent: User agent string
      ref_status: Optional dictionary mapping ref name to error message (if it
        failed to update), or None if it was updated successfully
    """

    def __init__(
        self,
        refs: dict[Ref, ObjectID | None],
        agent: bytes | None = None,
        ref_status: dict[bytes, str | None] | None = None,
    ) -> None:
        """Store the pieces of a send-pack result.

        Args:
          refs: Dictionary with all remote refs
          agent: User agent string
          ref_status: Optional dictionary mapping ref name to error message
        """
        self.refs, self.agent, self.ref_status = refs, agent, ref_status

    def __eq__(self, other: object) -> bool:
        """Compare with another result, or (deprecated) with a refs dict."""
        if isinstance(other, dict):
            self._warn_deprecated()
            return self.refs == other
        if not isinstance(other, SendPackResult):
            return False
        compared = ("refs", "agent")
        return all(getattr(self, attr) == getattr(other, attr) for attr in compared)

    def __repr__(self) -> str:
        """Return ``ClassName(refs, agent)``."""
        return "{}({!r}, {!r})".format(
            self.__class__.__name__, self.refs, self.agent
        )
def _read_shallow_updates(
    pkt_seq: Iterable[bytes],
) -> tuple[set[ObjectID], set[ObjectID]]:
    """Collect the shallow/unshallow object ids announced in a packet sequence.

    Args:
      pkt_seq: Packets to scan. A packet equal to ``b"shallow-info\\n"``
        (Git protocol v2) is ignored.

    Returns:
      (new_shallow, new_unshallow) tuple of sets of object ids

    Raises:
      GitProtocolError: if a packet contains no space separator, or its
        command is neither COMMAND_SHALLOW nor COMMAND_UNSHALLOW
    """
    shallow_ids: set[ObjectID] = set()
    unshallow_ids: set[ObjectID] = set()
    for packet in pkt_seq:
        if packet == b"shallow-info\n":  # Git-protocol v2
            continue
        try:
            command, raw_sha = packet.split(b" ", 1)
        except ValueError:
            raise GitProtocolError(f"unknown command {packet!r}")
        # Pick the set the object id belongs to, then add it once below.
        if command == COMMAND_SHALLOW:
            bucket = shallow_ids
        elif command == COMMAND_UNSHALLOW:
            bucket = unshallow_ids
        else:
            raise GitProtocolError(f"unknown command {packet!r}")
        bucket.add(ObjectID(raw_sha.strip()))
    return (shallow_ids, unshallow_ids)
class _v1ReceivePackHeader:
    """Generate the command packets that open a v1 'git-receive-pack' request.

    Iterating over an instance yields one packet payload per ref whose old and
    new values differ, followed by a final ``None``. The ``have`` and ``want``
    sets are filled in lazily while the iterator is consumed, so they are only
    complete once iteration has finished.

    Attributes:
      have: Non-zero object ids found in ``old_refs``
      want: New object ids that are neither in ``have`` nor ZERO_SHA
      sent_capabilities: Whether the capability list has already been
        attached to a yielded command
    """

    def __init__(
        self,
        capabilities: Sequence[bytes],
        old_refs: Mapping[Ref, ObjectID],
        new_refs: Mapping[Ref, ObjectID],
    ) -> None:
        """Prepare the header generator.

        Args:
          capabilities: List of negotiated capabilities
          old_refs: Old refs, as received from the server
          new_refs: Refs to change
        """
        self.want: set[ObjectID] = set()
        self.have: set[ObjectID] = set()
        # Creating the generator does not run any of its body yet.
        self._it = self._handle_receive_pack_head(capabilities, old_refs, new_refs)
        self.sent_capabilities = False

    def __iter__(self) -> Iterator[bytes | None]:
        """Return the single underlying generator (not restartable)."""
        return self._it

    def _handle_receive_pack_head(
        self,
        capabilities: Sequence[bytes],
        old_refs: Mapping[Ref, ObjectID],
        new_refs: Mapping[Ref, ObjectID],
    ) -> Iterator[bytes | None]:
        """Handle the head of a 'git-receive-pack' request.

        Args:
          capabilities: List of negotiated capabilities
          old_refs: Old refs, as received from the server
          new_refs: Refs to change

        Yields:
          ``<old sha> <new sha> <refname>`` for every ref that changes; the
          first such command additionally carries a NUL byte followed by the
          sorted, space-separated capabilities. A final ``None`` is yielded
          after all refs have been processed.

        Raises:
          TypeError: if a ref name, its old sha or its new sha is not a
            bytestring
        """
        self.have = {sha for sha in old_refs.values() if sha != ZERO_SHA}

        for refname, new_sha1 in new_refs.items():
            if not isinstance(refname, bytes):
                raise TypeError(f"refname is not a bytestring: {refname!r}")
            old_sha1 = old_refs.get(refname, ZERO_SHA)
            if not isinstance(old_sha1, bytes):
                raise TypeError(
                    f"old sha1 for {refname!r} is not a bytestring: {old_sha1!r}"
                )
            if not isinstance(new_sha1, bytes):
                raise TypeError(
                    f"new sha1 for {refname!r} is not a bytestring: {new_sha1!r}"
                )

            if old_sha1 != new_sha1:
                logger.debug(
                    "Sending updated ref %r: %r -> %r", refname, old_sha1, new_sha1
                )
                command = old_sha1 + b" " + new_sha1 + b" " + refname
                if self.sent_capabilities:
                    yield command
                else:
                    # Only the first command carries the capability list.
                    yield command + b"\0" + b" ".join(sorted(capabilities))
                    self.sent_capabilities = True
            if new_sha1 not in self.have and new_sha1 != ZERO_SHA:
                self.want.add(new_sha1)
        yield None
def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[tuple[int, bytes]]:
    """Split side-band packets into (channel, payload) pairs.

    This requires the side-band-64k capability.

    Args:
      pkt_seq: Sequence of packets to read

    Yields:
      (channel, data) tuples, where channel is the numeric value of the first
      byte of the packet and data is the remainder of the packet
    """
    for packet in pkt_seq:
        band, payload = packet[:1], packet[1:]
        yield ord(band), payload
def find_capability(
    capabilities: Iterable[bytes], key: bytes, value: bytes | None
) -> bytes | None:
    """Return the first capability matching a key and, if given, a value.

    Args:
      capabilities: Capability strings to search
      key: Capability name to look for
      value: Value that must appear among the capability's space-separated
        values. The value is only checked when both it and the capability's
        own value are truthy.

    Returns:
      The matching capability string as found in ``capabilities``, or None
    """
    for candidate in capabilities:
        name, params = parse_capability(candidate)
        if name != key:
            continue
        mismatch = bool(value and params and value not in params.split(b" "))
        if not mismatch:
            return candidate
    return None
def _handle_upload_pack_head(
    proto: Protocol,
    capabilities: Iterable[bytes],
    graph_walker: GraphWalker,
    wants: list[ObjectID],
    can_read: Callable[[], bool] | None,
    depth: int | None,
    protocol_version: int | None,
    shallow_since: str | None = None,
    shallow_exclude: list[str] | None = None,
) -> tuple[set[ObjectID] | None, set[ObjectID] | None]:
    """Handle the head of a 'git-upload-pack' request.

    Writes the want lines, any shallow/deepen lines, the have lines and the
    final done command to ``proto``, acknowledging objects on ``graph_walker``
    as ACK replies are read.

    Args:
      proto: Protocol object to read from
      capabilities: List of negotiated capabilities
      graph_walker: GraphWalker instance to call .ack() on
      wants: List of commits to fetch
      can_read: function that returns a boolean that indicates
        whether there is extra graph data to read on proto
      depth: Depth for request
      protocol_version: Negotiated Git protocol version.
      shallow_since: Deepen the history to include commits after this date
      shallow_exclude: Deepen the history to exclude commits reachable from these refs

    Returns:
      (new_shallow, new_unshallow) tuple. When a depth, shallow_since or
      shallow_exclude was requested, the sets are read from the server if
      ``can_read`` is not None, and are both None otherwise. When none of
      these were requested, both entries are the same empty set.

    Raises:
      GitProtocolError: if shallow information has to be sent but the
        capabilities do not include shallow support
      AssertionError: if ``wants`` is not a list starting with a bytestring,
        or an ACK reply carries an unexpected status
    """
    new_shallow: set[ObjectID] | None
    new_unshallow: set[ObjectID] | None
    assert isinstance(wants, list) and isinstance(wants[0], bytes)
    wantcmd = COMMAND_WANT + b" " + wants[0]
    if protocol_version is None:
        protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_SEND
    # Outside protocol v2 the capability list rides on the first want line.
    if protocol_version != 2:
        wantcmd += b" " + b" ".join(sorted(capabilities))
    wantcmd += b"\n"
    proto.write_pkt_line(wantcmd)
    for want in wants[1:]:
        proto.write_pkt_line(COMMAND_WANT + b" " + want + b"\n")
    # Shallow negotiation is needed either because the caller asked for a
    # shallow fetch or because the graph walker already reports shallow commits.
    if (
        depth not in (0, None)
        or shallow_since is not None
        or shallow_exclude
        or (hasattr(graph_walker, "shallow") and graph_walker.shallow)
    ):
        if protocol_version == 2:
            if not find_capability(capabilities, CAPABILITY_FETCH, CAPABILITY_SHALLOW):
                raise GitProtocolError(
                    "server does not support shallow capability required for depth"
                )
        elif CAPABILITY_SHALLOW not in capabilities:
            raise GitProtocolError(
                "server does not support shallow capability required for depth"
            )
        if hasattr(graph_walker, "shallow"):
            for sha in graph_walker.shallow:
                proto.write_pkt_line(COMMAND_SHALLOW + b" " + sha + b"\n")
        if depth is not None:
            proto.write_pkt_line(
                COMMAND_DEEPEN + b" " + str(depth).encode("ascii") + b"\n"
            )
        if shallow_since is not None:
            proto.write_pkt_line(
                COMMAND_DEEPEN_SINCE + b" " + shallow_since.encode("ascii") + b"\n"
            )
        if shallow_exclude:
            for ref in shallow_exclude:
                proto.write_pkt_line(
                    COMMAND_DEEPEN_NOT + b" " + ref.encode("ascii") + b"\n"
                )
    # Outside protocol v2 a flush packet ends the want/shallow section.
    if protocol_version != 2:
        proto.write_pkt_line(None)

    # Offer haves until the walker yields a falsy value or the server says "ready".
    have = next(graph_walker)
    while have:
        proto.write_pkt_line(COMMAND_HAVE + b" " + have + b"\n")
        # Replies are only read when can_read() reports that data is available.
        if can_read is not None and can_read():
            pkt = proto.read_pkt_line()
            assert pkt is not None
            parts = pkt.rstrip(b"\n").split(b" ")
            if parts[0] == b"ACK":
                graph_walker.ack(ObjectID(parts[1]))
                # NOTE(review): parts[2] is read unconditionally, so an ACK
                # line without a status field would raise IndexError here.
                if parts[2] in (b"continue", b"common"):
                    pass
                elif parts[2] == b"ready":
                    break
                else:
                    raise AssertionError(
                        f"{parts[2]!r} not in ('continue', 'ready', 'common)"
                    )
        have = next(graph_walker)
    proto.write_pkt_line(COMMAND_DONE + b"\n")
    # In protocol v2 the flush packet follows the done command instead.
    if protocol_version == 2:
        proto.write_pkt_line(None)

    if depth not in (0, None) or shallow_since is not None or shallow_exclude:
        if can_read is not None:
            (new_shallow, new_unshallow) = _read_shallow_updates(proto.read_pkt_seq())
        else:
            new_shallow = None
            new_unshallow = None
    else:
        # Both names are bound to the same empty set object.
        new_shallow = new_unshallow = set[ObjectID]()

    return (new_shallow, new_unshallow)
def _handle_upload_pack_tail(
    proto: "Protocol",
    capabilities: Set[bytes],
    graph_walker: "GraphWalker",
    pack_data: Callable[[bytes], int],
    progress: Callable[[bytes], None] | None = None,
    rbufsize: int = _RBUFSIZE,
    protocol_version: int = 0,
) -> None:
    """Handle the tail of a 'git-upload-pack' request.

    First consumes the remaining negotiation packets, forwarding ACK/NAK to
    ``graph_walker``, then feeds the pack stream to ``pack_data``.

    Args:
      proto: Protocol object to read from
      capabilities: List of negotiated capabilities
      graph_walker: GraphWalker instance to call .ack() on
      pack_data: Function to call with pack data
      progress: Optional progress reporting function
      rbufsize: Read buffer size
      protocol_version: Negotiated Git protocol version.

    Raises:
      AssertionError: if side-band data arrives on a channel other than the
        data or progress channel
    """
    pkt = proto.read_pkt_line()
    # Stop at the first falsy packet (read_pkt_line result), or on one of the
    # break conditions below.
    while pkt:
        parts = pkt.rstrip(b"\n").split(b" ")
        # In protocol v2 anything other than a "packfile" line ends this phase.
        if protocol_version == 2 and parts[0] != b"packfile":
            break
        else:
            if parts[0] == b"ACK":
                graph_walker.ack(ObjectID(parts[1]))
            if parts[0] == b"NAK":
                graph_walker.nak()
            # Keep reading only while the line carries a third field that is
            # one of the intermediate statuses.
            if len(parts) < 3 or parts[2] not in (
                b"ready",
                b"continue",
                b"common",
            ):
                break
        pkt = proto.read_pkt_line()
    if CAPABILITY_SIDE_BAND_64K in capabilities or protocol_version == 2:
        if progress is None:
            # Just ignore progress data

            def progress(x: bytes) -> None:
                pass

        # Multiplexed stream: dispatch each packet by its channel.
        for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
            if chan == SIDE_BAND_CHANNEL_DATA:
                pack_data(data)
            elif chan == SIDE_BAND_CHANNEL_PROGRESS:
                progress(data)
            else:
                raise AssertionError(f"Invalid sideband channel {chan}")
    else:
        # No side-band: everything left on the connection is pack data;
        # read until proto.read() returns an empty bytestring.
        while True:
            data = proto.read(rbufsize)
            if data == b"":
                break
            pack_data(data)
def _extract_symrefs_and_agent(
    capabilities: Iterable[bytes],
) -> tuple[dict[Ref, Ref], bytes | None]:
    """Pull the symref mappings and the agent value out of capabilities.

    Args:
      capabilities: List of capabilities

    Returns:
      (symrefs, agent) tuple. ``symrefs`` maps the part of each symref value
      before the first ``:`` to the part after it; ``agent`` is the value of
      the last agent capability seen, or None if there was none.
    """
    agent_value = None
    symref_map: dict[Ref, Ref] = {}
    for entry in capabilities:
        name, payload = parse_capability(entry)
        if name == CAPABILITY_SYMREF:
            assert payload is not None
            source, destination = payload.split(b":", 1)
            symref_map[Ref(source)] = Ref(destination)
        if name == CAPABILITY_AGENT:
            agent_value = payload
    return (symref_map, agent_value)
- # TODO(durin42): this doesn't correctly degrade if the server doesn't
- # support some capabilities. This should work properly with servers
- # that don't support multi_ack.
class GitClient:
    """Git smart server client.

    Base class holding the capability sets and the transport-independent
    parts of clone/fetch. Transport-specific operations (``get_url``,
    ``from_parsedurl``, ``send_pack``, ``fetch_pack``, ``get_refs`` and
    ``archive``) raise NotImplementedError here.
    """

    def __init__(
        self,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
    ) -> None:
        """Create a new GitClient instance.

        Args:
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport
            activity.
          quiet: Whether to suppress output
          include_tags: send annotated tags when sending the objects they point
            to
        """
        self._report_activity = report_activity
        self._report_status_parser: ReportStatusParser | None = None
        # Start from the module-wide capability sets and adjust per instance.
        self._fetch_capabilities = set(UPLOAD_CAPABILITIES)
        self._fetch_capabilities.add(capability_agent())
        self._send_capabilities = set(RECEIVE_CAPABILITIES)
        self._send_capabilities.add(capability_agent())
        if quiet:
            self._send_capabilities.add(CAPABILITY_QUIET)
        if not thin_packs:
            self._fetch_capabilities.remove(CAPABILITY_THIN_PACK)
        if include_tags:
            self._fetch_capabilities.add(CAPABILITY_INCLUDE_TAG)
        self.protocol_version = 0  # will be overridden later

    def get_url(self, path: str) -> str:
        """Retrieves full url to given path.

        Args:
          path: Repository path (as string)

        Returns:
          Url to path (as string)

        """
        raise NotImplementedError(self.get_url)

    @classmethod
    def from_parsedurl(
        cls,
        parsedurl: ParseResult,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
        dumb: bool = False,
        username: str | None = None,
        password: str | None = None,
        config: Config | None = None,
    ) -> "GitClient":
        """Create an instance of this client from a urlparse.parsed object.

        Args:
          parsedurl: Result of urlparse()
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
          dumb: Whether to use dumb HTTP transport (only for HTTP)
          username: Optional username for authentication (only for HTTP)
          password: Optional password for authentication (only for HTTP)
          config: Optional configuration object

        Returns:
          A `GitClient` object
        """
        raise NotImplementedError(cls.from_parsedurl)

    def send_pack(
        self,
        path: bytes,
        update_refs: Callable[[dict[Ref, ObjectID]], dict[Ref, ObjectID]],
        generate_pack_data: "GeneratePackDataFunc",
        progress: Callable[[bytes], None] | None = None,
    ) -> SendPackResult:
        """Upload a pack to a remote repository.

        Args:
          path: Repository path (as bytestring)
          update_refs: Function to determine changes to remote refs. Receive
            dict with existing remote refs, returns dict with
            changed refs (name -> sha, where sha=ZERO_SHA for deletions)
          generate_pack_data: Function that can return a tuple
            with number of objects and list of pack data to include
          progress: Optional progress function

        Returns:
          SendPackResult object

        Raises:
          SendPackError: if server rejects the pack data

        """
        raise NotImplementedError(self.send_pack)

    def clone(
        self,
        path: str,
        target_path: str,
        mkdir: bool = True,
        bare: bool = False,
        origin: str | None = "origin",
        checkout: bool | None = None,
        branch: str | None = None,
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
    ) -> Repo:
        """Clone a repository.

        Args:
          path: Remote repository path (as string)
          target_path: Directory in which the new repository is created
          mkdir: Whether to create ``target_path`` first; if so, it is
            removed again when the clone fails
          bare: Whether to create a bare repository
          origin: Name of the remote to configure and to import the remote
            refs under. If None, no remote is configured and HEAD is set
            detached (when the remote advertised one).
          checkout: Whether to reset the index/working tree after fetching.
            Defaults to True for non-bare clones.
          branch: Branch to use as the default branch
          progress: Optional progress function, passed on to fetch
          depth: Depth to fetch at, passed on to fetch
          ref_prefix: Ref prefixes, passed on to fetch
          filter_spec: Object filter spec, passed on to fetch
          protocol_version: Desired Git protocol version, passed on to fetch

        Returns:
          The newly created repository

        Raises:
          ValueError: if both ``checkout`` and ``bare`` are requested
        """
        if mkdir:
            os.mkdir(target_path)

        try:
            # For network clones, create repository with default SHA-1 format initially.
            # If remote uses a different format, fetch() will auto-change the repo's format
            # (since repo is empty at this point).
            # Subclasses (e.g., LocalGitClient) override to detect format first for efficiency.
            target = None
            if not bare:
                target = Repo.init(target_path)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # TODO(jelmer): abstract method for get_location?
            if isinstance(self, (LocalGitClient, SubprocessGitClient)):
                encoded_path = path.encode("utf-8")
            else:
                encoded_path = self.get_url(path).encode("utf-8")

            assert target is not None
            if origin is not None:
                target_config = target.get_config()
                target_config.set(
                    (b"remote", origin.encode("utf-8")), b"url", encoded_path
                )
                target_config.set(
                    (b"remote", origin.encode("utf-8")),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin.encode("utf-8") + b"/*",
                )
                target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            result = self.fetch(
                path.encode("utf-8"),
                target,
                progress=progress,
                depth=depth,
                ref_prefix=ref_prefix,
                filter_spec=filter_spec,
                protocol_version=protocol_version,
            )
            if origin is not None:
                _import_remote_refs(
                    target.refs, origin, result.refs, message=ref_message
                )

            origin_head = result.symrefs.get(HEADREF)
            origin_sha = result.refs.get(HEADREF)
            if origin is None or (origin_sha and not origin_head):
                # set detached HEAD
                if origin_sha is not None:
                    target.refs[HEADREF] = origin_sha
                    head = origin_sha
                else:
                    head = None
            else:
                from dulwich import porcelain

                _set_origin_head(target.refs, origin.encode("utf-8"), origin_head)

                default_branch_name = porcelain.var(
                    target, variable="GIT_DEFAULT_BRANCH"
                ).encode("utf-8")
                default_branch: bytes | None = default_branch_name
                # Look for the configured default branch under the remote name
                # actually in use (matching the fetch refspec written above),
                # not under a hard-coded "origin".
                default_ref = Ref(
                    b"refs/remotes/"
                    + origin.encode("utf-8")
                    + b"/"
                    + default_branch_name
                )
                if default_ref not in target.refs:
                    default_branch = None

                head_ref = _set_default_branch(
                    target.refs,
                    origin.encode("utf-8"),
                    origin_head,
                    (branch.encode("utf-8") if branch is not None else default_branch),
                    ref_message,
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

            if checkout and head is not None:
                target.get_worktree().reset_index()
        except BaseException:
            # Undo the partial clone: release the repository and, if this call
            # created the directory, remove it again.
            if target is not None:
                target.close()
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target

    def fetch(
        self,
        path: bytes | str,
        target: BaseRepo,
        determine_wants: "DetermineWantsFunc | None" = None,
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
        shallow_since: str | None = None,
        shallow_exclude: list[str] | None = None,
    ) -> FetchPackResult:
        """Fetch into a target repository.

        Args:
          path: Path to fetch from (as bytestring)
          target: Target repository to fetch into
          determine_wants: Optional function to determine what refs to fetch.
            Receives dictionary of name->sha, should return
            list of shas to fetch. Defaults to all shas.
          progress: Optional progress function
          depth: Depth to fetch at
          ref_prefix: List of prefixes of desired references, as a list of
            bytestrings. Filtering is done by the server if supported, and
            client side otherwise.
          filter_spec: A git-rev-list-style object filter spec, as bytestring.
            Only used if the server supports the Git protocol-v2 'filter'
            feature, and ignored otherwise.
          protocol_version: Desired Git protocol version. By default the highest
            mutually supported protocol version will be used.
          shallow_since: Deepen the history to include commits after this date
          shallow_exclude: Deepen the history to exclude commits reachable from these refs

        Returns:
          FetchPackResult object, as returned by fetch_pack

        """
        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        if CAPABILITY_THIN_PACK in self._fetch_capabilities:
            from tempfile import SpooledTemporaryFile

            # Thin packs are spooled first and completed by the object store
            # once the whole pack has been received.
            f: IO[bytes] = SpooledTemporaryFile(
                max_size=PACK_SPOOL_FILE_MAX_SIZE,
                prefix="incoming-",
                dir=getattr(target.object_store, "path", None),
            )

            def commit() -> None:
                # Nothing was written if the position is still 0.
                if f.tell():
                    f.seek(0)
                    target.object_store.add_thin_pack(f.read, None, progress=progress)  # type: ignore
                f.close()

            def abort() -> None:
                f.close()

        else:
            f, commit, abort = target.object_store.add_pack()
        try:
            result = self.fetch_pack(
                path,
                determine_wants,
                target.get_graph_walker(),
                f.write,
                progress=progress,
                depth=depth,
                ref_prefix=ref_prefix,
                filter_spec=filter_spec,
                protocol_version=protocol_version,
                shallow_since=shallow_since,
                shallow_exclude=shallow_exclude,
            )

            # Fix object format if needed
            if (
                result.object_format
                and result.object_format != target.object_format.name
            ):
                # Change the target repo's format if it's empty
                target._change_object_format(result.object_format)
        except BaseException:
            abort()
            raise
        else:
            commit()
        target.update_shallow(result.new_shallow, result.new_unshallow)
        return result

    def fetch_pack(
        self,
        path: bytes | str,
        determine_wants: "DetermineWantsFunc",
        graph_walker: GraphWalker,
        pack_data: Callable[[bytes], int],
        *,
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
        shallow_since: str | None = None,
        shallow_exclude: list[str] | None = None,
    ) -> FetchPackResult:
        """Retrieve a pack from a git smart server.

        Args:
          path: Remote path to fetch from
          determine_wants: Function determine what refs
            to fetch. Receives dictionary of name->sha, should return
            list of shas to fetch.
          graph_walker: Object with next() and ack().
          pack_data: Callback called for each bit of data in the pack
          progress: Callback for progress reports (strings)
          depth: Shallow fetch depth
          ref_prefix: List of prefixes of desired references, as a list of
            bytestrings. Filtering is done by the server if supported, and
            client side otherwise.
          filter_spec: A git-rev-list-style object filter spec, as bytestring.
            Only used if the server supports the Git protocol-v2 'filter'
            feature, and ignored otherwise.
          protocol_version: Desired Git protocol version. By default the highest
            mutually supported protocol version will be used.
          shallow_since: Deepen the history to include commits after this date
          shallow_exclude: Deepen the history to exclude commits reachable from these refs

        Returns:
          FetchPackResult object

        """
        raise NotImplementedError(self.fetch_pack)

    def get_refs(
        self,
        path: bytes,
        protocol_version: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
    ) -> LsRemoteResult:
        """Retrieve the current refs from a git smart server.

        Args:
          path: Path to the repo to fetch from. (as bytestring)
          protocol_version: Desired Git protocol version.
          ref_prefix: Prefix filter for refs.

        Returns:
          LsRemoteResult object with refs and symrefs
        """
        raise NotImplementedError(self.get_refs)

    @staticmethod
    def _should_send_pack(new_refs: Mapping[Ref, ObjectID]) -> bool:
        """Return True if at least one new ref value is not ZERO_SHA."""
        # The packfile MUST NOT be sent if the only command used is delete.
        return any(sha != ZERO_SHA for sha in new_refs.values())

    def _negotiate_receive_pack_capabilities(
        self, server_capabilities: set[bytes]
    ) -> tuple[set[bytes], bytes | None]:
        """Intersect our send capabilities with the server's.

        Args:
          server_capabilities: Capabilities advertised by the server

        Returns:
          (negotiated_capabilities, agent) tuple
        """
        negotiated_capabilities = self._send_capabilities & server_capabilities
        (_symrefs, agent) = _extract_symrefs_and_agent(server_capabilities)
        # The set of unknown capabilities is computed but not used yet.
        (extract_capability_names(server_capabilities) - KNOWN_RECEIVE_CAPABILITIES)
        # TODO(jelmer): warn about unknown capabilities
        return (negotiated_capabilities, agent)

    def _handle_receive_pack_tail(
        self,
        proto: Protocol,
        capabilities: Set[bytes],
        progress: Callable[[bytes], None] | None = None,
    ) -> dict[bytes, str | None] | None:
        """Handle the tail of a 'git-receive-pack' request.

        Args:
          proto: Protocol object to read from
          capabilities: List of negotiated capabilities
          progress: Optional progress reporting function

        Returns:
          dict mapping ref name to:
            error message if the ref failed to update
            None if it was updated successfully
          or None if no report status parser is set on this client
        """
        if CAPABILITY_SIDE_BAND_64K in capabilities or self.protocol_version == 2:
            if progress is None:

                def progress(x: bytes) -> None:
                    pass

            if CAPABILITY_REPORT_STATUS in capabilities:
                assert self._report_status_parser is not None
                # Status arrives as pkt-lines embedded in the data channel.
                pktline_parser = PktLineParser(self._report_status_parser.handle_packet)
            for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
                if chan == SIDE_BAND_CHANNEL_DATA:
                    if CAPABILITY_REPORT_STATUS in capabilities:
                        pktline_parser.parse(data)
                elif chan == SIDE_BAND_CHANNEL_PROGRESS:
                    progress(data)
                else:
                    raise AssertionError(f"Invalid sideband channel {chan}")
        else:
            if CAPABILITY_REPORT_STATUS in capabilities:
                assert self._report_status_parser
                for pkt in proto.read_pkt_seq():
                    self._report_status_parser.handle_packet(pkt)
        if self._report_status_parser is not None:
            return dict(self._report_status_parser.check())

        return None

    def _negotiate_upload_pack_capabilities(
        self, server_capabilities: set[bytes]
    ) -> tuple[set[bytes], dict[Ref, Ref], bytes | None]:
        """Intersect our fetch capabilities with the server's.

        Under protocol version 2, a ``fetch`` capability is additionally
        rebuilt from the server's advertised fetch features, keeping only
        ``shallow`` and ``filter``.

        Args:
          server_capabilities: Capabilities advertised by the server

        Returns:
          (negotiated_capabilities, symrefs, agent) tuple
        """
        # The set of unknown capabilities is computed but not used yet.
        (extract_capability_names(server_capabilities) - KNOWN_UPLOAD_CAPABILITIES)
        # TODO(jelmer): warn about unknown capabilities
        fetch_capa = None
        for capability in server_capabilities:
            k, v = parse_capability(capability)
            if self.protocol_version == 2 and k == CAPABILITY_FETCH:
                fetch_capa = CAPABILITY_FETCH
                fetch_features = []
                assert v is not None
                v_list = v.strip().split(b" ")
                if b"shallow" in v_list:
                    fetch_features.append(CAPABILITY_SHALLOW)
                if b"filter" in v_list:
                    fetch_features.append(CAPABILITY_FILTER)
                # Produces e.g. "<fetch>=<feature> <feature>"; nothing is
                # appended when no supported feature was advertised.
                if fetch_features:
                    fetch_capa += b"=" + b" ".join(fetch_features)

        (symrefs, agent) = _extract_symrefs_and_agent(server_capabilities)

        negotiated_capabilities = self._fetch_capabilities & server_capabilities
        if fetch_capa:
            negotiated_capabilities.add(fetch_capa)
        return (negotiated_capabilities, symrefs, agent)

    def archive(
        self,
        path: bytes,
        committish: bytes,
        write_data: Callable[[bytes], None],
        progress: Callable[[bytes], None] | None = None,
        write_error: Callable[[bytes], None] | None = None,
        format: bytes | None = None,
        subdirs: Sequence[bytes] | None = None,
        prefix: bytes | None = None,
    ) -> None:
        """Retrieve an archive of the specified tree."""
        raise NotImplementedError(self.archive)

    @staticmethod
    def _warn_filter_objects() -> None:
        """Warn that the server did not recognize object filtering."""
        import warnings

        warnings.warn(
            "object filtering not recognized by server, ignoring",
            UserWarning,
        )
def check_wants(wants: Set[bytes], refs: Mapping[bytes, bytes]) -> None:
    """Verify that every wanted SHA is the value of some ref.

    Refs whose name ends with PEELED_TAG_SUFFIX are not taken into account.

    Args:
      wants: Set of object SHAs to fetch
      refs: Refs dictionary to check against

    Raises:
      InvalidWants: if any wanted SHA is not among the considered ref values
    """
    advertised = set()
    for name, sha in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            advertised.add(sha)
    unknown = set(wants).difference(advertised)
    if unknown:
        raise InvalidWants(unknown)
def _remote_error_from_stderr(stderr: IO[bytes] | None) -> Exception:
    """Build the exception describing a remote failure from its stderr.

    Args:
      stderr: Stream carrying the remote's error output, or None

    Returns:
      A GitProtocolError carrying the text of the first line that starts with
      ``ERROR: `` (decoded as UTF-8 with replacement), or otherwise a
      HangupException (constructed with the stripped lines when a stream
      was given). The exception is returned, not raised.
    """
    if stderr is None:
        return HangupException()
    marker = b"ERROR: "
    stripped = [raw.rstrip(b"\n") for raw in stderr.readlines()]
    first_error = next(
        (entry for entry in stripped if entry.startswith(marker)), None
    )
    if first_error is None:
        return HangupException(stripped)
    return GitProtocolError(first_error[len(marker) :].decode("utf-8", "replace"))
- class TraditionalGitClient(GitClient):
- """Traditional Git client."""
- DEFAULT_ENCODING = "utf-8"
- def __init__(
- self,
- path_encoding: str = DEFAULT_ENCODING,
- thin_packs: bool = True,
- report_activity: Callable[[int, str], None] | None = None,
- quiet: bool = False,
- include_tags: bool = False,
- ) -> None:
- """Initialize a TraditionalGitClient.
- Args:
- path_encoding: Encoding for paths (default: utf-8)
- thin_packs: Whether or not thin packs should be retrieved
- report_activity: Optional callback for reporting transport activity
- quiet: Whether to suppress progress output
- include_tags: Whether to include tags
- """
- self._remote_path_encoding = path_encoding
- super().__init__(
- thin_packs=thin_packs,
- report_activity=report_activity,
- quiet=quiet,
- include_tags=include_tags,
- )
- def _connect(
- self,
- cmd: bytes,
- path: str | bytes,
- protocol_version: int | None = None,
- ) -> tuple[Protocol, Callable[[], bool], IO[bytes] | None]:
- """Create a connection to the server.
- This method is abstract - concrete implementations should
- implement their own variant which connects to the server and
- returns an initialized Protocol object with the service ready
- for use and a can_read function which may be used to see if
- reads would block.
- Args:
- cmd: The git service name to which we should connect.
- path: The path we should pass to the service. (as bytestirng)
- protocol_version: Desired Git protocol version. By default the highest
- mutually supported protocol version will be used.
- """
- raise NotImplementedError
- def send_pack(
- self,
- path: bytes,
- update_refs: Callable[[dict[Ref, ObjectID]], dict[Ref, ObjectID]],
- generate_pack_data: "GeneratePackDataFunc",
- progress: Callable[[bytes], None] | None = None,
- ) -> SendPackResult:
- """Upload a pack to a remote repository.
- Args:
- path: Repository path (as bytestring)
- update_refs: Function to determine changes to remote refs.
- Receive dict with existing remote refs, returns dict with
- changed refs (name -> sha, where sha=ZERO_SHA for deletions)
- generate_pack_data: Function that can return a tuple with
- number of objects and pack data to upload.
- progress: Optional callback called with progress updates
- Returns:
- SendPackResult
- Raises:
- SendPackError: if server rejects the pack data
- """
- self.protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_SEND
- proto, _unused_can_read, stderr = self._connect(b"receive-pack", path)
- with proto:
- try:
- old_refs, server_capabilities = read_pkt_refs_v1(proto.read_pkt_seq())
- except HangupException as exc:
- raise _remote_error_from_stderr(stderr) from exc
- (
- negotiated_capabilities,
- agent,
- ) = self._negotiate_receive_pack_capabilities(server_capabilities)
- if CAPABILITY_REPORT_STATUS in negotiated_capabilities:
- self._report_status_parser = ReportStatusParser()
- report_status_parser = self._report_status_parser
- try:
- new_refs = orig_new_refs = update_refs(old_refs)
- except BaseException:
- proto.write_pkt_line(None)
- raise
- if set(new_refs.items()).issubset(set(old_refs.items())):
- proto.write_pkt_line(None)
- # Convert new_refs to match SendPackResult expected type
- return SendPackResult(
- _to_optional_dict(new_refs), agent=agent, ref_status={}
- )
- if CAPABILITY_DELETE_REFS not in server_capabilities:
- # Server does not support deletions. Fail later.
- new_refs = dict(orig_new_refs)
- for ref, sha in orig_new_refs.items():
- if sha == ZERO_SHA:
- if CAPABILITY_REPORT_STATUS in negotiated_capabilities:
- assert report_status_parser is not None
- report_status_parser._ref_statuses.append(
- b"ng " + ref + b" remote does not support deleting refs"
- )
- del new_refs[ref]
- if new_refs is None:
- proto.write_pkt_line(None)
- return SendPackResult(old_refs, agent=agent, ref_status={})
- if len(new_refs) == 0 and orig_new_refs:
- # NOOP - Original new refs filtered out by policy
- proto.write_pkt_line(None)
- if report_status_parser is not None:
- ref_status = dict(report_status_parser.check())
- else:
- ref_status = None
- # Convert to Optional type for SendPackResult
- return SendPackResult(
- _to_optional_dict(old_refs), agent=agent, ref_status=ref_status
- )
- header_handler = _v1ReceivePackHeader(
- list(negotiated_capabilities),
- old_refs,
- new_refs,
- )
- for pkt in header_handler:
- proto.write_pkt_line(pkt)
- pack_data_count, pack_data = generate_pack_data(
- header_handler.have,
- header_handler.want,
- ofs_delta=(CAPABILITY_OFS_DELTA in negotiated_capabilities),
- progress=progress,
- )
- if self._should_send_pack(new_refs):
- for chunk in PackChunkGenerator(
- num_records=pack_data_count,
- records=pack_data,
- object_format=DEFAULT_OBJECT_FORMAT,
- ):
- proto.write(chunk)
- ref_status = self._handle_receive_pack_tail(
- proto, negotiated_capabilities, progress
- )
- return SendPackResult(
- _to_optional_dict(new_refs), agent=agent, ref_status=ref_status
- )
def fetch_pack(
    self,
    path: bytes | str,
    determine_wants: "DetermineWantsFunc",
    graph_walker: GraphWalker,
    pack_data: Callable[[bytes], int],
    progress: Callable[[bytes], None] | None = None,
    depth: int | None = None,
    ref_prefix: Sequence[bytes] | None = None,
    filter_spec: bytes | None = None,
    protocol_version: int | None = None,
    shallow_since: str | None = None,
    shallow_exclude: list[str] | None = None,
) -> FetchPackResult:
    """Retrieve a pack from a git smart server.

    Args:
      path: Remote path to fetch from
      determine_wants: Function to determine what refs
        to fetch. Receives dictionary of name->sha, should return
        list of shas to fetch.
      graph_walker: Object with next() and ack().
      pack_data: Callback called for each bit of data in the pack
      progress: Callback for progress reports (strings)
      depth: Shallow fetch depth
      ref_prefix: List of prefixes of desired references, as a list of
        bytestrings. Filtering is done by the server if supported, and
        client side otherwise.
      filter_spec: A git-rev-list-style object filter spec, as bytestring.
        Only used if the server supports the Git protocol-v2 'filter'
        feature, and ignored otherwise.
      protocol_version: Desired Git protocol version. By default the highest
        mutually supported protocol version will be used.
      shallow_since: Deepen the history to include commits after this date
      shallow_exclude: Deepen the history to exclude commits reachable from these refs

    Returns:
      FetchPackResult object

    Raises:
      ValueError: if ``protocol_version`` is not a known version, or if the
        server answers with an unknown version or one newer than requested
    """
    if (
        protocol_version is not None
        and protocol_version not in GIT_PROTOCOL_VERSIONS
    ):
        raise ValueError(f"unknown Git protocol version {protocol_version}")
    proto, can_read, stderr = self._connect(b"upload-pack", path, protocol_version)
    # Enter the protocol context before negotiating so the connection is
    # closed even when version negotiation or validation fails.
    with proto:
        server_protocol_version = negotiate_protocol_version(proto)
        if server_protocol_version not in GIT_PROTOCOL_VERSIONS:
            raise ValueError(
                f"unknown Git protocol version {server_protocol_version} used by server"
            )
        # Compare against None explicitly: version 0 is a valid request.
        if (
            protocol_version is not None
            and server_protocol_version > protocol_version
        ):
            raise ValueError(
                f"bad Git protocol version {server_protocol_version} used by server"
            )
        self.protocol_version = server_protocol_version

        # refs may have None values in v2 but not in v1
        refs: dict[Ref, ObjectID | None]
        symrefs: dict[Ref, Ref]
        agent: bytes | None
        object_format: str | None
        if self.protocol_version == 2:
            try:
                server_capabilities = read_server_capabilities(proto.read_pkt_seq())
            except HangupException as exc:
                raise _remote_error_from_stderr(stderr) from exc
            (
                negotiated_capabilities,
                symrefs,
                agent,
            ) = self._negotiate_upload_pack_capabilities(server_capabilities)
            object_format = extract_object_format_from_capabilities(
                server_capabilities
            )
            # Ask the server for the ref listing (v2 has no implicit advertisement).
            proto.write_pkt_line(b"command=ls-refs\n")
            proto.write(b"0001")  # delim-pkt
            proto.write_pkt_line(b"symrefs")
            proto.write_pkt_line(b"peel")
            if ref_prefix is None:
                ref_prefix = DEFAULT_REF_PREFIX
            for prefix in ref_prefix:
                proto.write_pkt_line(b"ref-prefix " + prefix)
            proto.write_pkt_line(None)
            refs, symrefs, _peeled = read_pkt_refs_v2(proto.read_pkt_seq())
        else:
            try:
                refs_v1, server_capabilities = read_pkt_refs_v1(
                    proto.read_pkt_seq()
                )
                # v1 refs never have None values, but we need Optional type for compatibility
                refs = _to_optional_dict(refs_v1)
            except HangupException as exc:
                raise _remote_error_from_stderr(stderr) from exc
            (
                negotiated_capabilities,
                symrefs,
                agent,
            ) = self._negotiate_upload_pack_capabilities(server_capabilities)
            object_format = extract_object_format_from_capabilities(
                server_capabilities
            )
            # No server-side prefix filtering in this branch; filter locally.
            if ref_prefix is not None:
                refs = filter_ref_prefix(refs, ref_prefix)

        if refs is None:
            proto.write_pkt_line(None)
            return FetchPackResult(
                refs, symrefs, agent, object_format=object_format
            )

        try:
            # Filter out None values (shouldn't be any in v1 protocol)
            refs_no_none = {k: v for k, v in refs.items() if v is not None}
            # Handle both old and new style determine_wants
            try:
                wants = determine_wants(refs_no_none, depth)
            except TypeError:
                # Old-style determine_wants that doesn't accept depth
                wants = determine_wants(refs_no_none)
        except BaseException:
            # Send a flush-pkt before propagating the failure.
            proto.write_pkt_line(None)
            raise
        if wants is not None:
            wants = [cid for cid in wants if cid != ZERO_SHA]
        if not wants:
            proto.write_pkt_line(None)
            return FetchPackResult(
                refs, symrefs, agent, object_format=object_format
            )

        if self.protocol_version == 2:
            proto.write_pkt_line(b"command=fetch\n")
            proto.write(b"0001")  # delim-pkt
            if CAPABILITY_THIN_PACK in self._fetch_capabilities:
                proto.write(pkt_line(b"thin-pack\n"))
            if (
                find_capability(
                    list(negotiated_capabilities),
                    CAPABILITY_FETCH,
                    CAPABILITY_FILTER,
                )
                and filter_spec
            ):
                proto.write(pkt_line(b"filter %s\n" % filter_spec))
            elif filter_spec:
                self._warn_filter_objects()
        elif filter_spec:
            self._warn_filter_objects()

        (new_shallow, new_unshallow) = _handle_upload_pack_head(
            proto,
            list(negotiated_capabilities),
            graph_walker,
            wants,
            can_read,
            depth=depth,
            protocol_version=self.protocol_version,
            shallow_since=shallow_since,
            shallow_exclude=shallow_exclude,
        )
        _handle_upload_pack_tail(
            proto,
            negotiated_capabilities,
            graph_walker,
            pack_data,
            progress,
            protocol_version=self.protocol_version,
        )
        return FetchPackResult(
            refs, symrefs, agent, new_shallow, new_unshallow, object_format
        )
def get_refs(
    self,
    path: bytes,
    protocol_version: int | None = None,
    ref_prefix: Sequence[bytes] | None = None,
) -> LsRemoteResult:
    """Retrieve the current refs from a git smart server.

    Args:
      path: Remote path to list refs for
      protocol_version: Desired Git protocol version, or None for the default
      ref_prefix: Prefixes of desired references. With protocol v2 they are
        sent to the server as ``ref-prefix`` arguments; otherwise the refs
        are filtered with ``filter_ref_prefix`` on the client.

    Returns:
      LsRemoteResult with the refs, symrefs and object format

    Raises:
      ValueError: if ``protocol_version`` is not a known version, or if the
        server answers with an unknown version or one newer than requested
    """
    # stock `git ls-remote` uses upload-pack
    if (
        protocol_version is not None
        and protocol_version not in GIT_PROTOCOL_VERSIONS
    ):
        raise ValueError(f"unknown Git protocol version {protocol_version}")
    proto, _, stderr = self._connect(b"upload-pack", path, protocol_version)
    # Hold the protocol context for the whole exchange so the connection is
    # closed on every exit path, including negotiation failures.
    with proto:
        server_protocol_version = negotiate_protocol_version(proto)
        if server_protocol_version not in GIT_PROTOCOL_VERSIONS:
            raise ValueError(
                f"unknown Git protocol version {server_protocol_version} used by server"
            )
        # Compare against None explicitly: version 0 is a valid request.
        if (
            protocol_version is not None
            and server_protocol_version > protocol_version
        ):
            raise ValueError(
                f"bad Git protocol version {server_protocol_version} used by server"
            )
        self.protocol_version = server_protocol_version

        if self.protocol_version == 2:
            try:
                server_capabilities = read_server_capabilities(proto.read_pkt_seq())
            except HangupException as exc:
                raise _remote_error_from_stderr(stderr) from exc
            object_format = extract_object_format_from_capabilities(server_capabilities)
            proto.write_pkt_line(b"command=ls-refs\n")
            proto.write(b"0001")  # delim-pkt
            proto.write_pkt_line(b"symrefs")
            proto.write_pkt_line(b"peel")
            if ref_prefix is None:
                ref_prefix = DEFAULT_REF_PREFIX
            for prefix in ref_prefix:
                proto.write_pkt_line(b"ref-prefix " + prefix)
            proto.write_pkt_line(None)
            try:
                refs, symrefs, peeled = read_pkt_refs_v2(proto.read_pkt_seq())
            except HangupException as exc:
                raise _remote_error_from_stderr(stderr) from exc
            proto.write_pkt_line(None)
            # Expose peeled tag targets under the "<ref><PEELED_TAG_SUFFIX>" name.
            for refname, refvalue in peeled.items():
                refs[Ref(refname + PEELED_TAG_SUFFIX)] = refvalue
            return LsRemoteResult(refs, symrefs, object_format=object_format)
        else:
            try:
                refs_v1, server_capabilities = read_pkt_refs_v1(
                    proto.read_pkt_seq()
                )
                # v1 refs never have None values, but we need Optional type for compatibility
                refs = _to_optional_dict(refs_v1)
            except HangupException as exc:
                raise _remote_error_from_stderr(stderr) from exc
            proto.write_pkt_line(None)
            object_format = extract_object_format_from_capabilities(
                server_capabilities
            )
            (symrefs, _agent) = _extract_symrefs_and_agent(server_capabilities)
            if ref_prefix is not None:
                refs = filter_ref_prefix(refs, ref_prefix)
            return LsRemoteResult(refs, symrefs, object_format=object_format)
- def archive(
- self,
- path: bytes,
- committish: bytes,
- write_data: Callable[[bytes], None],
- progress: Callable[[bytes], None] | None = None,
- write_error: Callable[[bytes], None] | None = None,
- format: bytes | None = None,
- subdirs: Sequence[bytes] | None = None,
- prefix: bytes | None = None,
- ) -> None:
- """Request an archive of a specific commit.
- Args:
- path: Repository path
- committish: Commit ID or ref to archive
- write_data: Function to write archive data
- progress: Optional progress callback
- write_error: Optional error callback
- format: Optional archive format
- subdirs: Optional subdirectories to include
- prefix: Optional prefix for archived files
- """
- proto, _can_read, stderr = self._connect(b"upload-archive", path)
- with proto:
- if format is not None:
- proto.write_pkt_line(b"argument --format=" + format)
- proto.write_pkt_line(b"argument " + committish)
- if subdirs is not None:
- for subdir in subdirs:
- proto.write_pkt_line(b"argument " + subdir)
- if prefix is not None:
- proto.write_pkt_line(b"argument --prefix=" + prefix)
- proto.write_pkt_line(None)
- try:
- pkt = proto.read_pkt_line()
- except HangupException as exc:
- raise _remote_error_from_stderr(stderr) from exc
- if pkt == b"NACK\n" or pkt == b"NACK":
- return
- elif pkt == b"ACK\n" or pkt == b"ACK":
- pass
- elif pkt and pkt.startswith(b"ERR "):
- raise GitProtocolError(pkt[4:].rstrip(b"\n").decode("utf-8", "replace"))
- else:
- raise AssertionError(f"invalid response {pkt!r}")
- ret = proto.read_pkt_line()
- if ret is not None:
- raise AssertionError("expected pkt tail")
- for chan, data in _read_side_band64k_data(proto.read_pkt_seq()):
- if chan == SIDE_BAND_CHANNEL_DATA:
- write_data(data)
- elif chan == SIDE_BAND_CHANNEL_PROGRESS:
- if progress is not None:
- progress(data)
- elif chan == SIDE_BAND_CHANNEL_FATAL:
- if write_error is not None:
- write_error(data)
- else:
- raise AssertionError(f"Invalid sideband channel {chan}")
class TCPGitClient(TraditionalGitClient):
    """A Git Client that works over TCP directly (i.e. git://)."""

    def __init__(
        self,
        host: str,
        port: int | None = None,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
    ) -> None:
        """Initialize a TCPGitClient.

        Args:
          host: Hostname or IP address to connect to
          port: Port number (defaults to TCP_GIT_PORT)
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
        """
        if port is None:
            port = TCP_GIT_PORT
        self._host = host
        self._port = port
        super().__init__(
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
        )

    @classmethod
    def from_parsedurl(
        cls,
        parsedurl: ParseResult,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
        dumb: bool = False,
        username: str | None = None,
        password: str | None = None,
        config: Config | None = None,
    ) -> "TCPGitClient":
        """Create an instance of TCPGitClient from a parsed URL.

        Args:
          parsedurl: Result of urlparse()
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
          dumb: Whether to use dumb protocol (not used for TCPGitClient)
          username: Username for authentication (not used for TCPGitClient)
          password: Password for authentication (not used for TCPGitClient)
          config: Configuration object (not used for TCPGitClient)

        Returns:
          A TCPGitClient instance
        """
        assert parsedurl.hostname is not None
        return cls(
            parsedurl.hostname,
            port=parsedurl.port,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
        )

    def get_url(self, path: str) -> str:
        r"""Get the URL for a TCP git connection.

        Args:
          path: Repository path

        Returns:
          ``git://`` URL for the path
        """
        # IPv6 addresses contain colons and need to be wrapped in brackets
        if ":" in self._host:
            netloc = f"[{self._host}]"
        else:
            netloc = self._host
        # The port is only spelled out when it differs from the default.
        if self._port is not None and self._port != TCP_GIT_PORT:
            netloc += f":{self._port}"
        return urlunsplit(("git", netloc, path, "", ""))

    def _connect(
        self,
        cmd: bytes,
        path: str | bytes,
        protocol_version: int | None = None,
    ) -> tuple[Protocol, Callable[[], bool], IO[bytes] | None]:
        """Open a TCP connection and send the initial service request.

        Args:
          cmd: Service name without the ``git-`` prefix (e.g. ``upload-pack``)
          path: Repository path; str values are encoded with the client's
            remote path encoding
          protocol_version: Protocol version to request for ``upload-pack``

        Returns:
          Tuple of (protocol, can_read callable, None); there is no stderr
          stream for TCP connections.

        Raises:
          TypeError: if ``cmd`` is not bytes
          OSError: if no resolved address could be connected to
        """
        if not isinstance(cmd, bytes):
            raise TypeError(cmd)
        if not isinstance(path, bytes):
            path = path.encode(self._remote_path_encoding)
        sockaddrs = socket.getaddrinfo(
            self._host, self._port, socket.AF_UNSPEC, socket.SOCK_STREAM
        )
        s = None
        err = OSError(f"no address found for {self._host}")
        # Try every resolved address in turn; the last failure is reported.
        for family, socktype, protof, _canonname, sockaddr in sockaddrs:
            candidate = None
            try:
                candidate = socket.socket(family, socktype, protof)
                candidate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                candidate.connect(sockaddr)
            except OSError as e:
                err = e
                # Never leave a half-configured socket open.
                if candidate is not None:
                    candidate.close()
            else:
                s = candidate
                break
        if s is None:
            raise err
        # -1 means system default buffering
        rfile = s.makefile("rb", -1)
        # 0 means unbuffered
        wfile = s.makefile("wb", 0)

        def close() -> None:
            # Always release the socket, even if closing a file object fails.
            try:
                rfile.close()
                wfile.close()
            finally:
                s.close()

        proto = Protocol(
            rfile.read,
            wfile.write,
            close,
            report_activity=self._report_activity,
        )
        if path.startswith(b"/~"):
            path = path[1:]
        if cmd == b"upload-pack":
            if protocol_version is None:
                self.protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_FETCH
            else:
                self.protocol_version = protocol_version
        else:
            self.protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_SEND

        if cmd == b"upload-pack" and self.protocol_version == 2:
            # Git protocol version advertisement is hidden behind two NUL bytes
            # for compatibility with older Git server implementations, which
            # would crash if something other than a "host=" header was found
            # after the first NUL byte.
            version_str = b"\0\0version=%d\0" % self.protocol_version
        else:
            version_str = b""
        try:
            # TODO(jelmer): Alternative to ascii?
            proto.send_cmd(
                b"git-" + cmd, path, b"host=" + self._host.encode("ascii") + version_str
            )
        except BaseException:
            # The caller never receives proto on failure, so close it here.
            proto.close()
            raise
        return proto, lambda: _fileno_can_read(s.fileno()), None
- class SubprocessWrapper:
- """A socket-like object that talks to a subprocess via pipes."""
- def __init__(self, proc: subprocess.Popen[bytes]) -> None:
- """Initialize a SubprocessWrapper.
- Args:
- proc: Subprocess.Popen instance to wrap
- """
- self.proc = proc
- assert proc.stdout is not None
- assert proc.stdin is not None
- self.read = BufferedReader(proc.stdout).read # type: ignore[type-var]
- self.write = proc.stdin.write
- @property
- def stderr(self) -> IO[bytes] | None:
- """Return the stderr stream of the subprocess."""
- return self.proc.stderr
- def can_read(self) -> bool:
- """Check if there is data available to read.
- Returns: True if data is available, False otherwise
- """
- if sys.platform == "win32":
- from msvcrt import get_osfhandle
- assert self.proc.stdout is not None
- handle = get_osfhandle(self.proc.stdout.fileno())
- return _win32_peek_avail(handle) != 0
- else:
- assert self.proc.stdout is not None
- return _fileno_can_read(self.proc.stdout.fileno())
- def close(self, timeout: int | None = 60) -> None:
- """Close the subprocess and wait for it to terminate.
- Args:
- timeout: Maximum time to wait for subprocess to terminate (seconds)
- Raises:
- GitProtocolError: If subprocess doesn't terminate within timeout
- """
- if self.proc.stdin:
- self.proc.stdin.close()
- if self.proc.stdout:
- self.proc.stdout.close()
- if self.proc.stderr:
- self.proc.stderr.close()
- try:
- self.proc.wait(timeout=timeout)
- except subprocess.TimeoutExpired as e:
- self.proc.kill()
- self.proc.wait()
- raise GitProtocolError(
- f"Git subprocess did not terminate within {timeout} seconds; killed it."
- ) from e
def find_git_command() -> list[str]:
    """Return the argv prefix used to invoke the system Git (usually C Git).

    Outside Windows this is always ``["git"]``. On Windows the executable is
    located through pywin32 when it is importable; otherwise, or when the
    lookup fails, Git is run through ``cmd /c`` so that .exe, .bat and .cmd
    launchers are all supported.
    """
    if sys.platform != "win32":
        return ["git"]

    # Fallback: run through cmd.exe with some overhead.
    via_cmd = ["cmd", "/c", "git"]
    try:
        import pywintypes
        import win32api
    except ImportError:
        return via_cmd

    try:
        _status, git = win32api.FindExecutable("git")
    except pywintypes.error:
        return via_cmd
    return [git]
class SubprocessGitClient(TraditionalGitClient):
    """Git client that talks to a server using a subprocess."""

    # Executable to run instead of the auto-detected system Git; None means
    # "use find_git_command()".
    git_command: str | None = None

    @classmethod
    def from_parsedurl(
        cls,
        parsedurl: ParseResult,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
        dumb: bool = False,
        username: str | None = None,
        password: str | None = None,
        config: Config | None = None,
    ) -> "SubprocessGitClient":
        """Create an instance of SubprocessGitClient from a parsed URL.

        Args:
          parsedurl: Result of urlparse()
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
          dumb: Whether to use dumb protocol (not used for SubprocessGitClient)
          username: Username for authentication (not used for SubprocessGitClient)
          password: Password for authentication (not used for SubprocessGitClient)
          config: Configuration object (not used for SubprocessGitClient)

        Returns:
          A SubprocessGitClient instance
        """
        return cls(
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
        )

    def _connect(
        self,
        service: bytes,
        path: bytes | str,
        protocol_version: int | None = None,
    ) -> tuple[Protocol, Callable[[], bool], IO[bytes] | None]:
        """Spawn ``git <service> <path>`` and wrap its pipes in a Protocol.

        Args:
          service: Git service name (e.g. ``upload-pack``), as bytes
          path: Repository path; bytes are decoded with the client's remote
            path encoding
          protocol_version: Accepted for interface compatibility; not used
            by this implementation

        Returns:
          Tuple of (protocol, can_read callable, subprocess stderr stream)

        Raises:
          TypeError: if ``service`` is not bytes
        """
        if not isinstance(service, bytes):
            raise TypeError(service)
        if isinstance(path, bytes):
            path = path.decode(self._remote_path_encoding)
        # Honour an explicitly configured executable. Previously the local
        # name was only bound when git_command was None, so setting the
        # attribute raised UnboundLocalError.
        if self.git_command is None:
            git_command = find_git_command()
        else:
            git_command = [self.git_command]
        argv = [*git_command, service.decode("ascii"), path]
        p = subprocess.Popen(
            argv,
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        pw = SubprocessWrapper(p)
        return (
            Protocol(
                pw.read,
                pw.write,
                pw.close,
                report_activity=self._report_activity,
            ),
            pw.can_read,
            p.stderr,
        )
class LocalGitClient(GitClient):
    """Git Client that just uses a local on-disk repository."""

    def __init__(
        self,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        config: Config | None = None,
        quiet: bool = False,
        include_tags: bool = False,
    ) -> None:
        """Create a new LocalGitClient instance.

        Args:
          thin_packs: Whether or not thin packs should be retrieved
            (ignored by this client)
          report_activity: Optional callback for reporting transport
            activity.
          config: Optional configuration object (not stored by this client)
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
        """
        self._report_activity = report_activity
        self._quiet = quiet
        self._include_tags = include_tags
        # Ignore the thin_packs argument

    def get_url(self, path: str) -> str:
        """Get the URL for a local file path.

        Args:
          path: Local file path

        Returns:
          file:// URL for the path
        """
        return urlunsplit(("file", "", path, "", ""))

    @classmethod
    def from_parsedurl(
        cls,
        parsedurl: ParseResult,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
        dumb: bool = False,
        username: str | None = None,
        password: str | None = None,
        config: Config | None = None,
    ) -> "LocalGitClient":
        """Create an instance of LocalGitClient from a parsed URL.

        Args:
          parsedurl: Result of urlparse()
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
          dumb: Whether to use dumb protocol (not used for LocalGitClient)
          username: Username for authentication (not used for LocalGitClient)
          password: Password for authentication (not used for LocalGitClient)
          config: Optional configuration object

        Returns:
          A LocalGitClient instance
        """
        return cls(
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
            config=config,
        )

    @classmethod
    def _open_repo(cls, path: str | bytes) -> "closing[Repo]":
        """Open a local repository.

        Args:
          path: Repository path (as bytes or str)

        Returns:
          Repo instance wrapped in a closing context manager
        """
        if not isinstance(path, str):
            path = os.fsdecode(path)
        return closing(Repo(path))

    def send_pack(
        self,
        path: str | bytes,
        update_refs: Callable[[dict[Ref, ObjectID]], dict[Ref, ObjectID]],
        generate_pack_data: "GeneratePackDataFunc",
        progress: Callable[[bytes], None] | None = None,
    ) -> SendPackResult:
        """Upload a pack to a local on-disk repository.

        Args:
          path: Repository path (as bytestring)
          update_refs: Function to determine changes to remote refs.
            Receives a dict with the existing remote refs and returns a
            dict with the changed refs (name -> sha, where sha=ZERO_SHA
            for deletions).
          generate_pack_data: Function that generates pack data given
            have and want object sets
          progress: Optional progress function

        Returns:
          SendPackResult; ``ref_status`` maps each ref that could not be
          updated or removed to an error message
        """
        if not progress:

            def progress(x: bytes) -> None:
                pass

        with self._open_repo(path) as target:
            old_refs = target.get_refs()
            new_refs = update_refs(dict(old_refs))

            # Sets give O(1) membership checks and de-duplicate the wants.
            have = {sha1 for sha1 in old_refs.values() if sha1 != ZERO_SHA}
            want = {
                new_sha1
                for new_sha1 in new_refs.values()
                if new_sha1 != ZERO_SHA and new_sha1 not in have
            }

            # Nothing to transfer and no ref changes: short-circuit.
            if not want and set(new_refs.items()).issubset(set(old_refs.items())):
                return SendPackResult(_to_optional_dict(new_refs), ref_status={})

            target.object_store.add_pack_data(
                *generate_pack_data(have, want, ofs_delta=True, progress=progress)
            )

            ref_status: dict[bytes, str | None] = {}

            for refname, new_sha1 in new_refs.items():
                old_sha1 = old_refs.get(refname, ZERO_SHA)
                if new_sha1 != ZERO_SHA:
                    # Only update when the ref still has the value we saw.
                    if not target.refs.set_if_equals(refname, old_sha1, new_sha1):
                        msg = f"unable to set {refname!r} to {new_sha1!r}"
                        progress(msg.encode())
                        ref_status[refname] = msg
                else:
                    if not target.refs.remove_if_equals(refname, old_sha1):
                        progress(f"unable to remove {refname!r}".encode())
                        ref_status[refname] = "unable to remove"

        return SendPackResult(_to_optional_dict(new_refs), ref_status=ref_status)

    def fetch(
        self,
        path: bytes | str,
        target: BaseRepo,
        determine_wants: "DetermineWantsFunc | None" = None,
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
        shallow_since: str | None = None,
        shallow_exclude: list[str] | None = None,
    ) -> FetchPackResult:
        """Fetch into a target repository.

        Only ``determine_wants``, ``progress`` and ``depth`` are forwarded
        to the source repository's ``fetch``; the remaining options are
        accepted for interface compatibility and not used here.

        Args:
          path: Path to fetch from (as bytestring)
          target: Target repository to fetch into
          determine_wants: Optional function to determine what refs
            to fetch. Receives dictionary of name->sha, should return
            list of shas to fetch. Defaults to all shas.
          progress: Optional progress function
          depth: Shallow fetch depth
          ref_prefix: List of prefixes of desired references
          filter_spec: A git-rev-list-style object filter spec, as bytestring
          protocol_version: Optional Git protocol version
          shallow_since: Deepen the history to include commits after this date
          shallow_exclude: Deepen the history to exclude commits reachable from these refs

        Returns:
          FetchPackResult object
        """
        with self._open_repo(path) as r:
            refs = r.fetch(
                target,
                determine_wants=determine_wants,
                progress=progress,
                depth=depth,
            )
            return FetchPackResult(
                _to_optional_dict(refs),
                r.refs.get_symrefs(),
                agent_string(),
                object_format=r.object_format.name,
            )

    def fetch_pack(
        self,
        path: str | bytes,
        determine_wants: "DetermineWantsFunc",
        graph_walker: GraphWalker,
        pack_data: Callable[[bytes], int],
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
        shallow_since: str | None = None,
        shallow_exclude: list[str] | None = None,
    ) -> FetchPackResult:
        """Retrieve a pack from a local on-disk repository.

        Args:
          path: Remote path to fetch from
          determine_wants: Function to determine what refs
            to fetch. Receives dictionary of name->sha, should return
            list of shas to fetch.
          graph_walker: Object with next() and ack().
          pack_data: Callback called for each bit of data in the pack
          progress: Callback for progress reports (strings)
          depth: Shallow fetch depth
          ref_prefix: List of prefixes of desired references (not used here)
          filter_spec: A git-rev-list-style object filter spec (not used here)
          protocol_version: Optional Git protocol version (not used here)
          shallow_since: Deepen the history to include commits after this
            date (not used here)
          shallow_exclude: Deepen the history to exclude commits reachable
            from these refs (not used here)

        Returns:
          FetchPackResult object
        """
        with self._open_repo(path) as r:
            missing_objects = r.find_missing_objects(
                determine_wants, graph_walker, progress=progress, depth=depth
            )
            # A pack is always written, even when nothing is missing.
            if missing_objects is None:
                other_haves = set()
                object_ids = []
            else:
                other_haves = missing_objects.get_remote_has()
                object_ids = list(missing_objects)
            symrefs = r.refs.get_symrefs()
            agent = agent_string()

            write_pack_from_container(
                pack_data,  # type: ignore[arg-type]
                r.object_store,
                object_ids,
                other_haves=other_haves,
                object_format=r.object_format,
            )
            # Convert refs to Optional type for FetchPackResult
            return FetchPackResult(
                _to_optional_dict(r.get_refs()),
                symrefs,
                agent,
                object_format=r.object_format.name,
            )

    def get_refs(
        self,
        path: str | bytes,
        protocol_version: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
    ) -> LsRemoteResult:
        """Retrieve the current refs from a local on-disk repository.

        Args:
          path: Repository path
          protocol_version: Accepted for interface compatibility; not used
          ref_prefix: Accepted for interface compatibility; not used

        Returns:
          LsRemoteResult with the refs, the symbolic refs found among them
          and the repository's object format name
        """
        with self._open_repo(path) as target:
            refs_dict = target.get_refs()
            refs = _to_optional_dict(refs_dict)
            # Extract symrefs from the local repository
            symrefs: dict[Ref, Ref] = {}
            for ref in refs:
                try:
                    # Check if this ref is symbolic by reading it directly
                    ref_value = target.refs.read_ref(ref)
                    if ref_value and ref_value.startswith(SYMREF):
                        # Extract the target from the symref
                        symrefs[ref] = Ref(ref_value[len(SYMREF) :])
                except (KeyError, ValueError):
                    # Not a symbolic ref or error reading it
                    pass
            return LsRemoteResult(
                refs, symrefs, object_format=target.object_format.name
            )

    def clone(
        self,
        path: str,
        target_path: str,
        mkdir: bool = True,
        bare: bool = False,
        origin: str | None = "origin",
        checkout: bool | None = None,
        branch: str | None = None,
        progress: Callable[[bytes], None] | None = None,
        depth: int | None = None,
        ref_prefix: Sequence[bytes] | None = None,
        filter_spec: bytes | None = None,
        protocol_version: int | None = None,
    ) -> Repo:
        """Clone a local repository.

        For local clones, we can detect the object format before creating
        the target repository.

        Args:
          path: Path of the source repository
          target_path: Path of the repository to create
          mkdir: Whether to create ``target_path`` first; a directory
            created here is removed again if the clone fails
          bare: Whether to create a bare repository
          origin: Name of the remote to configure, or None for no remote
          checkout: Whether to check out HEAD; defaults to True for
            non-bare clones
          branch: Optional branch to use as the default branch
          progress: Optional progress function
          depth: Shallow fetch depth
          ref_prefix: Passed through to ``fetch``
          filter_spec: Passed through to ``fetch``
          protocol_version: Passed through to ``fetch``

        Returns:
          The newly created target repository

        Raises:
          ValueError: if both ``bare`` and ``checkout`` are requested
        """
        # Detect the object format from the source repository
        with self._open_repo(path) as source_repo:
            object_format_name = source_repo.object_format.name

        if mkdir:
            os.mkdir(target_path)

        target = None
        try:
            # Create repository with the correct object format from the start
            if not bare:
                target = Repo.init(target_path, object_format=object_format_name)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path, object_format=object_format_name)

            encoded_path = path.encode("utf-8")

            assert target is not None
            if origin is not None:
                target_config = target.get_config()
                target_config.set(
                    (b"remote", origin.encode("utf-8")), b"url", encoded_path
                )
                target_config.set(
                    (b"remote", origin.encode("utf-8")),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin.encode("utf-8") + b"/*",
                )
                target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            result = self.fetch(
                path.encode("utf-8"),
                target,
                progress=progress,
                depth=depth,
                ref_prefix=ref_prefix,
                filter_spec=filter_spec,
                protocol_version=protocol_version,
            )
            if origin is not None:
                _import_remote_refs(
                    target.refs, origin, result.refs, message=ref_message
                )

            origin_head = result.symrefs.get(HEADREF)
            origin_sha = result.refs.get(HEADREF)
            if origin is None or (origin_sha and not origin_head):
                # set detached HEAD
                if origin_sha is not None:
                    target.refs[HEADREF] = origin_sha
                    head = origin_sha
                else:
                    head = None
            else:
                _set_origin_head(target.refs, origin.encode("utf-8"), origin_head)
                head_ref = _set_default_branch(
                    target.refs,
                    origin.encode("utf-8"),
                    origin_head,
                    branch.encode("utf-8") if branch is not None else None,
                    ref_message,
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

            if checkout and head is not None:
                target.get_worktree().reset_index()
        except BaseException:
            # Undo partial work: release the repo and remove what we created.
            if target is not None:
                target.close()
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
- class BundleClient(GitClient):
- """Git Client that reads from a bundle file."""
def __init__(
    self,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    config: Config | None = None,
    quiet: bool = False,
    include_tags: bool = False,
) -> None:
    """Set up a client that reads from bundle files.

    Args:
      thin_packs: Whether or not thin packs should be retrieved
        (not stored by this client)
      report_activity: Optional callback for reporting transport
        activity.
      config: Optional configuration object (not stored by this client)
      quiet: Whether to suppress progress output
      include_tags: Whether to include tags
    """
    # Only these three options are kept on the instance.
    self._report_activity, self._quiet, self._include_tags = (
        report_activity,
        quiet,
        include_tags,
    )
def get_url(self, path: str) -> str:
    """Return the URL for a bundle file, which is simply its path.

    Args:
      path: Bundle file path

    Returns:
      ``path`` unchanged
    """
    bundle_url = path
    return bundle_url
- @classmethod
- def from_parsedurl(
- cls,
- parsedurl: ParseResult,
- thin_packs: bool = True,
- report_activity: Callable[[int, str], None] | None = None,
- quiet: bool = False,
- include_tags: bool = False,
- dumb: bool = False,
- username: str | None = None,
- password: str | None = None,
- config: Config | None = None,
- ) -> "BundleClient":
- """Create an instance of BundleClient from a parsed URL.
- Args:
- parsedurl: Result of urlparse()
- thin_packs: Whether or not thin packs should be retrieved
- report_activity: Optional callback for reporting transport activity
- quiet: Whether to suppress progress output
- include_tags: Whether to include tags
- dumb: Whether to use dumb protocol (not used for BundleClient)
- username: Username for authentication (not used for BundleClient)
- password: Password for authentication (not used for BundleClient)
- config: Configuration object (not used for BundleClient)
- Returns:
- A BundleClient instance
- """
- return cls(
- thin_packs=thin_packs,
- report_activity=report_activity,
- quiet=quiet,
- include_tags=include_tags,
- )
- @classmethod
- def _is_bundle_file(cls, path: str) -> bool:
- """Check if a file is a git bundle by reading the first line."""
- try:
- with open(path, "rb") as f:
- first_line = f.readline()
- return first_line in (b"# v2 git bundle\n", b"# v3 git bundle\n")
- except OSError:
- return False
- @classmethod
- def _open_bundle(cls, path: str | bytes) -> "Bundle":
- """Open and parse a bundle file.
- Args:
- path: Path to the bundle file (bytes or str)
- Returns:
- Bundle object with parsed metadata
- Raises:
- AssertionError: If bundle format is unsupported
- """
- if not isinstance(path, str):
- path = os.fsdecode(path)
- # Read bundle metadata without PackData to avoid file handle issues
- with open(path, "rb") as f:
- from dulwich.bundle import Bundle
- version = None
- firstline = f.readline()
- if firstline == b"# v2 git bundle\n":
- version = 2
- elif firstline == b"# v3 git bundle\n":
- version = 3
- else:
- raise AssertionError(f"unsupported bundle format header: {firstline!r}")
- capabilities: dict[str, str | None] = {}
- prerequisites: list[tuple[ObjectID, bytes]] = []
- references: dict[Ref, ObjectID] = {}
- line = f.readline()
- if version >= 3:
- while line.startswith(b"@"):
- line = line[1:].rstrip(b"\n")
- try:
- key, value_bytes = line.split(b"=", 1)
- value = value_bytes.decode("utf-8")
- except ValueError:
- key = line
- value = None
- capabilities[key.decode("utf-8")] = value
- line = f.readline()
- while line.startswith(b"-"):
- (obj_id, comment) = line[1:].rstrip(b"\n").split(b" ", 1)
- prerequisites.append((ObjectID(obj_id), comment))
- line = f.readline()
- while line != b"\n":
- (obj_id, ref) = line.rstrip(b"\n").split(b" ", 1)
- references[Ref(ref)] = ObjectID(obj_id)
- line = f.readline()
- # Don't read PackData here, we'll do it later
- bundle = Bundle()
- bundle.version = version
- bundle.capabilities = capabilities
- bundle.prerequisites = prerequisites
- bundle.references = references
- bundle.pack_data = None # Will be read on demand
- return bundle
- @staticmethod
- def _skip_to_pack_data(f: IO[bytes], version: int) -> None:
- """Skip to the pack data section in a bundle file.
- Args:
- f: File object positioned at the beginning of the bundle
- version: Bundle format version (2 or 3)
- Raises:
- AssertionError: If bundle header is invalid
- """
- # Skip header
- header = f.readline()
- if header not in (b"# v2 git bundle\n", b"# v3 git bundle\n"):
- raise AssertionError(f"Invalid bundle header: {header!r}")
- line = f.readline()
- # Skip capabilities (v3 only)
- if version >= 3:
- while line.startswith(b"@"):
- line = f.readline()
- # Skip prerequisites
- while line.startswith(b"-"):
- line = f.readline()
- # Skip references
- while line != b"\n":
- line = f.readline()
- # Now at pack data
- def send_pack(
- self,
- path: str | bytes,
- update_refs: Callable[[dict[Ref, ObjectID]], dict[Ref, ObjectID]],
- generate_pack_data: "GeneratePackDataFunc",
- progress: Callable[[bytes], None] | None = None,
- ) -> SendPackResult:
- """Upload is not supported for bundle files."""
- raise NotImplementedError("Bundle files are read-only")
- def fetch(
- self,
- path: bytes | str,
- target: BaseRepo,
- determine_wants: "DetermineWantsFunc | None" = None,
- progress: Callable[[bytes], None] | None = None,
- depth: int | None = None,
- ref_prefix: Sequence[bytes] | None = None,
- filter_spec: bytes | None = None,
- protocol_version: int | None = None,
- shallow_since: str | None = None,
- shallow_exclude: list[str] | None = None,
- ) -> FetchPackResult:
- """Fetch into a target repository from a bundle file."""
- bundle = self._open_bundle(path)
- # Get references from bundle
- refs = dict(bundle.references)
- # Determine what we want to fetch
- if determine_wants is None:
- _ = list(refs.values())
- else:
- _ = determine_wants(refs, None)
- # Add pack data to target repository
- # Need to reopen the file for pack data access
- with open(path, "rb") as pack_file:
- # Skip to pack data section
- assert bundle.version is not None
- BundleClient._skip_to_pack_data(pack_file, bundle.version)
- # Read pack data into memory to avoid file positioning issues
- pack_bytes = pack_file.read()
- # Create PackData from in-memory bytes
- from io import BytesIO
- pack_io = BytesIO(pack_bytes)
- pack_data = PackData.from_file(pack_io, object_format=DEFAULT_OBJECT_FORMAT)
- target.object_store.add_pack_data(len(pack_data), pack_data.iter_unpacked())
- # Apply ref filtering if specified
- if ref_prefix:
- filtered_refs = {}
- for ref_name, ref_value in refs.items():
- for prefix in ref_prefix:
- if ref_name.startswith(prefix):
- filtered_refs[ref_name] = ref_value
- break
- refs = filtered_refs
- return FetchPackResult(_to_optional_dict(refs), {}, agent_string())
- def fetch_pack(
- self,
- path: str | bytes,
- determine_wants: "DetermineWantsFunc",
- graph_walker: GraphWalker,
- pack_data: Callable[[bytes], int],
- progress: Callable[[bytes], None] | None = None,
- depth: int | None = None,
- ref_prefix: Sequence[bytes] | None = None,
- filter_spec: bytes | None = None,
- protocol_version: int | None = None,
- shallow_since: str | None = None,
- shallow_exclude: list[str] | None = None,
- ) -> FetchPackResult:
- """Retrieve a pack from a bundle file."""
- bundle = self._open_bundle(path)
- # Get references from bundle
- refs = dict(bundle.references)
- # Determine what we want to fetch
- try:
- _ = determine_wants(refs, depth)
- except TypeError:
- # Old-style determine_wants that doesn't accept depth
- _ = determine_wants(refs)
- # Write pack data to the callback
- # Need to reopen the file for pack data access
- with open(path, "rb") as pack_file:
- # Skip to pack data section
- assert bundle.version is not None
- BundleClient._skip_to_pack_data(pack_file, bundle.version)
- # Read pack data and write it to the callback
- pack_bytes = pack_file.read()
- pack_data(pack_bytes)
- # Apply ref filtering if specified
- if ref_prefix:
- filtered_refs = {}
- for ref_name, ref_value in refs.items():
- for prefix in ref_prefix:
- if ref_name.startswith(prefix):
- filtered_refs[ref_name] = ref_value
- break
- refs = filtered_refs
- return FetchPackResult(_to_optional_dict(refs), {}, agent_string())
- def get_refs(
- self,
- path: str | bytes,
- protocol_version: int | None = None,
- ref_prefix: Sequence[bytes] | None = None,
- ) -> LsRemoteResult:
- """Retrieve the current refs from a bundle file."""
- bundle = self._open_bundle(path)
- refs = dict(bundle.references)
- # Apply ref filtering if specified
- if ref_prefix:
- filtered_refs = {}
- for ref_name, ref_value in refs.items():
- for prefix in ref_prefix:
- if ref_name.startswith(prefix):
- filtered_refs[ref_name] = ref_value
- break
- refs = filtered_refs
- # Bundle refs are always concrete (never None), but LsRemoteResult expects Optional
- return LsRemoteResult(_to_optional_dict(refs), {})
# Git client class to use for local (on-disk) repository access.
default_local_git_client_cls = LocalGitClient
class SSHVendor:
    """A client side SSH implementation.

    Base class: ``run_command`` must be overridden by concrete vendors.
    """

    def run_command(
        self,
        host: str,
        command: bytes,
        username: str | None = None,
        port: int | None = None,
        password: str | None = None,
        key_filename: str | None = None,
        ssh_command: str | None = None,
        protocol_version: int | None = None,
    ) -> SubprocessWrapper:
        """Connect to an SSH server.

        Run a command remotely and return a file-like object for interaction
        with the remote command.

        Args:
          host: Host name
          command: Command to run (as argv array)
          username: Optional name of user to log in as
          port: Optional SSH port to use
          password: Optional ssh password for login or private key
          key_filename: Optional path to private keyfile
          ssh_command: Optional SSH command
          protocol_version: Desired Git protocol version. By default the highest
            mutually supported protocol version will be used.

        Raises:
          NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self.run_command)
class StrangeHostname(Exception):
    """Raised when an SSH hostname looks suspicious and is refused."""

    def __init__(self, hostname: str) -> None:
        """Store the rejected hostname as the exception's sole argument.

        Args:
            hostname: The strange hostname that was rejected
        """
        Exception.__init__(self, hostname)
class SubprocessSSHVendor(SSHVendor):
    """SSH vendor that shells out to the local 'ssh' command."""

    def run_command(
        self,
        host: str,
        command: bytes,
        username: str | None = None,
        port: int | None = None,
        password: str | None = None,
        key_filename: str | None = None,
        ssh_command: str | None = None,
        protocol_version: int | None = None,
    ) -> SubprocessWrapper:
        """Spawn ``ssh`` (or a custom command) running a git command remotely.

        Args:
            host: SSH host to connect to
            command: Git command to run
            username: Optional username
            port: Optional port number
            password: Optional password (not supported)
            key_filename: Optional SSH key file
            ssh_command: Optional custom SSH command
            protocol_version: Optional Git protocol version

        Returns:
            A SubprocessWrapper around the spawned process

        Raises:
            NotImplementedError: If a password is given
            StrangeHostname: If the final host argument starts with "-"
        """
        if password is not None:
            raise NotImplementedError(
                "Setting password not supported by SubprocessSSHVendor."
            )

        # Base command line: either the user's command or plain "ssh",
        # always followed by "-x".
        if not ssh_command:
            argv = ["ssh", "-x"]
        else:
            import shlex

            use_posix = sys.platform != "win32"
            argv = [*shlex.split(ssh_command, posix=use_posix), "-x"]

        if port:
            argv += ["-p", str(port)]
        if key_filename:
            argv += ["-i", str(key_filename)]

        wanted_version = (
            DEFAULT_GIT_PROTOCOL_VERSION_FETCH
            if protocol_version is None
            else protocol_version
        )
        if wanted_version > 0:
            argv += ["-o", f"SetEnv GIT_PROTOCOL=version={wanted_version}"]

        destination = f"{username}@{host}" if username else host
        # A leading dash would be parsed by ssh as an option.
        if destination.startswith("-"):
            raise StrangeHostname(hostname=destination)
        argv.append(destination)

        child = subprocess.Popen(
            [*argv, command],
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return SubprocessWrapper(child)
class PLinkSSHVendor(SSHVendor):
    """SSH vendor that shells out to the local 'plink' command."""

    def run_command(
        self,
        host: str,
        command: bytes,
        username: str | None = None,
        port: int | None = None,
        password: str | None = None,
        key_filename: str | None = None,
        ssh_command: str | None = None,
        protocol_version: int | None = None,
    ) -> SubprocessWrapper:
        """Run a git command over SSH using PLink.

        Args:
            host: SSH host to connect to
            command: Git command to run
            username: Optional username
            port: Optional port number
            password: Optional password (passed on the command line; a
              warning is issued because it is visible in the process list)
            key_filename: Optional SSH key file
            ssh_command: Optional custom SSH command
            protocol_version: Optional Git protocol version

        Returns:
            A SubprocessWrapper around the spawned process

        Raises:
            StrangeHostname: If the final host argument starts with "-"
        """
        if ssh_command:
            import shlex

            args = [*shlex.split(ssh_command, posix=sys.platform != "win32"), "-ssh"]
        elif sys.platform == "win32":
            args = ["plink.exe", "-ssh"]
        else:
            args = ["plink", "-ssh"]

        if password is not None:
            import warnings

            warnings.warn(
                "Invoking PLink with a password exposes the password in the "
                "process list."
            )
            args.extend(["-pw", str(password)])

        if port:
            args.extend(["-P", str(port)])

        if key_filename:
            args.extend(["-i", str(key_filename)])

        if username:
            host = f"{username}@{host}"
        # A leading dash would be parsed by plink as an option.
        if host.startswith("-"):
            raise StrangeHostname(hostname=host)
        args.append(host)

        # plink.exe does not provide a way to pass environment variables
        # via the command line. The best we can do is set an environment
        # variable and hope that plink will pass it to the server. If this
        # does not work then the server should behave as if we had requested
        # protocol version 0.
        #
        # os.environ.copy() returns a plain dict.  A (deep)copied os.environ
        # object would still call putenv() on assignment and leak
        # GIT_PROTOCOL into this process's real environment.
        env = os.environ.copy()
        if protocol_version is None:
            protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_FETCH
        if protocol_version > 0:
            env["GIT_PROTOCOL"] = f"version={protocol_version}"

        proc = subprocess.Popen(
            [*args, command],
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
        )
        return SubprocessWrapper(proc)
# Factory returning the SSHVendor to use when SSHGitClient is not given one
# explicitly. Can be overridden by users.
get_ssh_vendor: Callable[[], SSHVendor] = SubprocessSSHVendor
class SSHGitClient(TraditionalGitClient):
    """Git client that connects over SSH."""

    def __init__(
        self,
        host: str,
        port: int | None = None,
        username: str | None = None,
        vendor: SSHVendor | None = None,
        config: Config | None = None,
        password: str | None = None,
        key_filename: str | None = None,
        ssh_command: str | None = None,
        path_encoding: str = TraditionalGitClient.DEFAULT_ENCODING,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
    ) -> None:
        """Initialize SSHGitClient.

        The SSH command is resolved in this order: the ``ssh_command``
        argument, the ``GIT_SSH_COMMAND`` environment variable, the
        ``GIT_SSH`` environment variable, the ``core.sshCommand`` config
        value, and finally plain ``"ssh"``.

        Args:
            host: SSH hostname
            port: Optional SSH port
            username: Optional username
            vendor: Optional SSH vendor (defaults to ``get_ssh_vendor()``)
            config: Optional configuration
            password: Optional password
            key_filename: Optional SSH key file
            ssh_command: Optional custom SSH command
            path_encoding: Encoding for paths (default: utf-8)
            thin_packs: Whether or not thin packs should be retrieved
            report_activity: Optional callback for reporting transport activity
            quiet: Whether to suppress output
            include_tags: Send annotated tags when sending the objects they point to
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.key_filename = key_filename

        # Priority: ssh_command parameter, then env vars, then core.sshCommand config
        resolved_command = (
            ssh_command
            or os.environ.get("GIT_SSH_COMMAND")
            or os.environ.get("GIT_SSH")
        )
        if not resolved_command and config is not None:
            try:
                config_ssh_command = config.get((b"core",), b"sshCommand")
            except KeyError:
                config_ssh_command = None
            if config_ssh_command:
                resolved_command = config_ssh_command.decode()
        self.ssh_command = resolved_command or "ssh"

        super().__init__(
            path_encoding=path_encoding,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
        )
        # Per-command overrides of the remote executable name, keyed by the
        # short command (see _get_cmd_path).
        self.alternative_paths: dict[bytes, bytes] = {}
        if vendor is not None:
            self.ssh_vendor = vendor
        else:
            self.ssh_vendor = get_ssh_vendor()

    def get_url(self, path: str) -> str:
        """Get the SSH URL for a path.

        Args:
            path: Repository path on the remote host

        Returns:
            An ``ssh://`` URL including port and username when set
        """
        netloc = self.host
        if self.port is not None:
            netloc += f":{self.port}"

        if self.username is not None:
            netloc = urlquote(self.username, "@/:") + "@" + netloc

        return urlunsplit(("ssh", netloc, path, "", ""))

    @classmethod
    def from_parsedurl(
        cls,
        parsedurl: ParseResult,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
        dumb: bool = False,
        username: str | None = None,
        password: str | None = None,
        config: Config | None = None,
        path_encoding: str = TraditionalGitClient.DEFAULT_ENCODING,
        vendor: SSHVendor | None = None,
        key_filename: str | None = None,
        ssh_command: str | None = None,
    ) -> "SSHGitClient":
        """Create an SSHGitClient from a parsed URL.

        Args:
            parsedurl: Result of urlparse()
            thin_packs: Whether or not thin packs should be retrieved
            report_activity: Optional callback for reporting transport activity
            quiet: Whether to suppress progress output
            include_tags: Whether to include tags
            dumb: Whether to use dumb protocol (not used for SSHGitClient)
            username: SSH username (takes precedence over the URL's username)
            password: SSH password
            config: Configuration object
            path_encoding: Encoding for paths
            vendor: SSH implementation to use
            key_filename: Optional SSH key file
            ssh_command: Optional custom SSH command

        Returns:
            An SSHGitClient instance

        Raises:
            ValueError: If the URL has no hostname
        """
        if parsedurl.hostname is None:
            raise ValueError("SSH URL must have a hostname")

        return cls(
            host=parsedurl.hostname,
            port=parsedurl.port,
            username=username or parsedurl.username,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
            path_encoding=path_encoding,
            vendor=vendor,
            config=config,
            password=password,
            key_filename=key_filename,
            ssh_command=ssh_command,
        )

    def _get_cmd_path(self, cmd: bytes) -> bytes:
        """Return the remote executable for ``cmd``.

        Uses the entry in ``alternative_paths`` if present, otherwise
        ``b"git-" + cmd``.
        """
        cmd = self.alternative_paths.get(cmd, b"git-" + cmd)
        assert isinstance(cmd, bytes)
        return cmd

    def _connect(
        self,
        cmd: bytes,
        path: str | bytes,
        protocol_version: int | None = None,
    ) -> tuple[Protocol, Callable[[], bool], IO[bytes] | None]:
        """Start the remote git command through the SSH vendor.

        Args:
            cmd: Short git command name (bytes)
            path: Repository path on the remote host
            protocol_version: Desired Git protocol version, passed to the vendor

        Returns:
            Tuple of (Protocol wrapping the connection, its ``can_read``
            callable, its ``stderr`` attribute or None if it has none)

        Raises:
            TypeError: If ``cmd`` is not bytes
        """
        if not isinstance(cmd, bytes):
            raise TypeError(cmd)
        if isinstance(path, bytes):
            path = path.decode(self._remote_path_encoding)
        if path.startswith("/~"):
            path = path[1:]

        # The path is wrapped in single quotes for the remote shell; an
        # embedded single quote must be written as '\'' or it would end the
        # quoting early (breaking the command, or letting the rest of the
        # path be interpreted by the shell).
        quoted_path = path.encode(self._remote_path_encoding).replace(
            b"'", b"'\\''"
        )
        argv = self._get_cmd_path(cmd) + b" '" + quoted_path + b"'"

        # Only pass optional settings that are actually set, so vendors with
        # narrower run_command signatures keep working.
        kwargs: dict[str, str] = {}
        if self.password is not None:
            kwargs["password"] = self.password
        if self.key_filename is not None:
            kwargs["key_filename"] = self.key_filename
        if self.ssh_command is not None:
            kwargs["ssh_command"] = self.ssh_command
        con = self.ssh_vendor.run_command(
            self.host,
            argv,
            port=self.port,
            username=self.username,
            protocol_version=protocol_version,
            **kwargs,
        )
        return (
            Protocol(
                con.read,
                con.write,
                con.close,
                report_activity=self._report_activity,
            ),
            con.can_read,
            getattr(con, "stderr", None),
        )
def default_user_agent_string() -> str:
    """Build the User-Agent value Dulwich sends by default.

    Returns:
        ``"git/dulwich/"`` followed by the dot-joined components of
        ``dulwich.__version__``
    """
    # The agent must begin with "git/" because GitHub requires this. :-( See
    # https://github.com/jelmer/dulwich/issues/562 for details.
    version_text = ".".join(map(str, dulwich.__version__))
    return f"git/dulwich/{version_text}"
def _urlmatch_http_sections(
    config: Config, url: str | None
) -> Iterator[tuple[bytes, ...]]:
    """Yield the ``http`` config sections that apply to ``url``.

    Sections are produced from least to most specific so that a caller
    applying them in order lets more specific settings win.  The plain
    ``[http]`` section has specificity 0; a URL-scoped section is ranked by
    the length of its URL path (trailing slashes ignored).

    Args:
        config: Git configuration object
        url: URL to match against config sections (if None, only yields global http section)

    Yields:
        Config section tuples that match the URL, ordered by specificity
    """
    text_encoding = getattr(config, "encoding", None) or sys.getdefaultencoding()
    target = urlparse(url) if url else None

    ranked: list[tuple[int, tuple[bytes, ...]]] = []
    for section in config.sections():
        if section[0] != b"http":
            continue

        if len(section) < 2:
            # Global http section (least specific)
            ranked.append((0, section))
            continue

        if target is None:
            # URL-scoped sections can only match when we have a URL.
            continue

        pattern = section[1].decode(text_encoding)
        parsed_pattern = urlparse(pattern)
        if parsed_pattern.scheme and parsed_pattern.netloc:
            matched = match_urls(target, parsed_pattern)
        else:
            matched = match_partial_url(target, pattern)

        if matched:
            ranked.append((len(parsed_pattern.path.rstrip("/")), section))

    # sorted() is stable, so equally specific sections keep config order.
    for _, section in sorted(ranked, key=lambda entry: entry[0]):
        yield section
def default_urllib3_manager(
    config: Config | None,
    pool_manager_cls: type | None = None,
    proxy_manager_cls: type | None = None,
    base_url: str | None = None,
    timeout: float | None = None,
    cert_reqs: str | None = None,
) -> "urllib3.ProxyManager | urllib3.PoolManager":
    """Return urllib3 connection pool manager.

    Honour detected proxy configurations.  Precedence:

    * proxy: ``https_proxy`` / ``http_proxy`` / ``all_proxy`` environment
      variables (unless bypassed via ``no_proxy``), then ``http.proxy`` config
    * timeout: the ``timeout`` argument, then ``http.timeout`` config
    * certificate checks: the ``cert_reqs`` argument, then ``http.sslVerify``
      config, defaulting to verification

    For every config-backed setting, matching ``http`` sections are applied
    from least to most specific, so the most specific value wins.

    Args:
      config: `dulwich.config.ConfigDict` instance with Git configuration.
      pool_manager_cls: Pool manager class to use
      proxy_manager_cls: Proxy manager class to use
      base_url: Base URL for proxy bypass checks
      timeout: Timeout for HTTP requests in seconds
      cert_reqs: SSL certificate requirements (e.g. "CERT_REQUIRED", "CERT_NONE")

    Returns:
      Either a proxy_manager_cls (defaults to ``urllib3.ProxyManager``) instance
      for proxy configurations, or a pool_manager_cls
      (defaults to ``urllib3.PoolManager``) instance otherwise
    """
    proxy_server: str | None = None
    for proxyname in ("https_proxy", "http_proxy", "all_proxy"):
        # ``or None``: a variable that is set but empty counts as unset.
        proxy_server = os.environ.get(proxyname) or None
        if proxy_server:
            break
    if proxy_server and check_for_proxy_bypass(base_url):
        proxy_server = None

    user_agent: str | None = None
    ca_certs: str | None = None
    ssl_verify: bool | None = None
    config_proxy: str | None = None
    config_timeout: bytes | None = None

    if config is not None:

        def _config_value(section: tuple[bytes, ...], name: bytes) -> bytes | None:
            """Return the raw config value, or None when it is not set."""
            try:
                return config.get(section, name)
            except KeyError:
                return None

        # Iterate through all matching http sections from least to most specific
        # More specific settings will override less specific ones
        for section in _urlmatch_http_sections(config, base_url):
            proxy_bytes = _config_value(section, b"proxy")
            if proxy_bytes is not None:
                config_proxy = proxy_bytes.decode("utf-8")

            user_agent_bytes = _config_value(section, b"useragent")
            if user_agent_bytes is not None:
                user_agent = user_agent_bytes.decode("utf-8")

            try:
                ssl_verify_value = config.get_boolean(section, b"sslVerify")
            except KeyError:
                pass
            else:
                if ssl_verify_value is not None:
                    ssl_verify = ssl_verify_value

            ca_certs_bytes = _config_value(section, b"sslCAInfo")
            if ca_certs_bytes is not None:
                ca_certs = ca_certs_bytes.decode("utf-8")

            timeout_bytes = _config_value(section, b"timeout")
            if timeout_bytes is not None:
                config_timeout = timeout_bytes

    # Environment / explicit arguments take precedence over config values.
    if proxy_server is None:
        proxy_server = config_proxy
    if timeout is None and config_timeout is not None:
        timeout = float(config_timeout.decode("utf-8"))

    if user_agent is None:
        user_agent = default_user_agent_string()

    headers = {"User-agent": user_agent}

    # Check for extra headers in config with URL matching
    if config is not None:
        # Apply extra headers from least specific to most specific
        for section in _urlmatch_http_sections(config, base_url):
            try:
                extra_headers = config.get_multivar(section, b"extraHeader")
            except KeyError:
                continue

            for extra_header in extra_headers:
                if not extra_header:
                    logger.warning("Ignoring empty http.extraHeader value")
                    continue
                if b": " not in extra_header:
                    logger.warning(
                        "Ignoring invalid http.extraHeader value %r (missing ': ' separator)",
                        extra_header,
                    )
                    continue
                # Parse the header (format: "Header-Name: value")
                header_name, header_value = extra_header.split(b": ", 1)
                try:
                    headers[header_name.decode("utf-8")] = header_value.decode("utf-8")
                except UnicodeDecodeError as e:
                    logger.warning(
                        "Ignoring http.extraHeader with invalid UTF-8: %s", e
                    )

    kwargs: dict[str, str | float | None] = {
        "ca_certs": ca_certs,
    }

    # Add timeout if specified
    if timeout is not None:
        kwargs["timeout"] = timeout

    # Handle cert_reqs - allow override from parameter; otherwise verify
    # unless http.sslVerify was explicitly turned off.
    if cert_reqs is not None:
        kwargs["cert_reqs"] = cert_reqs
    elif ssl_verify is False:
        kwargs["cert_reqs"] = "CERT_NONE"
    else:
        kwargs["cert_reqs"] = "CERT_REQUIRED"

    import urllib3

    manager: urllib3.ProxyManager | urllib3.PoolManager
    if proxy_server is not None:
        if proxy_manager_cls is None:
            proxy_manager_cls = urllib3.ProxyManager
        proxy_server_url = urlparse(proxy_server)
        if proxy_server_url.username is not None:
            # Credentials embedded in the proxy URL become a
            # Proxy-Authorization header.
            proxy_headers = urllib3.make_headers(
                proxy_basic_auth=f"{proxy_server_url.username}:{proxy_server_url.password or ''}"
            )
        else:
            proxy_headers = {}
        manager = proxy_manager_cls(
            proxy_server, proxy_headers=proxy_headers, headers=headers, **kwargs
        )
    else:
        if pool_manager_cls is None:
            pool_manager_cls = urllib3.PoolManager
        manager = pool_manager_cls(headers=headers, **kwargs)

    return manager
- def check_for_proxy_bypass(base_url: str | None) -> bool:
- """Check if proxy should be bypassed for the given URL."""
- # Check if a proxy bypass is defined with the no_proxy environment variable
- if base_url: # only check if base_url is provided
- no_proxy_str = os.environ.get("no_proxy")
- if no_proxy_str:
- # implementation based on curl behavior: https://curl.se/libcurl/c/CURLOPT_NOPROXY.html
- # get hostname of provided parsed url
- parsed_url = urlparse(base_url)
- hostname = parsed_url.hostname
- if hostname:
- import ipaddress
- # check if hostname is an ip address
- try:
- hostname_ip = ipaddress.ip_address(hostname)
- except ValueError:
- hostname_ip = None
- no_proxy_values = no_proxy_str.split(",")
- for no_proxy_value in no_proxy_values:
- no_proxy_value = no_proxy_value.strip()
- if no_proxy_value:
- no_proxy_value = no_proxy_value.lower()
- no_proxy_value = no_proxy_value.lstrip(
- "."
- ) # ignore leading dots
- if hostname_ip:
- # check if no_proxy_value is a ip network
- try:
- no_proxy_value_network = ipaddress.ip_network(
- no_proxy_value, strict=False
- )
- except ValueError:
- no_proxy_value_network = None
- if no_proxy_value_network:
- # if hostname is a ip address and no_proxy_value is a ip network -> check if ip address is part of network
- if hostname_ip in no_proxy_value_network:
- return True
- if no_proxy_value == "*":
- # '*' is special case for always bypass proxy
- return True
- if hostname == no_proxy_value:
- return True
- no_proxy_value = (
- "." + no_proxy_value
- ) # add a dot to only match complete domains
- if hostname.endswith(no_proxy_value):
- return True
- return False
- class AbstractHttpGitClient(GitClient):
- """Abstract base class for HTTP Git Clients.
- This is agnostic of the actual HTTP implementation.
- Subclasses should provide an implementation of the
- _http_request method.
- """
def __init__(
    self,
    base_url: str,
    dumb: bool = False,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: str | None = None,
    password: str | None = None,
) -> None:
    """Initialize AbstractHttpGitClient.

    Args:
        base_url: Base URL of the repository; stored with exactly one
          trailing slash
        dumb: Whether to use the dumb HTTP protocol
        thin_packs: Whether or not thin packs should be retrieved
        report_activity: Optional callback for reporting transport activity
        quiet: Whether to suppress progress output
        include_tags: Whether to include tags
        username: Optional username
        password: Optional password
    """
    normalized_url = base_url.rstrip("/")
    self._base_url = f"{normalized_url}/"
    self.dumb = dumb
    self._username, self._password = username, password
    # Track original URL with credentials (set by from_parsedurl when credentials come from URL)
    self._url_with_auth: str | None = None
    GitClient.__init__(
        self,
        thin_packs=thin_packs,
        report_activity=report_activity,
        quiet=quiet,
        include_tags=include_tags,
    )
def _http_request(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    data: bytes | Iterator[bytes] | None = None,
    raise_for_status: bool = True,
) -> tuple["HTTPResponse", Callable[[int], bytes]]:
    """Perform HTTP request.

    Abstract hook: this base implementation always raises
    ``NotImplementedError``; subclasses provide the actual transport.

    Args:
      url: Request URL.
      headers: Optional custom headers to override defaults.
      data: Request data.
      raise_for_status: Whether to raise an exception for HTTP errors.

    Returns:
      Tuple (response, read), where response is an urllib3
      response object with additional content_type and
      redirect_location properties, and read is a consumable read
      method for the response data.

    Raises:
      GitProtocolError
    """
    raise NotImplementedError(self._http_request)
def _discover_references(
    self,
    service: bytes,
    base_url: str,
    protocol_version: int | None = None,
    ref_prefix: Sequence[bytes] | None = None,
) -> tuple[
    dict[Ref, ObjectID | None],
    set[bytes],
    str,
    dict[Ref, Ref],
    dict[Ref, ObjectID],
]:
    """Request ``info/refs`` and parse the advertised references.

    Sets ``self.protocol_version`` and ``self.dumb`` as a side effect,
    based on the requested service and on what the server answers.

    Args:
      service: Service name, e.g. b"git-upload-pack"
      base_url: Repository base URL; must end with "/"
      protocol_version: Highest Git protocol version to accept, or None
        for the per-service default
      ref_prefix: Optional ref prefixes to restrict the result to

    Returns:
      Tuple (refs, server_capabilities, base_url, symrefs, peeled) where
      base_url is updated if the server redirected the request.

    Raises:
      ValueError: If the requested or server-announced protocol version
        is unknown, or the server uses a newer one than requested
      GitProtocolError: On a redirect that drops the ``info/refs`` tail or
        a malformed smart-server response
    """
    if (
        protocol_version is not None
        and protocol_version not in GIT_PROTOCOL_VERSIONS
    ):
        raise ValueError(f"unknown Git protocol version {protocol_version}")
    assert base_url[-1] == "/"
    tail = "info/refs"
    headers = {"Accept": "*/*"}
    if self.dumb is not True:
        tail += "?service={}".format(service.decode("ascii"))
        # Enable protocol v2 only when fetching, not when pushing.
        # Git does not yet implement push over protocol v2, and as of
        # git version 2.37.3 git-http-backend's behaviour is erratic if
        # we try: It responds with a Git-protocol-v1-style ref listing
        # which lacks the "001f# service=git-receive-pack" marker.
        if service == b"git-upload-pack":
            if protocol_version is None:
                self.protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_FETCH
            else:
                self.protocol_version = protocol_version
            if self.protocol_version == 2:
                headers["Git-Protocol"] = "version=2"
        else:
            self.protocol_version = DEFAULT_GIT_PROTOCOL_VERSION_SEND
    url = urljoin(base_url, tail)
    resp, read = self._http_request(url, headers)

    if resp.redirect_location:
        # Something changed (redirect!), so let's update the base URL
        if not resp.redirect_location.endswith(tail):
            raise GitProtocolError(
                f"Redirected from URL {url} to URL {resp.redirect_location} without {tail}"
            )
        base_url = urljoin(url, resp.redirect_location[: -len(tail)])

    try:
        # Any content type outside "application/x-git-*" (or none at all)
        # is treated as a dumb server.
        self.dumb = resp.content_type is None or not resp.content_type.startswith(
            "application/x-git-"
        )
        if not self.dumb:

            def begin_protocol_v2(
                proto: Protocol,
            ) -> tuple[set[bytes], Any, Callable[[int], bytes], Protocol]:
                # Reads the v2 capability advertisement from ``proto``, then
                # issues an ls-refs command as a second (smart) request and
                # returns a Protocol reading that second response.
                nonlocal ref_prefix
                server_capabilities = read_server_capabilities(proto.read_pkt_seq())
                if ref_prefix is None:
                    ref_prefix = DEFAULT_REF_PREFIX
                pkts = [
                    b"symrefs",
                    b"peel",
                ]
                for prefix in ref_prefix:
                    pkts.append(b"ref-prefix " + prefix)

                body = b"".join(
                    [pkt_line(b"command=ls-refs\n"), b"0001", pkt_seq(*pkts)]
                )
                resp, read = self._smart_request(
                    service.decode("ascii"), base_url, body
                )
                proto = Protocol(read, lambda data: None)
                return server_capabilities, resp, read, proto

            # Read-only protocol: writes are discarded.
            proto = Protocol(read, lambda data: None)
            server_protocol_version = negotiate_protocol_version(proto)
            if server_protocol_version not in GIT_PROTOCOL_VERSIONS:
                raise ValueError(
                    f"unknown Git protocol version {server_protocol_version} used by server"
                )
            if protocol_version and server_protocol_version > protocol_version:
                raise ValueError(
                    f"bad Git protocol version {server_protocol_version} used by server"
                )
            self.protocol_version = server_protocol_version
            if self.protocol_version == 2:
                # NOTE(review): ``resp`` is rebound to the ls-refs response
                # here, so the ``finally`` below closes only that one; the
                # initial info/refs response does not appear to be closed
                # explicitly on this path — confirm whether that matters.
                server_capabilities, resp, read, proto = begin_protocol_v2(proto)
                (refs, symrefs, peeled) = read_pkt_refs_v2(proto.read_pkt_seq())
                return refs, server_capabilities, base_url, symrefs, peeled
            else:
                # The first pkt sequence must be exactly the
                # "# service=<name>" announcement.
                try:
                    [pkt] = list(proto.read_pkt_seq())
                except ValueError as exc:
                    raise GitProtocolError(
                        "unexpected number of packets received"
                    ) from exc
                if pkt.rstrip(b"\n") != (b"# service=" + service):
                    raise GitProtocolError(
                        f"unexpected first line {pkt!r} from smart server"
                    )
                # Github sends "version 2" after sending the service name.
                # Try to negotiate protocol version 2 again.
                server_protocol_version = negotiate_protocol_version(proto)
                if server_protocol_version not in GIT_PROTOCOL_VERSIONS:
                    raise ValueError(
                        f"unknown Git protocol version {server_protocol_version} used by server"
                    )
                if protocol_version and server_protocol_version > protocol_version:
                    raise ValueError(
                        f"bad Git protocol version {server_protocol_version} used by server"
                    )
                self.protocol_version = server_protocol_version
                if self.protocol_version == 2:
                    server_capabilities, resp, read, proto = begin_protocol_v2(
                        proto
                    )
                    (refs, symrefs, peeled) = read_pkt_refs_v2(proto.read_pkt_seq())
                else:
                    (
                        refs_v1,
                        server_capabilities,
                    ) = read_pkt_refs_v1(proto.read_pkt_seq())
                    # Convert v1 refs to Optional type
                    refs = _to_optional_dict(refs_v1)
                    # TODO: split_peeled_refs should accept Optional values
                    (refs, peeled) = split_peeled_refs(refs)  # type: ignore[arg-type,assignment]
                    (symrefs, _agent) = _extract_symrefs_and_agent(
                        server_capabilities
                    )
                    # The v1 advertisement lists every ref, so the prefix
                    # filter is applied on the client side.
                    if ref_prefix is not None:
                        refs = filter_ref_prefix(refs, ref_prefix)
                return refs, server_capabilities, base_url, symrefs, peeled
        else:
            self.protocol_version = 0  # dumb servers only support protocol v0
            # Read all the response data
            data = b""
            while True:
                chunk = read(4096)
                if not chunk:
                    break
                data += chunk
            from typing import cast

            info_refs = read_info_refs(BytesIO(data))
            (refs_nonopt, peeled) = split_peeled_refs(info_refs)
            if ref_prefix is not None:
                refs_nonopt = filter_ref_prefix(refs_nonopt, ref_prefix)
            # cast() only widens the static type; the dict is not copied.
            refs_result: dict[Ref, ObjectID | None] = cast(
                dict[Ref, ObjectID | None], refs_nonopt
            )
            # Dumb servers advertise neither capabilities nor symrefs.
            return refs_result, set(), base_url, {}, peeled
    finally:
        resp.close()
- def _smart_request(
- self, service: str, url: str, data: bytes | Iterator[bytes]
- ) -> tuple["HTTPResponse", Callable[[int], bytes]]:
- """Send a 'smart' HTTP request.
- This is a simple wrapper around _http_request that sets
- a couple of extra headers.
- """
- assert url[-1] == "/"
- url = urljoin(url, service)
- result_content_type = f"application/x-{service}-result"
- headers = {
- "Content-Type": f"application/x-{service}-request",
- "Accept": result_content_type,
- }
- if self.protocol_version == 2:
- headers["Git-Protocol"] = "version=2"
- if isinstance(data, bytes):
- headers["Content-Length"] = str(len(data))
- resp, read = self._http_request(url, headers, data)
- if (
- not resp.content_type
- or resp.content_type.split(";")[0] != result_content_type
- ):
- raise GitProtocolError(
- f"Invalid content-type from server: {resp.content_type}"
- )
- return resp, read
def send_pack(
    self,
    path: str | bytes,
    update_refs: Callable[[dict[Ref, ObjectID]], dict[Ref, ObjectID]],
    generate_pack_data: "GeneratePackDataFunc",
    progress: Callable[[bytes], None] | None = None,
) -> SendPackResult:
    """Upload a pack to a remote repository.

    Args:
      path: Repository path (as bytestring or string)
      update_refs: Function to determine changes to remote refs.
        Receives dict with existing remote refs, returns dict with
        changed refs (name -> sha, where sha=ZERO_SHA for deletions)
      generate_pack_data: Function that can return a tuple
        with number of elements and pack data to upload.
      progress: Optional progress function

    Returns:
      SendPackResult

    Raises:
      SendPackError: if server rejects the pack data
      NotImplementedError: if there are ref changes to send and the
        remote was detected as a dumb HTTP server
    """
    url = self._get_url(path)
    # Discovery may follow a redirect, so keep the URL it hands back.
    old_refs, server_capabilities, url, _symrefs, _peeled = (
        self._discover_references(b"git-receive-pack", url)
    )
    (
        negotiated_capabilities,
        agent,
    ) = self._negotiate_receive_pack_capabilities(server_capabilities)
    negotiated_capabilities.add(capability_agent())

    if CAPABILITY_REPORT_STATUS in negotiated_capabilities:
        self._report_status_parser = ReportStatusParser()

    # Assert that old_refs has no None values
    assert all(v is not None for v in old_refs.values()), (
        "old_refs should not contain None values"
    )
    old_refs_typed: dict[Ref, ObjectID] = old_refs  # type: ignore[assignment]
    # Hand the callback a copy so it cannot mutate our view of the remote.
    new_refs = update_refs(dict(old_refs_typed))
    if new_refs is None:
        # Determine wants function is aborting the push.
        # Convert to Optional type for SendPackResult
        return SendPackResult(
            _to_optional_dict(old_refs_typed), agent=agent, ref_status={}
        )
    if set(new_refs.items()).issubset(set(old_refs_typed.items())):
        # Every requested ref already has the requested value: nothing to send.
        # Convert to Optional type for SendPackResult
        return SendPackResult(
            _to_optional_dict(new_refs), agent=agent, ref_status={}
        )
    if self.dumb:
        # Report the operation that is actually unsupported (this method),
        # not fetch_pack.
        raise NotImplementedError(self.send_pack)

    def body_generator() -> Iterator[bytes]:
        """Yield the request body: ref-update header lines, then the pack."""
        header_handler = _v1ReceivePackHeader(
            list(negotiated_capabilities), old_refs_typed, new_refs
        )
        for pkt in header_handler:
            yield pkt_line(pkt)
        pack_data_count, pack_data = generate_pack_data(
            header_handler.have,
            header_handler.want,
            ofs_delta=(CAPABILITY_OFS_DELTA in negotiated_capabilities),
            progress=progress,
        )
        if self._should_send_pack(new_refs):
            yield from PackChunkGenerator(
                # TODO: Don't hardcode object format
                num_records=pack_data_count,
                records=pack_data,
                object_format=DEFAULT_OBJECT_FORMAT,
            )

    resp, read = self._smart_request("git-receive-pack", url, data=body_generator())
    try:
        # The response is read-only, so writes are discarded.
        resp_proto = Protocol(read, lambda data: None)
        ref_status = self._handle_receive_pack_tail(
            resp_proto, negotiated_capabilities, progress
        )
        # Convert to Optional type for SendPackResult
        return SendPackResult(
            _to_optional_dict(new_refs), agent=agent, ref_status=ref_status
        )
    finally:
        resp.close()
def fetch_pack(
    self,
    path: str | bytes,
    determine_wants: "DetermineWantsFunc",
    graph_walker: GraphWalker,
    pack_data: Callable[[bytes], int],
    progress: Callable[[bytes], None] | None = None,
    depth: int | None = None,
    ref_prefix: Sequence[bytes] | None = None,
    filter_spec: bytes | None = None,
    protocol_version: int | None = None,
    shallow_since: str | None = None,
    shallow_exclude: list[str] | None = None,
) -> FetchPackResult:
    """Fetch a pack over HTTP, via the smart or the dumb protocol.

    Args:
      path: Path to fetch from
      determine_wants: Callback that receives the remote refs (and
        ``depth`` as a keyword when one was given) and returns the
        object ids to fetch
      graph_walker: Object with next() and ack().
      pack_data: Callback called for each bit of data in the pack
      progress: Callback for progress reports (strings)
      depth: Depth for request
      ref_prefix: List of prefixes of desired references, as a list of
        bytestrings. Filtering is done by the server if supported, and
        client side otherwise.
      filter_spec: A git-rev-list-style object filter spec, as bytestring.
        Only used if the server supports the Git protocol-v2 'filter'
        feature, and ignored otherwise.
      protocol_version: Desired Git protocol version. By default the highest
        mutually supported protocol version will be used.
      shallow_since: Deepen the history to include commits after this date
      shallow_exclude: Deepen the history to exclude commits reachable from these refs

    Returns:
      FetchPackResult object
    """
    url = self._get_url(path)
    refs, server_capabilities, url, symrefs, _peeled = self._discover_references(
        b"git-upload-pack",
        url,
        protocol_version=protocol_version,
        ref_prefix=ref_prefix,
    )
    negotiated_capabilities, capa_symrefs, agent = (
        self._negotiate_upload_pack_capabilities(server_capabilities)
    )
    object_format = extract_object_format_from_capabilities(server_capabilities)

    # Fall back to symrefs advertised through capabilities.
    if not symrefs and capa_symrefs:
        symrefs = capa_symrefs

    # determine_wants only sees refs that actually have a value.
    known_refs = {name: sha for name, sha in refs.items() if sha is not None}
    # depth is forwarded only when the caller supplied one.
    wants_kwargs = {} if depth is None else {"depth": depth}
    wants = determine_wants(known_refs, **wants_kwargs)
    if wants is not None:
        wants = [cid for cid in wants if cid != ZERO_SHA]

    if self.dumb:
        # Use dumb HTTP protocol
        from .dumb import DumbRemoteHTTPRepo

        # The dumb repo gets our request function with status checking off.
        dumb_repo = DumbRemoteHTTPRepo(
            url, functools.partial(self._http_request, raise_for_status=False)
        )
        # Materialise everything the dumb remote yields for these wants.
        pack_data_list = list(
            dumb_repo.fetch_pack_data(
                lambda refs, depth: wants,
                graph_walker,
                progress=progress,
                depth=depth,
            )
        )
        head = dumb_repo.get_head()
        if head is not None:
            symrefs[HEADREF] = head

        if pack_data_list:
            from .pack import write_pack_data

            def write_fn(data: bytes) -> None:
                """Adapt pack_data to a write callback that returns nothing."""
                pack_data(data)

            write_pack_data(
                write_fn,
                iter(pack_data_list),
                num_records=len(pack_data_list),
                progress=progress,
                object_format=DEFAULT_OBJECT_FORMAT,
            )
        return FetchPackResult(refs, symrefs, agent, object_format=object_format)

    if not wants:
        # Nothing requested from a smart server: report the refs only.
        return FetchPackResult(refs, symrefs, agent, object_format=object_format)

    # Write the negotiation head into a buffer; it becomes the POST body.
    request_buffer = BytesIO()
    request_proto = Protocol(None, request_buffer.write)  # type: ignore
    new_shallow, new_unshallow = _handle_upload_pack_head(
        request_proto,
        negotiated_capabilities,
        graph_walker,
        wants,
        can_read=None,
        depth=depth,
        protocol_version=self.protocol_version,
        shallow_since=shallow_since,
        shallow_exclude=shallow_exclude,
    )

    if self.protocol_version == 2:
        # v2 request: command line, "0001" delimiter, then arguments.
        data = pkt_line(b"command=fetch\n") + b"0001"
        if CAPABILITY_THIN_PACK in self._fetch_capabilities:
            data += pkt_line(b"thin-pack\n")
        filter_supported = find_capability(
            negotiated_capabilities, CAPABILITY_FETCH, CAPABILITY_FILTER
        )
        if filter_supported and filter_spec:
            data += pkt_line(b"filter %s\n" % filter_spec)
        elif filter_spec:
            self._warn_filter_objects()
        data += request_buffer.getvalue()
    else:
        # Filters are only sent with protocol v2.
        if filter_spec:
            self._warn_filter_objects()
        data = request_buffer.getvalue()

    resp, read = self._smart_request("git-upload-pack", url, data)
    try:
        response_proto = Protocol(read, None)  # type: ignore
        if new_shallow is None and new_unshallow is None:
            new_shallow, new_unshallow = _read_shallow_updates(
                response_proto.read_pkt_seq()
            )
        _handle_upload_pack_tail(
            response_proto,
            negotiated_capabilities,
            graph_walker,
            pack_data,
            progress,
            protocol_version=self.protocol_version,
        )
        return FetchPackResult(
            refs, symrefs, agent, new_shallow, new_unshallow, object_format
        )
    finally:
        resp.close()
def get_refs(
    self,
    path: str | bytes,
    protocol_version: int | None = None,
    ref_prefix: Sequence[bytes] | None = None,
) -> LsRemoteResult:
    """Retrieve the current refs from a git smart server.

    Runs reference discovery against the upload-pack service and adds
    each peeled value back into the refs under the ref name plus
    ``PEELED_TAG_SUFFIX``.

    Args:
      path: Repository path (as bytestring or string)
      protocol_version: Desired Git protocol version, forwarded to
        reference discovery
      ref_prefix: Optional ref prefixes, forwarded to reference discovery
    Returns:
      LsRemoteResult with the refs, symrefs and object format
    """
    refs, server_capabilities, _, symrefs, peeled = self._discover_references(
        b"git-upload-pack",
        self._get_url(path),
        protocol_version=protocol_version,
        ref_prefix=ref_prefix,
    )
    object_format = extract_object_format_from_capabilities(server_capabilities)
    refs.update(
        {Ref(name + PEELED_TAG_SUFFIX): sha for name, sha in peeled.items()}
    )
    return LsRemoteResult(refs, symrefs, object_format=object_format)
def get_url(self, path: str) -> str:
    """Get the HTTP URL for a path.

    Credentials are embedded in the result only when they originally came
    from a URL (``_url_with_auth`` is set), not when they were passed
    explicitly. This preserves URL credentials for git config storage.

    Args:
      path: Repository path, resolved with ``_get_url``
    Returns:
      The URL without a trailing slash, with percent-encoded
      ``user[:password]@`` userinfo when applicable
    """
    url = self._get_url(path).rstrip("/")
    if self._url_with_auth is None:
        return url

    from urllib.parse import quote, urlparse, urlunparse

    assert self._username is not None
    parsed = urlparse(url)

    host = parsed.hostname
    # ParseResult.hostname drops the brackets around IPv6 literals; they
    # must be restored or the rebuilt netloc ("user@::1:8080") is invalid.
    if host is not None and ":" in host:
        host = f"[{host}]"

    # safe="" so that "/", ":" and "@" inside credentials are escaped too.
    userinfo = quote(self._username, safe="")
    if self._password is not None:
        userinfo += ":" + quote(self._password, safe="")

    netloc = f"{userinfo}@{host}"
    if parsed.port:
        netloc += f":{parsed.port}"

    # Reconstruct URL with credentials
    return urlunparse(parsed._replace(netloc=netloc))
- def _get_url(self, path: str | bytes) -> str:
- path_str = path if isinstance(path, str) else path.decode("utf-8")
- return urljoin(self._base_url, path_str).rstrip("/") + "/"
@classmethod
def from_parsedurl(
    cls,
    parsedurl: ParseResult,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    quiet: bool = False,
    include_tags: bool = False,
    dumb: bool = False,
    username: str | None = None,
    password: str | None = None,
    config: Config | None = None,
    pool_manager: "urllib3.PoolManager | None" = None,
) -> "AbstractHttpGitClient":
    """Create an AbstractHttpGitClient from a parsed URL.

    Credentials embedded in the URL are removed from the client's base
    URL and handed to the constructor instead; explicit ``username`` /
    ``password`` arguments take precedence over them.

    Args:
      parsedurl: Result of urlparse()
      thin_packs: Whether or not thin packs should be retrieved
      report_activity: Optional callback for reporting transport activity
      quiet: Whether to suppress progress output
      include_tags: Whether to include tags
      dumb: Whether to use dumb HTTP transport
      username: Optional username for authentication
      password: Optional password for authentication
      config: Configuration object (only forwarded to
        Urllib3HttpGitClient subclasses)
      pool_manager: Optional urllib3 PoolManager for HTTP(S) connections
        (only forwarded to Urllib3HttpGitClient subclasses)

    Returns:
      An AbstractHttpGitClient instance
    """
    # ParseResult.username and .password are URL-encoded, need to unquote them
    from urllib.parse import unquote

    url_username = unquote(parsedurl.username) if parsedurl.username else None
    url_password = unquote(parsedurl.password) if parsedurl.password else None

    # Explicit parameters take precedence over URL credentials
    final_username = username if username is not None else url_username
    final_password = password if password is not None else url_password

    # Rebuild the netloc without credentials. ParseResult.hostname strips
    # the brackets from IPv6 literals, so put them back; otherwise
    # "http://[::1]:8080/" would become the invalid "http://::1:8080/".
    hostname = parsedurl.hostname or ""
    if ":" in hostname:
        hostname = f"[{hostname}]"
    netloc = f"{hostname}:{parsedurl.port}" if parsedurl.port else hostname
    base_url = urlunparse(parsedurl._replace(netloc=netloc))

    client_kwargs: dict[str, Any] = {
        "dumb": dumb,
        "thin_packs": thin_packs,
        "report_activity": report_activity,
        "quiet": quiet,
        "include_tags": include_tags,
        "username": final_username,
        "password": final_password,
    }
    # Only the urllib3-based client accepts config and pool_manager.
    if issubclass(cls, Urllib3HttpGitClient):
        client_kwargs["config"] = config
        client_kwargs["pool_manager"] = pool_manager
    client: AbstractHttpGitClient = cls(base_url, **client_kwargs)

    # Mark that credentials came from URL (not passed explicitly) if URL had credentials
    if url_username is not None or url_password is not None:
        client._url_with_auth = urlunparse(parsedurl)
    return client
def __repr__(self) -> str:
    """Return ``ClassName(base_url, dumb=...)`` for this client."""
    cls_name = type(self).__name__
    return f"{cls_name}({self._base_url!r}, dumb={self.dumb!r})"
def _wrap_urllib3_exceptions(
    func: Callable[..., bytes],
) -> Callable[..., bytes]:
    """Wrap ``func`` so urllib3 protocol errors surface as GitProtocolError.

    Args:
      func: Callable returning bytes (here, a response's read method)
    Returns:
      A callable with the same arguments that re-raises
      ``urllib3.exceptions.ProtocolError`` as ``GitProtocolError``,
      chained to the original error
    """
    from urllib3.exceptions import ProtocolError

    def wrapper(*args: object, **kwargs: object) -> bytes:
        try:
            result = func(*args, **kwargs)
        except ProtocolError as exc:
            raise GitProtocolError(str(exc)) from exc
        return result

    return wrapper
class Urllib3HttpGitClient(AbstractHttpGitClient):
    """Git client that uses urllib3 for HTTP(S) connections."""

    def __init__(
        self,
        base_url: str,
        dumb: bool | None = None,
        pool_manager: "urllib3.PoolManager | None" = None,
        config: Config | None = None,
        username: str | None = None,
        password: str | None = None,
        timeout: float | None = None,
        extra_headers: dict[str, str] | None = None,
        thin_packs: bool = True,
        report_activity: Callable[[int, str], None] | None = None,
        quiet: bool = False,
        include_tags: bool = False,
    ) -> None:
        """Initialize Urllib3HttpGitClient.

        Args:
          base_url: Base URL of the remote
          dumb: Whether to use dumb HTTP transport; None is treated as False
          pool_manager: urllib3 PoolManager to use; when omitted one is
            created with ``default_urllib3_manager``
          config: Optional config object, used when creating the default
            pool manager
          username: Optional username; when given, a basic-auth header is
            added to the pool manager's default headers
          password: Optional password for basic auth (empty if omitted)
          timeout: Optional timeout passed to each request
          extra_headers: Optional extra headers
          thin_packs: Whether or not thin packs should be retrieved
          report_activity: Optional callback for reporting transport activity
          quiet: Whether to suppress progress output
          include_tags: Whether to include tags
        """
        self._timeout = timeout
        # NOTE(review): stored here but not read anywhere in this class;
        # confirm it is applied elsewhere.
        self._extra_headers = extra_headers or {}

        if pool_manager is None:
            self.pool_manager = default_urllib3_manager(
                config, base_url=base_url, timeout=timeout
            )
        else:
            self.pool_manager = pool_manager

        if username is not None:
            # No escaping needed: ":" is not allowed in username:
            # https://tools.ietf.org/html/rfc2617#section-2
            credentials = f"{username}:{password or ''}"
            import urllib3.util

            basic_auth = urllib3.util.make_headers(basic_auth=credentials)
            # Note: this also mutates a caller-supplied pool manager.
            self.pool_manager.headers.update(basic_auth)  # type: ignore

        self.config = config

        super().__init__(
            base_url=base_url,
            dumb=dumb if dumb is not None else False,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
            username=username,
            password=password,
        )

    def _get_url(self, path: str | bytes) -> str:
        """Resolve ``path`` against the base URL; the result ends in "/"."""
        if not isinstance(path, str):
            # urllib3.util.url._encode_invalid_chars() converts the path back
            # to bytes using the utf-8 codec.
            path = path.decode("utf-8")
        return urljoin(self._base_url, path).rstrip("/") + "/"

    def _http_request(
        self,
        url: str,
        headers: dict[str, str] | None = None,
        data: bytes | Iterator[bytes] | None = None,
        raise_for_status: bool = True,
    ) -> tuple["HTTPResponse", Callable[[int], bytes]]:
        """Perform a GET (no body) or POST (with body) request.

        Args:
          url: URL to request
          headers: Extra request headers, layered over the pool manager's
            default headers
          data: Request body; None selects GET, anything else POST
          raise_for_status: Whether to turn non-200 statuses into exceptions
        Returns:
          Tuple of the (not preloaded) response, annotated with
          ``content_type`` and ``redirect_location``, and a read function
          that converts urllib3 protocol errors to GitProtocolError
        Raises:
          NotGitRepository: on status 404
          HTTPUnauthorized: on status 401
          HTTPProxyUnauthorized: on status 407
          GitProtocolError: on any other non-200 status, or when urllib3
            fails to perform the request
        """
        import urllib3.exceptions

        req_headers = dict(self.pool_manager.headers)
        if headers is not None:
            req_headers.update(headers)
        req_headers["Pragma"] = "no-cache"

        try:
            request_kwargs = {
                "headers": req_headers,
                "preload_content": False,
            }
            if self._timeout is not None:
                request_kwargs["timeout"] = self._timeout

            if data is None:
                resp = self.pool_manager.request("GET", url, **request_kwargs)  # type: ignore[arg-type]
            else:
                request_kwargs["body"] = data
                resp = self.pool_manager.request("POST", url, **request_kwargs)  # type: ignore[arg-type]
        except urllib3.exceptions.HTTPError as e:
            raise GitProtocolError(str(e)) from e

        if raise_for_status and resp.status != 200:
            # The body was not preloaded and nobody will read it once we
            # raise, so close the response instead of leaking its connection.
            try:
                if resp.status == 404:
                    raise NotGitRepository
                if resp.status == 401:
                    raise HTTPUnauthorized(resp.headers.get("WWW-Authenticate"), url)
                if resp.status == 407:
                    raise HTTPProxyUnauthorized(
                        resp.headers.get("Proxy-Authenticate"), url
                    )
                raise GitProtocolError(f"unexpected http resp {resp.status} for {url}")
            finally:
                resp.close()

        resp.content_type = resp.headers.get("Content-Type")  # type: ignore[attr-defined]
        # Check if geturl() is available (urllib3 version >= 1.23)
        try:
            resp_url = resp.geturl()
        except AttributeError:
            # get_redirect_location() is available for urllib3 >= 1.1
            resp.redirect_location = resp.get_redirect_location()  # type: ignore[attr-defined]
        else:
            # An empty string means the final URL equals the requested one.
            resp.redirect_location = resp_url if resp_url != url else ""  # type: ignore[attr-defined]
        return resp, _wrap_urllib3_exceptions(resp.read)  # type: ignore[return-value]


# Default HTTP client implementation, used by the transport factory functions.
HttpGitClient = Urllib3HttpGitClient
def _win32_url_to_path(parsed: ParseResult) -> str:
    """Turn a parsed ``file:`` URL into a Windows path.

    An empty or "localhost" authority is dropped; an authority that starts
    with a drive letter and colon ("file://C:/...") contributes that drive.
    Any other authority is rejected.

    https://datatracker.ietf.org/doc/html/rfc8089

    Raises:
      NotImplementedError: if the URL names a non-local host
    """
    assert parsed.scheme == "file"

    authority = parsed.netloc
    url_path = parsed.path

    if not authority or authority == "localhost":
        drive = ""
    elif len(authority) >= 2 and authority[0].isalpha() and authority[1] == ":":
        # file://C:/foo.bar/baz or file://C://foo.bar//baz
        drive = authority[:2]
    else:
        raise NotImplementedError("Non-local file URLs are not supported")

    from nturl2path import url2pathname

    return url2pathname(drive + url_path)
def get_transport_and_path_from_url(
    url: str,
    config: Config | None = None,
    operation: str | None = None,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: str | None = None,
    password: str | None = None,
    key_filename: str | None = None,
    ssh_command: str | None = None,
    pool_manager: "urllib3.PoolManager | None" = None,
) -> tuple[GitClient, str]:
    """Obtain a git client from a URL.

    When a config is given, ``apply_instead_of`` rewrites the URL first
    (as a push URL when ``operation`` is "push").

    Args:
      url: URL to open (a unicode string)
      config: Optional config object
      operation: Kind of operation that'll be performed; "pull" or "push"
      thin_packs: Whether or not thin packs should be retrieved
      report_activity: Optional callback for reporting transport activity
      quiet: Whether to suppress output
      include_tags: Send annotated tags when sending the objects they point to
      username: Optional username for authentication
      password: Optional password for authentication
      key_filename: Optional SSH key file
      ssh_command: Optional custom SSH command
      pool_manager: Optional urllib3 PoolManager for HTTP(S) connections

    Returns:
      Tuple with client instance and relative path.
    """
    effective_url = url
    if config is not None:
        effective_url = apply_instead_of(config, url, push=(operation == "push"))

    return _get_transport_and_path_from_url(
        effective_url,
        config=config,
        operation=operation,
        thin_packs=thin_packs,
        report_activity=report_activity,
        quiet=quiet,
        include_tags=include_tags,
        username=username,
        password=password,
        key_filename=key_filename,
        ssh_command=ssh_command,
        pool_manager=pool_manager,
    )
def _get_transport_and_path_from_url(
    url: str,
    config: Config | None,
    operation: str | None,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: str | None = None,
    password: str | None = None,
    key_filename: str | None = None,
    ssh_command: str | None = None,
    pool_manager: "urllib3.PoolManager | None" = None,
) -> tuple[GitClient, str]:
    """Pick a client class from the URL scheme and build it.

    Handles the ``git``, ``git+ssh``/``ssh``, ``http``/``https`` and
    ``file`` schemes. ``operation`` is accepted but not used here.

    Returns:
      Tuple with client instance and the path to use with it.
    Raises:
      ValueError: if the scheme is not one of the above
    """
    parsed = urlparse(url)
    scheme = parsed.scheme
    # Options every client type accepts.
    common = {
        "thin_packs": thin_packs,
        "report_activity": report_activity,
        "quiet": quiet,
        "include_tags": include_tags,
    }

    if scheme == "git":
        return TCPGitClient.from_parsedurl(parsed, **common), parsed.path

    if scheme in ("git+ssh", "ssh"):
        ssh_client = SSHGitClient.from_parsedurl(
            parsed,
            config=config,
            username=username,
            password=password,
            key_filename=key_filename,
            ssh_command=ssh_command,
            **common,
        )
        return ssh_client, parsed.path

    if scheme in ("http", "https"):
        http_client = HttpGitClient.from_parsedurl(
            parsed,
            config=config,
            username=username,
            password=password,
            pool_manager=pool_manager,
            **common,
        )
        return http_client, parsed.path

    if scheme == "file":
        if sys.platform == "win32" or os.name == "nt":
            # On Windows the URL must be converted to a native path.
            local_client = default_local_git_client_cls(**common)
            return local_client, _win32_url_to_path(parsed)
        return (
            default_local_git_client_cls.from_parsedurl(parsed, **common),
            parsed.path,
        )

    raise ValueError(f"unknown scheme '{parsed.scheme}'")
- def parse_rsync_url(location: str) -> tuple[str | None, str, str]:
- """Parse a rsync-style URL."""
- if ":" in location and "@" not in location:
- # SSH with no user@, zero or one leading slash.
- (host, path) = location.split(":", 1)
- user = None
- elif ":" in location:
- # SSH with user@host:foo.
- user_host, path = location.split(":", 1)
- if "@" in user_host:
- user, host = user_host.rsplit("@", 1)
- else:
- user = None
- host = user_host
- else:
- raise ValueError("not a valid rsync-style URL")
- return (user, host, path)
def get_transport_and_path(
    location: str,
    config: Config | None = None,
    operation: str | None = None,
    thin_packs: bool = True,
    report_activity: Callable[[int, str], None] | None = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: str | None = None,
    password: str | None = None,
    key_filename: str | None = None,
    ssh_command: str | None = None,
    pool_manager: "urllib3.PoolManager | None" = None,
) -> tuple[GitClient, str]:
    """Obtain a git client from a URL.

    The location is tried, in order, as a URL with a known scheme, a
    Windows drive path (on win32 only), an rsync-style ``[user@]host:path``
    SSH location, and finally a local path. Local locations that
    ``BundleClient._is_bundle_file`` recognises get a BundleClient.

    Args:
      location: URL or path (a string)
      config: Optional config object
      operation: Kind of operation that'll be performed; "pull" or "push"
      thin_packs: Whether or not thin packs should be retrieved
      report_activity: Optional callback for reporting transport activity
      quiet: Whether to suppress output
      include_tags: Send annotated tags when sending the objects they point to
      username: Optional username for authentication
      password: Optional password for authentication
      key_filename: Optional SSH key file
      ssh_command: Optional custom SSH command
      pool_manager: Optional urllib3 PoolManager for HTTP(S) connections

    Returns:
      Tuple with client instance and relative path.
    """
    if config is not None:
        location = apply_instead_of(config, location, push=(operation == "push"))

    # First, try to parse it as a URL
    try:
        return _get_transport_and_path_from_url(
            location,
            config=config,
            operation=operation,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
            username=username,
            password=password,
            key_filename=key_filename,
            ssh_command=ssh_command,
            pool_manager=pool_manager,
        )
    except ValueError:
        pass

    def local_client() -> GitClient:
        """Build the client for a local location: bundle file or repository."""
        if BundleClient._is_bundle_file(location):
            client_cls = BundleClient
        else:
            client_cls = default_local_git_client_cls
        return client_cls(
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
        )

    # Windows drive path such as "C:\repo". Slicing instead of indexing
    # keeps an empty location from raising IndexError.
    if (
        sys.platform == "win32"
        and location[:1].isalpha()
        and location[1:3] == ":\\"
    ):
        return local_client(), location

    try:
        (rsync_username, hostname, path) = parse_rsync_url(location)
    except ValueError:
        # Not host:path either, so assume it's a local path.
        return local_client(), location

    return SSHGitClient(
        hostname,
        # A user embedded in the location wins over the username argument.
        username=rsync_username or username,
        config=config,
        password=password,
        key_filename=key_filename,
        ssh_command=ssh_command,
        thin_packs=thin_packs,
        report_activity=report_activity,
        quiet=quiet,
        include_tags=include_tags,
    ), path
# Credential store files consulted by default, in lookup order: the file in
# the user's home directory first, then the one under the XDG config home.
DEFAULT_GIT_CREDENTIALS_PATHS = [
    os.path.expanduser("~/.git-credentials"),
    get_xdg_config_home_path("git", "credentials"),
]
- def get_credentials_from_store(
- scheme: str,
- hostname: str,
- username: str | None = None,
- fnames: list[str] = DEFAULT_GIT_CREDENTIALS_PATHS,
- ) -> Iterator[tuple[str, str]]:
- """Read credentials from a Git credential store."""
- for fname in fnames:
- try:
- with open(fname, "rb") as f:
- for line in f:
- line_str = line.strip().decode("utf-8")
- parsed_line = urlparse(line_str)
- if (
- parsed_line.scheme == scheme
- and parsed_line.hostname == hostname
- and (username is None or parsed_line.username == username)
- ):
- if parsed_line.username and parsed_line.password:
- yield parsed_line.username, parsed_line.password
- except FileNotFoundError:
- # If the file doesn't exist, try the next one.
- continue
|