# porcelain.py -- Porcelain-like layer on top of Dulwich
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
  21. """Simple wrapper that provides porcelain-like functions on top of Dulwich.
  22. Currently implemented:
  23. * archive
  24. * add
  25. * bisect{_start,_bad,_good,_skip,_reset,_log,_replay}
  26. * branch{_create,_delete,_list}
  27. * check_ignore
  28. * checkout
  29. * checkout_branch
  30. * clone
  31. * cone mode{_init, _set, _add}
  32. * commit
  33. * commit_tree
  34. * daemon
  35. * describe
  36. * diff_tree
  37. * fetch
  38. * filter_branch
  39. * for_each_ref
  40. * init
  41. * ls_files
  42. * ls_remote
  43. * ls_tree
  44. * merge
  45. * merge_tree
  46. * mv/move
  47. * prune
  48. * pull
  49. * push
  50. * rm
  51. * remote{_add}
  52. * receive_pack
  53. * reset
  54. * revert
  55. * sparse_checkout
  56. * submodule_add
  57. * submodule_init
  58. * submodule_list
  59. * rev_list
  60. * tag{_create,_delete,_list}
  61. * upload_pack
  62. * update_server_info
  63. * write_commit_graph
  64. * status
  65. * shortlog
  66. * symbolic_ref
  67. * worktree{_add,_list,_remove,_prune,_lock,_unlock,_move}
  68. These functions are meant to behave similarly to the git subcommands.
  69. Differences in behaviour are considered bugs.
  70. Note: one of the consequences of this is that paths tend to be
  71. interpreted relative to the current working directory rather than relative
  72. to the repository root.
  73. Functions should generally accept both unicode strings and bytestrings
  74. """

import datetime
import fnmatch
import logging
import os
import posixpath
import stat
import sys
import time
from collections import namedtuple
from collections.abc import Iterable, Iterator, Sequence
from collections.abc import Set as AbstractSet
from contextlib import AbstractContextManager, closing, contextmanager
from dataclasses import dataclass
from io import BytesIO, RawIOBase
from pathlib import Path
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    TextIO,
    TypeVar,
    Union,
    cast,
    overload,
)

if sys.version_info >= (3, 12):
    from collections.abc import Buffer
    from typing import override
else:
    from typing_extensions import Buffer, override

if TYPE_CHECKING:
    from .filter_branch import CommitData
    from .gc import GCStats

from . import replace_me
from .archive import tar_stream
from .bisect import BisectState
from .client import (
    FetchPackResult,
    LsRemoteResult,
    SendPackResult,
    get_transport_and_path,
)
from .config import Config, ConfigFile, StackedConfig, read_submodules
from .diff_tree import (
    CHANGE_ADD,
    CHANGE_COPY,
    CHANGE_DELETE,
    CHANGE_MODIFY,
    CHANGE_RENAME,
    RENAME_CHANGE_TYPES,
    TreeChange,
    tree_changes,
)
from .errors import SendPackError
from .graph import can_fast_forward
from .ignore import IgnoreFilterManager
from .index import (
    ConflictedIndexEntry,
    Index,
    IndexEntry,
    _fs_to_tree_path,
    blob_from_path_and_stat,
    build_file_from_blob,
    build_index_from_tree,
    get_unstaged_changes,
    index_entry_from_stat,
    symlink,
    update_working_tree,
    validate_path_element_default,
    validate_path_element_hfs,
    validate_path_element_ntfs,
)
from .object_store import BaseObjectStore, tree_lookup_path
from .objects import (
    Blob,
    Commit,
    Tag,
    Tree,
    TreeEntry,
    format_timezone,
    parse_timezone,
    pretty_format_tree_entry,
)
from .objectspec import (
    parse_commit,
    parse_object,
    parse_ref,
    parse_reftuples,
    parse_tree,
)
from .pack import UnpackedObject, write_pack_from_container, write_pack_index
from .patch import (
    get_summary,
    write_commit_patch,
    write_object_diff,
    write_tree_diff,
)
from .protocol import ZERO_SHA, Protocol
from .refs import (
    LOCAL_BRANCH_PREFIX,
    LOCAL_NOTES_PREFIX,
    LOCAL_REMOTE_PREFIX,
    LOCAL_TAG_PREFIX,
    Ref,
    SymrefLoop,
    _import_remote_refs,
)
from .repo import BaseRepo, Repo, get_user_identity
from .server import (
    FileSystemBackend,
    ReceivePackHandler,
    TCPGitServer,
    UploadPackHandler,
)
from .server import update_server_info as server_update_server_info
from .sparse_patterns import (
    SparseCheckoutConflictError,
    apply_included_paths,
    determine_included_paths,
)

# Module level tuple definition for status output
GitStatus = namedtuple("GitStatus", "staged unstaged untracked")

# TypeVar for preserving BaseRepo subclass types
T = TypeVar("T", bound="BaseRepo")

# Type alias for common repository parameter pattern
RepoPath = Union[str, os.PathLike[str], Repo]


@dataclass
class CountObjectsResult:
    """Result of counting objects in a repository.

    Attributes:
      count: Number of loose objects
      size: Total size of loose objects in bytes
      in_pack: Number of objects in pack files
      packs: Number of pack files
      size_pack: Total size of pack files in bytes
    """

    count: int
    size: int
    in_pack: Optional[int] = None
    packs: Optional[int] = None
    size_pack: Optional[int] = None


class NoneStream(RawIOBase):
    """Fallback if stdout or stderr are unavailable, does nothing."""

    def read(self, size: int = -1) -> None:
        """Read from stream (returns None as this is a null stream)."""
        return None

    def readall(self) -> bytes:
        """Read all bytes (returns empty bytes).

        Returns:
          Empty bytes object
        """
        return b""

    @override
    def readinto(self, b: Buffer) -> Optional[int]:
        return 0

    @override
    def write(self, b: Buffer) -> Optional[int]:
        # All Buffer implementations (bytes, bytearray, memoryview) support len()
        return len(b) if b else 0  # type: ignore[arg-type]


default_bytes_out_stream: BinaryIO = cast(
    BinaryIO, getattr(sys.stdout, "buffer", None) or NoneStream()
)
default_bytes_err_stream: BinaryIO = cast(
    BinaryIO, getattr(sys.stderr, "buffer", None) or NoneStream()
)

DEFAULT_ENCODING = "utf-8"


class Error(Exception):
    """Porcelain-based error."""

    def __init__(self, msg: str) -> None:
        """Initialize Error with message."""
        super().__init__(msg)


class RemoteExists(Error):
    """Raised when the remote already exists."""


class TimezoneFormatError(Error):
    """Raised when the timezone cannot be determined from a given string."""


class CheckoutError(Error):
    """Indicates that a checkout cannot be performed."""


def parse_timezone_format(tz_str: str) -> int:
    """Parse given string and attempt to return a timezone offset.

    Different formats are considered in the following order:

     - Git internal format: <unix timestamp> <timezone offset>
     - RFC 2822: e.g. Mon, 20 Nov 1995 19:12:08 -0500
     - ISO 8601: e.g. 1995-11-20T19:12:08-0500

    Args:
      tz_str: datetime string
    Returns: Timezone offset as integer
    Raises:
      TimezoneFormatError: if timezone information cannot be extracted
    """
    import re

    # Git internal format
    internal_format_pattern = re.compile("^[0-9]+ [+-][0-9]{,4}$")
    if re.match(internal_format_pattern, tz_str):
        try:
            tz_internal = parse_timezone(tz_str.split(" ")[1].encode(DEFAULT_ENCODING))
            return tz_internal[0]
        except ValueError:
            pass

    # RFC 2822
    import email.utils

    rfc_2822 = email.utils.parsedate_tz(tz_str)
    if rfc_2822 and rfc_2822[9] is not None:
        return rfc_2822[9]

    # ISO 8601
    # Supported offsets:
    # sHHMM, sHH:MM, sHH
    iso_8601_pattern = re.compile(
        "[0-9] ?([+-])([0-9]{2})(?::(?=[0-9]{2}))?([0-9]{2})?$"
    )
    match = re.search(iso_8601_pattern, tz_str)
    total_secs = 0
    if match:
        sign, hours, minutes = match.groups()
        total_secs += int(hours) * 3600
        if minutes:
            total_secs += int(minutes) * 60
        total_secs = -total_secs if sign == "-" else total_secs
        return total_secs

    # YYYY.MM.DD, MM/DD/YYYY, DD.MM.YYYY contain no timezone information
    raise TimezoneFormatError(tz_str)
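
# Illustrative examples of the three accepted formats (offsets are returned
# in seconds east of UTC):
#
#   parse_timezone_format("1461918710 +0200")                  # Git internal -> 7200
#   parse_timezone_format("Mon, 20 Nov 1995 19:12:08 -0500")   # RFC 2822 -> -18000
#   parse_timezone_format("1995-11-20T19:12:08-0500")          # ISO 8601 -> -18000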


def get_user_timezones() -> tuple[int, int]:
    """Retrieve local timezone as described in git documentation.

    https://raw.githubusercontent.com/git/git/v2.3.0/Documentation/date-formats.txt
    Returns: A tuple containing author timezone, committer timezone.
    """
    local_timezone = time.localtime().tm_gmtoff
    if os.environ.get("GIT_AUTHOR_DATE"):
        author_timezone = parse_timezone_format(os.environ["GIT_AUTHOR_DATE"])
    else:
        author_timezone = local_timezone
    if os.environ.get("GIT_COMMITTER_DATE"):
        commit_timezone = parse_timezone_format(os.environ["GIT_COMMITTER_DATE"])
    else:
        commit_timezone = local_timezone
    return author_timezone, commit_timezone
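
# Illustrative sketch: with GIT_AUTHOR_DATE set to a Git-internal date string,
# the author offset comes from that string while the committer offset falls
# back to the local timezone:
#
#   os.environ["GIT_AUTHOR_DATE"] = "1461918710 +0200"
#   author_tz, committer_tz = get_user_timezones()
#   # author_tz == 7200; committer_tz is the local UTC offset in seconds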


@overload
def open_repo(path_or_repo: T) -> AbstractContextManager[T]: ...


@overload
def open_repo(
    path_or_repo: Union[str, os.PathLike[str]],
) -> AbstractContextManager[Repo]: ...


def open_repo(
    path_or_repo: Union[str, os.PathLike[str], T],
) -> AbstractContextManager[Union[T, Repo]]:
    """Open an argument that can be a repository or a path for a repository."""
    if isinstance(path_or_repo, BaseRepo):
        return _noop_context_manager(path_or_repo)
    return Repo(path_or_repo)


@contextmanager
def _noop_context_manager(obj: T) -> Iterator[T]:
    """Context manager that has the same API as closing but does nothing."""
    yield obj


@overload
def open_repo_closing(path_or_repo: T) -> AbstractContextManager[T]: ...


@overload
def open_repo_closing(
    path_or_repo: Union[str, bytes, os.PathLike[str]],
) -> AbstractContextManager[Repo]: ...


def open_repo_closing(
    path_or_repo: Union[str, bytes, os.PathLike[str], T],
) -> AbstractContextManager[Union[T, Repo]]:
    """Open an argument that can be a repository or a path for a repository.

    Returns a context manager that will close the repo on exit if the argument
    is a path, and does nothing if the argument is already a repo.
    """
    if isinstance(path_or_repo, BaseRepo):
        return _noop_context_manager(path_or_repo)
    return closing(Repo(path_or_repo))
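
# Minimal usage sketch: open_repo_closing accepts either a path (the repo is
# closed on exit) or an existing Repo object (left open for the caller):
#
#   with open_repo_closing(".") as r:
#       print(r.head())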


def path_to_tree_path(
    repopath: Union[str, bytes, os.PathLike[str]],
    path: Union[str, bytes, os.PathLike[str]],
    tree_encoding: str = DEFAULT_ENCODING,
) -> bytes:
    """Convert a path to a path usable in an index, e.g. bytes and relative to the repository root.

    Args:
      repopath: Repository path, absolute or relative to the cwd
      path: A path, absolute or relative to the cwd
      tree_encoding: Encoding to use for tree paths
    Returns: A path formatted for use in e.g. an index
    """
    # Resolve might return a relative path on Windows
    # https://bugs.python.org/issue38671
    if sys.platform == "win32":
        path = os.path.abspath(path)

    # Convert bytes paths to str for Path
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    path = Path(path)
    resolved_path = path.resolve()

    # Resolve and abspath seem to behave differently regarding symlinks;
    # as we are doing abspath on the file path, we need to do the same on
    # the repo path or they might not match
    if sys.platform == "win32":
        repopath = os.path.abspath(repopath)

    # Convert bytes paths to str for Path
    if isinstance(repopath, bytes):
        repopath = os.fsdecode(repopath)
    repopath = Path(repopath).resolve()

    try:
        relpath = resolved_path.relative_to(repopath)
    except ValueError:
        # If path is a symlink that points to a file outside the repo, we
        # want the relpath for the link itself, not the resolved target
        if path.is_symlink():
            parent = path.parent.resolve()
            relpath = (parent / path.name).relative_to(repopath)
        else:
            raise
    if sys.platform == "win32":
        return str(relpath).replace(os.path.sep, "/").encode(tree_encoding)
    else:
        return bytes(relpath)
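
# Illustrative example (paths are placeholders, POSIX system assumed): tree
# paths are bytes, use "/" separators, and are relative to the repo root:
#
#   path_to_tree_path("/work/repo", "/work/repo/src/main.py")  # -> b"src/main.py"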


class DivergedBranches(Error):
    """Branches have diverged and fast-forward is not possible."""

    def __init__(self, current_sha: bytes, new_sha: bytes) -> None:
        """Initialize DivergedBranches error with current and new SHA values."""
        self.current_sha = current_sha
        self.new_sha = new_sha


def check_diverged(repo: BaseRepo, current_sha: bytes, new_sha: bytes) -> None:
    """Check if updating to a sha can be done with fast forwarding.

    Args:
      repo: Repository object
      current_sha: Current head sha
      new_sha: New head sha
    """
    try:
        can = can_fast_forward(repo, current_sha, new_sha)
    except KeyError:
        can = False
    if not can:
        raise DivergedBranches(current_sha, new_sha)
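
# Usage sketch (SHAs are hypothetical): check_diverged raises DivergedBranches
# when new_sha is not a descendant of current_sha, so callers typically catch
# it to decide whether to refuse or force the update:
#
#   try:
#       check_diverged(repo, current_sha, new_sha)
#   except DivergedBranches:
#       ...  # refuse the update, or proceed only when forcing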


def archive(
    repo: Union[str, BaseRepo],
    committish: Optional[Union[str, bytes, Commit, Tag]] = None,
    outstream: Union[BinaryIO, RawIOBase] = default_bytes_out_stream,
    errstream: Union[BinaryIO, RawIOBase] = default_bytes_err_stream,
) -> None:
    """Create an archive.

    Args:
      repo: Path of repository for which to generate an archive.
      committish: Commit SHA1 or ref to use
      outstream: Output stream (defaults to stdout)
      errstream: Error stream (defaults to stderr)
    """
    if committish is None:
        committish = "HEAD"
    with open_repo_closing(repo) as repo_obj:
        c = parse_commit(repo_obj, committish)
        tree = repo_obj.object_store[c.tree]
        assert isinstance(tree, Tree)
        for chunk in tar_stream(repo_obj.object_store, tree, c.commit_time):
            outstream.write(chunk)
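
# Minimal sketch: stream a tar archive of HEAD into a local file
# ("repo.tar" is an arbitrary example name):
#
#   with open("repo.tar", "wb") as f:
#       archive(".", committish="HEAD", outstream=f)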


def update_server_info(repo: RepoPath = ".") -> None:
    """Update server info files for a repository.

    Args:
      repo: path to the repository
    """
    with open_repo_closing(repo) as r:
        server_update_server_info(r)


def write_commit_graph(repo: RepoPath = ".", reachable: bool = True) -> None:
    """Write a commit graph file for a repository.

    Args:
      repo: path to the repository or a Repo object
      reachable: if True, include all commits reachable from refs.
        if False, only include direct ref targets.
    """
    with open_repo_closing(repo) as r:
        # Get all refs
        refs = list(r.refs.as_dict().values())
        if refs:
            r.object_store.write_commit_graph(refs, reachable=reachable)
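
# Both helpers take a repository path (or Repo object); illustrative use on a
# hypothetical bare repository served over dumb HTTP:
#
#   update_server_info("/srv/git/project.git")  # refresh info/refs etc.
#   write_commit_graph("/srv/git/project.git")  # speed up later traversals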


def symbolic_ref(
    repo: RepoPath, ref_name: Union[str, bytes], force: bool = False
) -> None:
    """Set git symbolic ref into HEAD.

    Args:
      repo: path to the repository
      ref_name: short name of the new ref
      force: force setting without checking that the ref exists in refs/heads
    """
    with open_repo_closing(repo) as repo_obj:
        ref_path = _make_branch_ref(ref_name)
        if not force and ref_path not in repo_obj.refs.keys():
            ref_name_str = (
                ref_name.decode("utf-8", "replace")
                if isinstance(ref_name, bytes)
                else ref_name
            )
            raise Error(f"fatal: ref `{ref_name_str}` is not a ref")
        repo_obj.refs.set_symbolic_ref(b"HEAD", ref_path)


def pack_refs(repo: RepoPath, all: bool = False) -> None:
    """Pack loose references into packed-refs file."""
    with open_repo_closing(repo) as repo_obj:
        repo_obj.refs.pack_refs(all=all)
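
# Usage sketch (branch name is an example): point HEAD at an existing branch
# by its short name, then pack loose refs into packed-refs:
#
#   symbolic_ref(".", "main")  # HEAD -> refs/heads/main; must exist unless force=True
#   pack_refs(".", all=True)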


def commit(
    repo: RepoPath = ".",
    message: Optional[Union[str, bytes, Callable[[Any, Commit], bytes]]] = None,
    author: Optional[bytes] = None,
    author_timezone: Optional[int] = None,
    committer: Optional[bytes] = None,
    commit_timezone: Optional[int] = None,
    encoding: Optional[bytes] = None,
    no_verify: bool = False,
    signoff: Optional[bool] = None,
    all: bool = False,
    amend: bool = False,
    sign: Optional[bool] = None,
) -> bytes:
    """Create a new commit.

    Args:
      repo: Path to repository
      message: Optional commit message (string/bytes or callable that takes
        (repo, commit) and returns bytes)
      author: Optional author name and email
      author_timezone: Author timestamp timezone
      committer: Optional committer name and email
      commit_timezone: Commit timestamp timezone
      encoding: Encoding to use for commit message
      no_verify: Skip pre-commit and commit-msg hooks
      signoff: Add Signed-off-by line to commit message. If None, uses format.signoff config.
      all: Automatically stage all tracked files that have been modified
      amend: Replace the tip of the current branch by creating a new commit
      sign: GPG sign the commit. If None, uses commit.gpgsign config.
        If True, signs with default GPG key. If False, does not sign.
    Returns: SHA1 of the new commit
    """
    encoding_str = encoding.decode("ascii") if encoding else DEFAULT_ENCODING
    if isinstance(message, str):
        message = message.encode(encoding_str)
    if isinstance(author, str):
        author = author.encode(encoding_str)
    if isinstance(committer, str):
        committer = committer.encode(encoding_str)
    local_timezone = get_user_timezones()
    if author_timezone is None:
        author_timezone = local_timezone[0]
    if commit_timezone is None:
        commit_timezone = local_timezone[1]

    with open_repo_closing(repo) as r:
        # Handle amend logic
        merge_heads = None
        if amend:
            try:
                head_commit = r[r.head()]
                assert isinstance(head_commit, Commit)
            except KeyError:
                raise ValueError("Cannot amend: no existing commit found")

            # If message not provided, use the message from the current HEAD
            if message is None:
                message = head_commit.message
            # If author not provided, use the author from the current HEAD
            if author is None:
                author = head_commit.author
                if author_timezone is None:
                    author_timezone = head_commit.author_timezone
            # Use the parent(s) of the current HEAD as our parent(s)
            merge_heads = list(head_commit.parents)

        # If -a flag is used, stage all modified tracked files
        if all:
            index = r.open_index()
            normalizer = r.get_blob_normalizer()

            # Create a wrapper that handles the bytes -> Blob conversion
            if normalizer is not None:

                def filter_callback(data: bytes, path: bytes) -> bytes:
                    from dulwich.objects import Blob

                    blob = Blob()
                    blob.data = data
                    normalized_blob = normalizer.checkin_normalize(blob, path)
                    data_bytes: bytes = normalized_blob.data
                    return data_bytes

            else:
                filter_callback = None

            unstaged_changes = list(
                get_unstaged_changes(index, r.path, filter_callback)
            )
            if unstaged_changes:
                # Convert bytes paths to strings for add function
                modified_files: list[Union[str, bytes, os.PathLike[str]]] = []
                for path in unstaged_changes:
                    if isinstance(path, bytes):
                        modified_files.append(path.decode())
                    else:
                        modified_files.append(path)
                add(r, paths=modified_files)

        # For amend, create dangling commit to avoid adding current HEAD as parent
        if amend:
            commit_sha = r.get_worktree().commit(
                message=message,
                author=author,
                author_timezone=author_timezone,
                committer=committer,
                commit_timezone=commit_timezone,
                encoding=encoding,
                no_verify=no_verify,
                sign=sign,
                merge_heads=merge_heads,
                ref=None,
            )
            # Update HEAD to point to the new commit
            r.refs[b"HEAD"] = commit_sha
            return commit_sha
        else:
            return r.get_worktree().commit(
                message=message,
                author=author,
                author_timezone=author_timezone,
                committer=committer,
                commit_timezone=commit_timezone,
                encoding=encoding,
                no_verify=no_verify,
                sign=sign,
                merge_heads=merge_heads,
            )
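
# Minimal sketch: stage a file and commit with explicit identities (names and
# emails are placeholders); commit() returns the new commit's SHA-1 as bytes:
#
#   add(".", paths=["README.md"])
#   sha = commit(".", message="Initial commit",
#                author="Alice <alice@example.com>",
#                committer="Alice <alice@example.com>")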


def commit_tree(
    repo: RepoPath,
    tree: bytes,
    message: Optional[Union[str, bytes]] = None,
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
) -> bytes:
    """Create a new commit object.

    Args:
      repo: Path to repository
      tree: An existing tree object
      message: Commit message
      author: Optional author name and email
      committer: Optional committer name and email
    """
    with open_repo_closing(repo) as r:
        if isinstance(message, str):
            message = message.encode(DEFAULT_ENCODING)
        return r.get_worktree().commit(
            message=message, tree=tree, committer=committer, author=author
        )


def init(
    path: Union[str, os.PathLike[str]] = ".",
    *,
    bare: bool = False,
    symlinks: Optional[bool] = None,
) -> Repo:
    """Create a new git repository.

    Args:
      path: Path to repository.
      bare: Whether to create a bare repository.
      symlinks: Whether to create actual symlinks (defaults to autodetect)
    Returns: A Repo instance
    """
    if not os.path.exists(path):
        os.mkdir(path)
    if bare:
        return Repo.init_bare(path)
    else:
        return Repo.init(path, symlinks=symlinks)
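
# Usage sketch (paths are examples): init() creates the directory if needed
# and returns a Repo; pass bare=True for a repository without a working tree:
#
#   r = init("myrepo")
#   bare_repo = init("myrepo.git", bare=True)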


def clone(
    source: Union[str, bytes, Repo],
    target: Optional[Union[str, os.PathLike[str]]] = None,
    bare: bool = False,
    checkout: Optional[bool] = None,
    errstream: Union[BinaryIO, RawIOBase] = default_bytes_err_stream,
    outstream: Optional[BinaryIO] = None,
    origin: Optional[str] = "origin",
    depth: Optional[int] = None,
    branch: Optional[Union[str, bytes]] = None,
    config: Optional[Config] = None,
    filter_spec: Optional[str] = None,
    protocol_version: Optional[int] = None,
    recurse_submodules: bool = False,
    **kwargs: Union[Union[str, bytes], Sequence[Union[str, bytes]]],
) -> Repo:
    """Clone a local or remote git repository.

    Args:
      source: Path or URL for source repository
      target: Path to target repository (optional)
      bare: Whether or not to create a bare repository
      checkout: Whether or not to check-out HEAD after cloning
      errstream: Optional stream to write progress to
      outstream: Optional stream to write progress to (deprecated)
      origin: Name of remote from the repository used to clone
      depth: Depth to fetch at
      branch: Optional branch or tag to be used as HEAD in the new repository
        instead of the cloned repository's HEAD.
      config: Configuration to use
      filter_spec: A git-rev-list-style object filter spec, as an ASCII string.
        Only used if the server supports the Git protocol-v2 'filter'
        feature, and ignored otherwise.
      protocol_version: desired Git protocol version. By default the highest
        mutually supported protocol version will be used.
      recurse_submodules: Whether to initialize and clone submodules
      **kwargs: Additional keyword arguments including refspecs to fetch.
        Can be a bytestring, a string, or a list of bytestring/string.
    Returns: The new repository
    """
    if outstream is not None:
        import warnings

        warnings.warn(
            "outstream= has been deprecated in favour of errstream=.",
            DeprecationWarning,
            stacklevel=3,
        )
        # TODO(jelmer): Capture logging output and stream to errstream

    if config is None:
        config = StackedConfig.default()

    if checkout is None:
        checkout = not bare
    if checkout and bare:
        raise Error("checkout and bare are incompatible")

    if target is None:
        if isinstance(source, Repo):
            raise ValueError("target must be specified when cloning from a Repo object")
        elif isinstance(source, bytes):
            target = source.split(b"/")[-1].decode()
        else:
            target = source.split("/")[-1]

    if isinstance(branch, str):
        branch = branch.encode(DEFAULT_ENCODING)

    mkdir = not os.path.exists(target)

    if isinstance(source, Repo):
        # For direct repo cloning, use LocalGitClient
        from .client import GitClient, LocalGitClient

        client: GitClient = LocalGitClient(config=config)
        path = source.path
    else:
        source_str = source.decode() if isinstance(source, bytes) else source
        (client, path) = get_transport_and_path(source_str, config=config, **kwargs)  # type: ignore[arg-type]

    filter_spec_bytes: Optional[bytes] = None
    if filter_spec:
        filter_spec_bytes = filter_spec.encode("ascii")

    repo = client.clone(
        path,
        str(target),  # Convert PathLike to str
        mkdir=mkdir,
        bare=bare,
        origin=origin,
        checkout=checkout,
        branch=branch.decode() if branch else None,  # Convert bytes to str
        progress=lambda data: (errstream.write(data), None)[1],
        depth=depth,
        filter_spec=filter_spec_bytes,
        protocol_version=protocol_version,
    )

    # Initialize and update submodules if requested
    if recurse_submodules and not bare:
        try:
            submodule_init(repo)
            submodule_update(repo, init=True)
        except FileNotFoundError as e:
            # .gitmodules file doesn't exist - no submodules to process
            logging.debug("No .gitmodules file found: %s", e)
        except KeyError as e:
            # Submodule configuration missing
            logging.warning("Submodule configuration error: %s", e)
            if errstream:
                errstream.write(
                    f"Warning: Submodule configuration error: {e}\n".encode()
                )

    return repo
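
# Minimal sketch (URL and target are placeholders): a shallow clone of a
# single branch; pass bare=True to skip the working-tree checkout:
#
#   r = clone("https://example.com/project.git", "project",
#             depth=1, branch="main")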


def add(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    paths: Optional[
        Union[
            Sequence[Union[str, bytes, os.PathLike[str]]], str, bytes, os.PathLike[str]
        ]
    ] = None,
) -> tuple[list[str], set[str]]:
    """Add files to the staging area.

    Args:
      repo: Repository for the files
      paths: Paths to add. If None, stages all untracked and modified files
        from the repository root (mimicking 'git add -A' behavior).

    Returns: Tuple with list of added files and set of ignored files

      If the repository contains ignored directories, the returned set will
      contain the path to an ignored directory (with trailing slash). Individual
      files within ignored directories will not be returned.
    """
    ignored = set()
    with open_repo_closing(repo) as r:
        repo_path = Path(r.path).resolve()
        ignore_manager = IgnoreFilterManager.from_repo(r)

        # Get unstaged changes once for the entire operation
        index = r.open_index()
        normalizer = r.get_blob_normalizer()
        if normalizer is not None:

            def filter_callback(data: bytes, path: bytes) -> bytes:
                from dulwich.objects import Blob

                blob = Blob()
                blob.data = data
                normalized_blob = normalizer.checkin_normalize(blob, path)
                data_bytes: bytes = normalized_blob.data
                return data_bytes
        else:
            filter_callback = None

        # Check if core.preloadIndex is enabled
        config = r.get_config_stack()
        preload_index = config.get_boolean(b"core", b"preloadIndex", False)
        all_unstaged_paths = list(
            get_unstaged_changes(index, r.path, filter_callback, preload_index)
        )
        if not paths:
            # When no paths are specified, add all untracked and modified
            # files from the repository root
            paths = [str(repo_path)]

        relpaths = []
        if isinstance(paths, (str, bytes, os.PathLike)):
            paths = [paths]
        for p in paths:
            # Handle bytes paths by decoding them
            if isinstance(p, bytes):
                p = p.decode("utf-8")
            path = Path(p)
            if not path.is_absolute():
                # Make relative paths relative to the repo directory
                path = repo_path / path
            # Don't resolve symlinks completely - only resolve the parent
            # directory to avoid issues when symlinks point outside the
            # repository
            if path.is_symlink():
                # For symlinks, resolve only the parent directory
                parent_resolved = path.parent.resolve()
                resolved_path = parent_resolved / path.name
            else:
                # For regular files/dirs, resolve normally
                resolved_path = path.resolve()
            try:
                relpath = str(resolved_path.relative_to(repo_path)).replace(os.sep, "/")
            except ValueError as e:
                # Path is not within the repository
                p_str = p.decode() if isinstance(p, bytes) else str(p)
                raise ValueError(
                    f"Path {p_str} is not within repository {repo_path}"
                ) from e

            # Handle directories by scanning their contents
            if resolved_path.is_dir():
                # Check if the directory itself is ignored
                dir_relpath = posixpath.join(relpath, "") if relpath != "." else ""
                if dir_relpath and ignore_manager.is_ignored(dir_relpath):
                    ignored.add(dir_relpath)
                    continue

                # When adding a directory, add all untracked files within it
                current_untracked = list(
                    get_untracked_paths(
                        str(resolved_path),
                        str(repo_path),
                        index,
                    )
                )
                for untracked_path in current_untracked:
                    # If we're scanning a subdirectory, adjust the path
                    if relpath != ".":
                        untracked_path = posixpath.join(relpath, untracked_path)

                    if not ignore_manager.is_ignored(untracked_path):
                        relpaths.append(untracked_path)
                    else:
                        ignored.add(untracked_path)

                # Also add unstaged (modified) files within this directory
                for unstaged_path in all_unstaged_paths:
                    if isinstance(unstaged_path, bytes):
                        unstaged_path_str = unstaged_path.decode("utf-8")
                    else:
                        unstaged_path_str = unstaged_path

                    # Check if this unstaged file is within the directory
                    # we're processing
                    unstaged_full_path = repo_path / unstaged_path_str
                    try:
                        unstaged_full_path.relative_to(resolved_path)
                        # File is within this directory, add it
                        if not ignore_manager.is_ignored(unstaged_path_str):
                            relpaths.append(unstaged_path_str)
                        else:
                            ignored.add(unstaged_path_str)
                    except ValueError:
                        # File is not within this directory, skip it
                        continue
                continue

            # FIXME: Support patterns
            if ignore_manager.is_ignored(relpath):
                ignored.add(relpath)
                continue
            relpaths.append(relpath)
        r.get_worktree().stage(relpaths)
    return (relpaths, ignored)
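
# Example (illustrative sketch; the paths are hypothetical): staging one file
# and one directory, then inspecting what was skipped:
#
#     added, ignored = add(".", paths=["README.md", "docs/"])
#     # ``added`` holds repo-relative POSIX paths that were staged;
#     # ``ignored`` holds paths (or ignored directories with a trailing
#     # slash) that matched a .gitignore rule.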


def _is_subdir(
    subdir: Union[str, os.PathLike[str]], parentdir: Union[str, os.PathLike[str]]
) -> bool:
    """Check whether subdir is parentdir or a subdir of parentdir.

    If parentdir or subdir is a relative path, it will be disambiguated
    relative to the pwd.
    """
    parentdir_abs = os.path.realpath(parentdir) + os.path.sep
    subdir_abs = os.path.realpath(subdir) + os.path.sep
    return subdir_abs.startswith(parentdir_abs)


# TODO: option to remove ignored files also, in line with `git clean -fdx`
def clean(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    target_dir: Optional[Union[str, os.PathLike[str]]] = None,
) -> None:
    """Remove any untracked files from the target directory recursively.

    Equivalent to running ``git clean -fd`` in target_dir.

    Args:
      repo: Repository where the files may be tracked
      target_dir: Directory to clean - current directory if None
    """
    if target_dir is None:
        target_dir = os.getcwd()

    with open_repo_closing(repo) as r:
        if not _is_subdir(target_dir, r.path):
            raise Error("target_dir must be in the repo's working dir")

        config = r.get_config_stack()
        config.get_boolean((b"clean",), b"requireForce", True)

        # TODO(jelmer): if require_force is set, then make sure that -f, -i or
        # -n is specified.

        index = r.open_index()
        ignore_manager = IgnoreFilterManager.from_repo(r)

        paths_in_wd = _walk_working_dir_paths(target_dir, r.path)
        # Reverse file visit order, so that files and subdirectories are
        # removed before containing directory
        for ap, is_dir in reversed(list(paths_in_wd)):
            # target_dir and r.path are both str, so ap must be str
            assert isinstance(ap, str)
            if is_dir:
                # All subdirectories and files have been removed if untracked,
                # so dir contains no tracked files iff it is empty.
                is_empty = len(os.listdir(ap)) == 0
                if is_empty:
                    os.rmdir(ap)
            else:
                ip = path_to_tree_path(r.path, ap)
                is_tracked = ip in index

                rp = os.path.relpath(ap, r.path)
                is_ignored = ignore_manager.is_ignored(rp)

                if not is_tracked and not is_ignored:
                    os.remove(ap)
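
# Example (illustrative sketch; the directory name is hypothetical): removing
# untracked files below a build directory, akin to ``git clean -fd build/``:
#
#     clean(".", target_dir="build")
#
# Tracked and ignored files are left alone; empty directories are pruned
# bottom-up thanks to the reversed visit order above.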


def remove(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    paths: Sequence[Union[str, bytes, os.PathLike[str]]] = [],
    cached: bool = False,
) -> None:
    """Remove files from the staging area.

    Args:
      repo: Repository for the files
      paths: Paths to remove. Can be absolute or relative to the repository root.
      cached: Only remove from index, not from working directory
    """
    with open_repo_closing(repo) as r:
        index = r.open_index()
        blob_normalizer = r.get_blob_normalizer()
        for p in paths:
            # If path is absolute, use it as-is. Otherwise, treat it as
            # relative to the repository root.
            p_str = os.fsdecode(p) if isinstance(p, bytes) else str(p)
            if os.path.isabs(p_str):
                full_path = p_str
            else:
                # Treat relative paths as relative to the repository root
                full_path = os.path.join(r.path, p_str)
            tree_path = path_to_tree_path(r.path, full_path)
            # Convert to bytes for file operations
            full_path_bytes = os.fsencode(full_path)
            try:
                entry = index[tree_path]
                if isinstance(entry, ConflictedIndexEntry):
                    raise Error(f"{p_str} has conflicts in the index")
                index_sha = entry.sha
            except KeyError as exc:
                raise Error(f"{p_str} did not match any files") from exc

            if not cached:
                try:
                    st = os.lstat(full_path_bytes)
                except OSError:
                    pass
                else:
                    try:
                        blob = blob_from_path_and_stat(full_path_bytes, st)
                        # Apply checkin normalization to compare apples to apples
                        if blob_normalizer is not None:
                            blob = blob_normalizer.checkin_normalize(blob, tree_path)
                    except OSError:
                        pass
                    else:
                        try:
                            head_commit = r[r.head()]
                            assert isinstance(head_commit, Commit)
                            committed_sha = tree_lookup_path(
                                r.__getitem__, head_commit.tree, tree_path
                            )[1]
                        except KeyError:
                            committed_sha = None

                        if blob.id != index_sha and index_sha != committed_sha:
                            raise Error(
                                "file has staged content differing "
                                f"from both the file and head: {p_str}"
                            )

                        if index_sha != committed_sha:
                            raise Error(f"file has staged changes: {p_str}")
                        os.remove(full_path_bytes)
            del index[tree_path]
        index.write()


rm = remove
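
# Example (illustrative sketch; the path is hypothetical): unstaging a file
# while keeping it on disk, akin to ``git rm --cached``:
#
#     remove(".", paths=["config.ini"], cached=True)
#
# Without ``cached=True`` the file is also deleted from the working tree,
# after the safety checks above against staged or locally modified content.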


def mv(
    repo: Union[str, os.PathLike[str], Repo],
    source: Union[str, bytes, os.PathLike[str]],
    destination: Union[str, bytes, os.PathLike[str]],
    force: bool = False,
) -> None:
    """Move or rename a file, directory, or symlink.

    Args:
      repo: Path to the repository
      source: Path to move from
      destination: Path to move to
      force: Force move even if destination exists

    Raises:
      Error: If source doesn't exist, is not tracked, or destination already
        exists (without force)
    """
    with open_repo_closing(repo) as r:
        index = r.open_index()

        # Handle paths - convert to string if necessary
        if isinstance(source, bytes):
            source = source.decode(sys.getfilesystemencoding())
        elif hasattr(source, "__fspath__"):
            source = os.fspath(source)
        else:
            source = str(source)
        if isinstance(destination, bytes):
            destination = destination.decode(sys.getfilesystemencoding())
        elif hasattr(destination, "__fspath__"):
            destination = os.fspath(destination)
        else:
            destination = str(destination)

        # Get full paths
        if os.path.isabs(source):
            source_full_path = source
        else:
            # Treat relative paths as relative to the repository root
            source_full_path = os.path.join(r.path, source)
        if os.path.isabs(destination):
            destination_full_path = destination
        else:
            # Treat relative paths as relative to the repository root
            destination_full_path = os.path.join(r.path, destination)

        # Check if destination is a directory
        if os.path.isdir(destination_full_path):
            # Move source into destination directory
            basename = os.path.basename(source_full_path)
            destination_full_path = os.path.join(destination_full_path, basename)

        # Convert to tree paths for index
        source_tree_path = path_to_tree_path(r.path, source_full_path)
        destination_tree_path = path_to_tree_path(r.path, destination_full_path)

        # Check if source exists in index
        if source_tree_path not in index:
            raise Error(f"source '{source}' is not under version control")

        # Check if source exists in filesystem
        if not os.path.exists(source_full_path):
            raise Error(f"source '{source}' does not exist")

        # Check if destination already exists
        if os.path.exists(destination_full_path) and not force:
            raise Error(f"destination '{destination}' already exists (use -f to force)")

        # Check if destination is already in index
        if destination_tree_path in index and not force:
            raise Error(
                f"destination '{destination}' already exists in index (use -f to force)"
            )

        # Get the index entry for the source
        source_entry = index[source_tree_path]

        # Convert to bytes for file operations
        source_full_path_bytes = os.fsencode(source_full_path)
        destination_full_path_bytes = os.fsencode(destination_full_path)

        # Create parent directory for destination if needed
        dest_dir = os.path.dirname(destination_full_path_bytes)
        if dest_dir and not os.path.exists(dest_dir):
            os.makedirs(dest_dir)

        # Move the file in the filesystem
        if os.path.exists(destination_full_path_bytes) and force:
            os.remove(destination_full_path_bytes)
        os.rename(source_full_path_bytes, destination_full_path_bytes)

        # Update the index
        del index[source_tree_path]
        index[destination_tree_path] = source_entry
        index.write()


move = mv
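
# Example (illustrative sketch; both paths are hypothetical):
#
#     mv(".", "old_name.py", "new_name.py")
#
# Moving onto an existing path requires ``force=True``; when the destination
# is a directory, the source keeps its basename inside it, as with ``git mv``.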


def commit_decode(
    commit: Commit, contents: bytes, default_encoding: str = DEFAULT_ENCODING
) -> str:
    """Decode commit contents using the commit's encoding or default."""
    if commit.encoding:
        encoding = commit.encoding.decode("ascii")
    else:
        encoding = default_encoding
    return contents.decode(encoding, "replace")


def commit_encode(
    commit: Commit, contents: str, default_encoding: str = DEFAULT_ENCODING
) -> bytes:
    """Encode commit contents using the commit's encoding or default."""
    if commit.encoding:
        encoding = commit.encoding.decode("ascii")
    else:
        encoding = default_encoding
    return contents.encode(encoding)
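
# Example (illustrative sketch): round-tripping a message through a commit's
# declared encoding. ``c`` stands for any Commit whose ``encoding`` header is
# set, e.g. to b"iso-8859-1"; undecodable bytes are replaced, not raised:
#
#     text = commit_decode(c, c.message)   # bytes -> str via c.encoding
#     raw = commit_encode(c, text)         # str -> bytes via c.encoding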


def print_commit(
    commit: Commit,
    decode: Callable[[bytes], str],
    outstream: TextIO = sys.stdout,
) -> None:
    """Write a human-readable commit log entry.

    Args:
      commit: A `Commit` object
      decode: Function to decode commit data
      outstream: A stream file to write to
    """
    outstream.write("-" * 50 + "\n")
    outstream.write("commit: " + commit.id.decode("ascii") + "\n")
    if len(commit.parents) > 1:
        outstream.write(
            "merge: "
            + "...".join([c.decode("ascii") for c in commit.parents[1:]])
            + "\n"
        )
    outstream.write("Author: " + decode(commit.author) + "\n")
    if commit.author != commit.committer:
        outstream.write("Committer: " + decode(commit.committer) + "\n")

    time_tuple = time.gmtime(commit.author_time + commit.author_timezone)
    time_str = time.strftime("%a %b %d %Y %H:%M:%S", time_tuple)
    timezone_str = format_timezone(commit.author_timezone).decode("ascii")
    outstream.write("Date: " + time_str + " " + timezone_str + "\n")
    if commit.message:
        outstream.write("\n")
        outstream.write(decode(commit.message) + "\n")
        outstream.write("\n")


def print_tag(
    tag: Tag, decode: Callable[[bytes], str], outstream: TextIO = sys.stdout
) -> None:
    """Write a human-readable tag.

    Args:
      tag: A `Tag` object
      decode: Function for decoding bytes to unicode string
      outstream: A stream to write to
    """
    outstream.write("Tagger: " + decode(tag.tagger) + "\n")
    time_tuple = time.gmtime(tag.tag_time + tag.tag_timezone)
    time_str = time.strftime("%a %b %d %Y %H:%M:%S", time_tuple)
    timezone_str = format_timezone(tag.tag_timezone).decode("ascii")
    outstream.write("Date: " + time_str + " " + timezone_str + "\n")
    outstream.write("\n")
    outstream.write(decode(tag.message))
    outstream.write("\n")


def show_blob(
    repo: RepoPath,
    blob: Blob,
    decode: Callable[[bytes], str],
    outstream: TextIO = sys.stdout,
) -> None:
    """Write a blob to a stream.

    Args:
      repo: A `Repo` object
      blob: A `Blob` object
      decode: Function for decoding bytes to unicode string
      outstream: A stream file to write to
    """
    outstream.write(decode(blob.data))


def show_commit(
    repo: RepoPath,
    commit: Commit,
    decode: Callable[[bytes], str],
    outstream: TextIO = sys.stdout,
) -> None:
    """Show a commit to a stream.

    Args:
      repo: A `Repo` object
      commit: A `Commit` object
      decode: Function for decoding bytes to unicode string
      outstream: Stream to write to
    """
    from .diff import ColorizedDiffStream

    # Create a wrapper for ColorizedDiffStream to handle string/bytes conversion
    class _StreamWrapper:
        def __init__(self, stream: "ColorizedDiffStream") -> None:
            self.stream = stream

        def write(self, data: Union[str, bytes]) -> None:
            if isinstance(data, str):
                # Convert string to bytes for ColorizedDiffStream
                self.stream.write(data.encode("utf-8"))
            else:
                self.stream.write(data)

    with open_repo_closing(repo) as r:
        # Use wrapper for ColorizedDiffStream, direct stream for others
        if isinstance(outstream, ColorizedDiffStream):
            wrapped_stream = _StreamWrapper(outstream)
            print_commit(commit, decode=decode, outstream=wrapped_stream)
            # Write diff directly to the ColorizedDiffStream as bytes
            write_tree_diff(
                outstream,
                r.object_store,
                commit.parents[0] if commit.parents else None,
                commit.tree,
            )
        else:
            print_commit(commit, decode=decode, outstream=outstream)
            if commit.parents:
                parent_commit = r[commit.parents[0]]
                assert isinstance(parent_commit, Commit)
                base_tree = parent_commit.tree
            else:
                base_tree = None
            # Traditional path: buffer diff and write as decoded text
            diffstream = BytesIO()
            write_tree_diff(diffstream, r.object_store, base_tree, commit.tree)
            diffstream.seek(0)
            outstream.write(commit_decode(commit, diffstream.getvalue()))


def show_tree(
    repo: RepoPath,
    tree: Tree,
    decode: Callable[[bytes], str],
    outstream: TextIO = sys.stdout,
) -> None:
    """Print a tree to a stream.

    Args:
      repo: A `Repo` object
      tree: A `Tree` object
      decode: Function for decoding bytes to unicode string
      outstream: Stream to write to
    """
    for n in tree:
        outstream.write(decode(n) + "\n")


def show_tag(
    repo: RepoPath,
    tag: Tag,
    decode: Callable[[bytes], str],
    outstream: TextIO = sys.stdout,
) -> None:
    """Print a tag to a stream.

    Args:
      repo: A `Repo` object
      tag: A `Tag` object
      decode: Function for decoding bytes to unicode string
      outstream: Stream to write to
    """
    with open_repo_closing(repo) as r:
        print_tag(tag, decode, outstream)
        obj = r[tag.object[1]]
        assert isinstance(obj, (Tree, Blob, Commit, Tag))
        show_object(repo, obj, decode, outstream)


def show_object(
    repo: RepoPath,
    obj: Union[Tree, Blob, Commit, Tag],
    decode: Callable[[bytes], str],
    outstream: TextIO,
) -> None:
    """Show details of a git object."""
    handlers: dict[bytes, Callable[[RepoPath, Any, Any, TextIO], None]] = {
        b"tree": show_tree,
        b"blob": show_blob,
        b"commit": show_commit,
        b"tag": show_tag,
    }
    handler = handlers.get(obj.type_name)
    if handler is None:
        raise ValueError(f"Unknown object type: {obj.type_name.decode()}")
    handler(repo, obj, decode, outstream)


def print_name_status(changes: Iterator[TreeChange]) -> Iterator[str]:
    """Print a simple status summary, listing changed files."""
    for change in changes:
        if not change:
            continue
        if isinstance(change, list):
            change = change[0]
        if change.type == CHANGE_ADD:
            assert change.new is not None
            path1 = change.new.path
            assert path1 is not None
            path2 = b""
            kind = "A"
        elif change.type == CHANGE_DELETE:
            assert change.old is not None
            path1 = change.old.path
            assert path1 is not None
            path2 = b""
            kind = "D"
        elif change.type == CHANGE_MODIFY:
            assert change.new is not None
            path1 = change.new.path
            assert path1 is not None
            path2 = b""
            kind = "M"
        elif change.type in RENAME_CHANGE_TYPES:
            assert change.old is not None and change.new is not None
            path1 = change.old.path
            assert path1 is not None
            path2_opt = change.new.path
            assert path2_opt is not None
            path2 = path2_opt
            if change.type == CHANGE_RENAME:
                kind = "R"
            elif change.type == CHANGE_COPY:
                kind = "C"
        path1_str = (
            path1.decode("utf-8", errors="replace")
            if isinstance(path1, bytes)
            else path1
        )
        path2_str = (
            path2.decode("utf-8", errors="replace")
            if isinstance(path2, bytes)
            else path2
        )
        yield f"{kind:<8}{path1_str:<20}{path2_str:<20}"


def log(
    repo: RepoPath = ".",
    paths: Optional[Sequence[Union[str, bytes]]] = None,
    outstream: TextIO = sys.stdout,
    max_entries: Optional[int] = None,
    reverse: bool = False,
    name_status: bool = False,
) -> None:
    """Write commit logs.

    Args:
      repo: Path to repository
      paths: Optional set of specific paths to print entries for
      outstream: Stream to write log output to
      max_entries: Optional maximum number of entries to display
      reverse: Reverse order in which entries are printed
      name_status: Print name status
    """
    with open_repo_closing(repo) as r:
        try:
            include = [r.head()]
        except KeyError:
            include = []
        # Convert paths to bytes if needed
        paths_bytes = None
        if paths:
            paths_bytes = [p.encode() if isinstance(p, str) else p for p in paths]
        walker = r.get_walker(
            include=include, max_entries=max_entries, paths=paths_bytes, reverse=reverse
        )
        for entry in walker:

            def decode_wrapper(x: bytes) -> str:
                return commit_decode(entry.commit, x)

            print_commit(entry.commit, decode_wrapper, outstream)
            if name_status:
                outstream.writelines(
                    [
                        line + "\n"
                        for line in print_name_status(
                            cast(Iterator[TreeChange], entry.changes())
                        )
                    ]
                )
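
# Example (illustrative sketch; the path is hypothetical): printing the three
# most recent commits touching a file, oldest first, with name-status lines:
#
#     log(".", paths=["docs/index.rst"], max_entries=3, reverse=True,
#         name_status=True)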


# TODO(jelmer): better default for encoding?
def show(
    repo: RepoPath = ".",
    objects: Optional[Sequence[Union[str, bytes]]] = None,
    outstream: TextIO = sys.stdout,
    default_encoding: str = DEFAULT_ENCODING,
) -> None:
    """Print the changes in a commit.

    Args:
      repo: Path to repository
      objects: Objects to show (defaults to [HEAD])
      outstream: Stream to write to
      default_encoding: Default encoding to use if none is set in the commit
    """
    if objects is None:
        objects = ["HEAD"]
    if isinstance(objects, (str, bytes)):
        objects = [objects]
    with open_repo_closing(repo) as r:
        for objectish in objects:
            o = parse_object(r, objectish)
            if isinstance(o, Commit):

                def decode(x: bytes) -> str:
                    return commit_decode(o, x, default_encoding)
            else:

                def decode(x: bytes) -> str:
                    return x.decode(default_encoding)

            assert isinstance(o, (Tree, Blob, Commit, Tag))
            show_object(r, o, decode, outstream)


def diff_tree(
    repo: RepoPath,
    old_tree: Union[str, bytes, Tree],
    new_tree: Union[str, bytes, Tree],
    outstream: BinaryIO = default_bytes_out_stream,
) -> None:
    """Compares the content and mode of blobs found via two tree objects.

    Args:
      repo: Path to repository
      old_tree: Id of old tree
      new_tree: Id of new tree
      outstream: Stream to write to
    """
    with open_repo_closing(repo) as r:
        if isinstance(old_tree, Tree):
            old_tree_id: Optional[bytes] = old_tree.id
        elif isinstance(old_tree, str):
            old_tree_id = old_tree.encode()
        else:
            old_tree_id = old_tree
        if isinstance(new_tree, Tree):
            new_tree_id: Optional[bytes] = new_tree.id
        elif isinstance(new_tree, str):
            new_tree_id = new_tree.encode()
        else:
            new_tree_id = new_tree
        write_tree_diff(outstream, r.object_store, old_tree_id, new_tree_id)


def diff(
    repo: RepoPath = ".",
    commit: Optional[Union[str, bytes, Commit]] = None,
    commit2: Optional[Union[str, bytes, Commit]] = None,
    staged: bool = False,
    paths: Optional[Sequence[Union[str, bytes]]] = None,
    outstream: BinaryIO = default_bytes_out_stream,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Show diff.

    Args:
      repo: Path to repository
      commit: First commit to compare. If staged is True, compare
        index to this commit. If staged is False, compare working tree
        to this commit. If None, defaults to HEAD for staged and index
        for unstaged.
      commit2: Second commit to compare against first commit. If provided,
        show diff between commit and commit2 (ignoring staged flag).
      staged: If True, show staged changes (index vs commit).
        If False, show unstaged changes (working tree vs commit/index).
        Ignored if commit2 is provided.
      paths: Optional list of paths to limit diff
      outstream: Stream to write to
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience"),
        defaults to the underlying function's default if None
    """
    from . import diff as diff_module

    with open_repo_closing(repo) as r:
        # Normalize paths to bytes; an empty or missing list means no filter
        byte_paths: Optional[list[bytes]] = None
        if paths:
            byte_paths = []
            for p in paths:
                if isinstance(p, str):
                    byte_paths.append(p.encode("utf-8"))
                else:
                    byte_paths.append(p)

        # Resolve commit refs to SHAs if provided
        if commit is not None:
            if isinstance(commit, Commit):
                # Already a Commit object
                commit_sha = commit.id
                commit_obj = commit
            else:
                # parse_commit handles both refs and SHAs, and always returns
                # a Commit object
                commit_obj = parse_commit(r, commit)
                commit_sha = commit_obj.id
        else:
            commit_sha = None
            commit_obj = None

        if commit2 is not None:
            # Compare two commits
            if isinstance(commit2, Commit):
                commit2_obj = commit2
            else:
                commit2_obj = parse_commit(r, commit2)

            # Get trees from commits
            old_tree = commit_obj.tree if commit_obj else None
            new_tree = commit2_obj.tree

            # Use tree_changes to get the changes and apply path filtering
            changes = r.object_store.tree_changes(old_tree, new_tree)
            for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
                # Skip if paths are specified and this change doesn't match
                if byte_paths:
                    path_to_check = newpath or oldpath
                    assert path_to_check is not None
                    if not any(
                        path_to_check == p or path_to_check.startswith(p + b"/")
                        for p in byte_paths
                    ):
                        continue
                write_object_diff(
                    outstream,
                    r.object_store,
                    (oldpath, oldmode, oldsha),
                    (newpath, newmode, newsha),
                    diff_algorithm=diff_algorithm,
                )
        elif staged:
            # Show staged changes (index vs commit)
            diff_module.diff_index_to_tree(
                r, outstream, commit_sha, byte_paths, diff_algorithm=diff_algorithm
            )
        elif commit is not None:
            # Compare working tree to a specific commit
            assert (
                commit_sha is not None
            )  # mypy: commit_sha is set when commit is not None
            diff_module.diff_working_tree_to_tree(
                r, outstream, commit_sha, byte_paths, diff_algorithm=diff_algorithm
            )
        else:
            # Compare working tree to index
            diff_module.diff_working_tree_to_index(
                r, outstream, byte_paths, diff_algorithm=diff_algorithm
            )
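
# Example (illustrative sketch): capturing the staged changes relative to
# HEAD as raw diff bytes:
#
#     from io import BytesIO
#     buf = BytesIO()
#     diff(".", commit="HEAD", staged=True, outstream=buf)
#     print(buf.getvalue().decode("utf-8", "replace"))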


def rev_list(
    repo: RepoPath,
    commits: Sequence[Union[str, bytes]],
    outstream: BinaryIO = default_bytes_out_stream,
) -> None:
    """Lists commit objects in reverse chronological order.

    Args:
      repo: Path to repository
      commits: Commits over which to iterate
      outstream: Stream to write to
    """
    with open_repo_closing(repo) as r:
        for entry in r.get_walker(
            include=[r[c if isinstance(c, bytes) else c.encode()].id for c in commits]
        ):
            outstream.write(entry.commit.id + b"\n")


def _canonical_part(url: str) -> str:
    name = url.rsplit("/", 1)[-1]
    if name.endswith(".git"):
        name = name[:-4]
    return name


def submodule_add(
    repo: Union[str, os.PathLike[str], Repo],
    url: str,
    path: Optional[Union[str, os.PathLike[str]]] = None,
    name: Optional[str] = None,
) -> None:
    """Add a new submodule.

    Args:
      repo: Path to repository
      url: URL of repository to add as submodule
      path: Path where submodule should live
      name: Name for the submodule
    """
    with open_repo_closing(repo) as r:
        if path is None:
            path = os.path.relpath(_canonical_part(url), r.path)
        if name is None:
            name = os.fsdecode(path) if path is not None else None
        if name is None:
            raise Error("Submodule name must be specified or derivable from path")

        # TODO(jelmer): Move this logic to dulwich.submodule
        gitmodules_path = os.path.join(r.path, ".gitmodules")
        try:
            config = ConfigFile.from_path(gitmodules_path)
        except FileNotFoundError:
            config = ConfigFile()
            config.path = gitmodules_path
        config.set(("submodule", name), "url", url)
        config.set(("submodule", name), "path", os.fsdecode(path))
        config.write_to_path()


def submodule_init(repo: Union[str, os.PathLike[str], Repo]) -> None:
    """Initialize submodules.

    Args:
      repo: Path to repository
    """
    with open_repo_closing(repo) as r:
        config = r.get_config()
        gitmodules_path = os.path.join(r.path, ".gitmodules")
        for path, url, name in read_submodules(gitmodules_path):
            config.set((b"submodule", name), b"active", True)
            config.set((b"submodule", name), b"url", url)
        config.write_to_path()


def submodule_list(repo: RepoPath) -> Iterator[tuple[str, str]]:
    """List submodules.

    Args:
      repo: Path to repository
    """
    from .submodule import iter_cached_submodules

    with open_repo_closing(repo) as r:
        head_commit = r[r.head()]
        assert isinstance(head_commit, Commit)
        for path, sha in iter_cached_submodules(r.object_store, head_commit.tree):
            yield path.decode(DEFAULT_ENCODING), sha.decode(DEFAULT_ENCODING)
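
# Example (illustrative sketch; URL and path are hypothetical): recording a
# submodule in .gitmodules and listing the ones already present in HEAD:
#
#     submodule_add(".", "https://example.com/lib.git", path="vendor/lib")
#     submodule_init(".")
#     for path, sha in submodule_list("."):
#         print(path, sha)
#
# Note that submodule_list reads the cached tree of HEAD, so a newly added
# submodule only shows up once it has been committed.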


def submodule_update(
    repo: Union[str, os.PathLike[str], Repo],
    paths: Optional[Sequence[Union[str, bytes, os.PathLike[str]]]] = None,
    init: bool = False,
    force: bool = False,
    errstream: Optional[BinaryIO] = None,
) -> None:
    """Update submodules.

    Args:
      repo: Path to repository
      paths: Optional list of specific submodule paths to update. If None, updates all.
      init: If True, initialize submodules first
      force: Force update even if local changes exist
      errstream: Error stream for error messages
    """
    from .submodule import iter_cached_submodules

    with open_repo_closing(repo) as r:
        if init:
            submodule_init(r)

        config = r.get_config()
        gitmodules_path = os.path.join(r.path, ".gitmodules")

        # Get list of submodules to update
        submodules_to_update = []
        head_commit = r[r.head()]
        assert isinstance(head_commit, Commit)
        for path, sha in iter_cached_submodules(r.object_store, head_commit.tree):
            path_str = (
                path.decode(DEFAULT_ENCODING) if isinstance(path, bytes) else path
            )
            if paths is None or path_str in paths:
                submodules_to_update.append((path, sha))

        # Read submodule configuration
        for path, target_sha in submodules_to_update:
            path_str = (
                path.decode(DEFAULT_ENCODING) if isinstance(path, bytes) else path
            )

            # Find the submodule name from .gitmodules
            submodule_name: Optional[bytes] = None
            for sm_path, sm_url, sm_name in read_submodules(gitmodules_path):
                if sm_path == path:
                    submodule_name = sm_name
                    break

            if not submodule_name:
                continue

            # Get the URL from config
            section = (
                b"submodule",
                submodule_name
                if isinstance(submodule_name, bytes)
                else submodule_name.encode(),
            )
            try:
                url_value = config.get(section, b"url")
                if isinstance(url_value, bytes):
                    url = url_value.decode(DEFAULT_ENCODING)
                else:
                    url = url_value
            except KeyError:
                # URL not in config, skip this submodule
                continue

            # Get or create the submodule repository paths
            submodule_path = os.path.join(r.path, path_str)
            submodule_git_dir = os.path.join(r.path, ".git", "modules", path_str)

            # Clone or fetch the submodule
            if not os.path.exists(submodule_git_dir):
                # Clone the submodule as bare repository
                os.makedirs(os.path.dirname(submodule_git_dir), exist_ok=True)

                # Clone to the git directory
                sub_repo = clone(url, submodule_git_dir, bare=True, checkout=False)
                sub_repo.close()

                # Create the submodule directory if it doesn't exist
                if not os.path.exists(submodule_path):
                    os.makedirs(submodule_path)

                # Create .git file in the submodule directory
                depth = path_str.count("/") + 1
                relative_git_dir = "../" * depth + ".git/modules/" + path_str
                git_file_path = os.path.join(submodule_path, ".git")
                with open(git_file_path, "w") as f:
                    f.write(f"gitdir: {relative_git_dir}\n")

                # Set up working directory configuration
                with open_repo_closing(submodule_git_dir) as sub_repo:
                    sub_config = sub_repo.get_config()
                    sub_config.set(
                        (b"core",),
                        b"worktree",
                        os.path.abspath(submodule_path).encode(),
                    )
                    sub_config.write_to_path()

                    # Checkout the target commit
                    sub_repo.refs[b"HEAD"] = target_sha

                    # Build the index and checkout files
                    tree = sub_repo[target_sha]
                    if hasattr(tree, "tree"):  # If it's a commit, get the tree
                        tree_id = tree.tree
                    else:
                        tree_id = target_sha

                    build_index_from_tree(
                        submodule_path,
                        sub_repo.index_path(),
                        sub_repo.object_store,
                        tree_id,
                    )
            else:
                # Fetch and checkout in existing submodule
                with open_repo_closing(submodule_git_dir) as sub_repo:
                    # Fetch from remote
                    client, path_segments = get_transport_and_path(url)
                    client.fetch(path_segments.encode(), sub_repo)

                    # Update to the target commit
                    sub_repo.refs[b"HEAD"] = target_sha

                    # Reset the working directory
                    reset(sub_repo, "hard", target_sha)
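
# The on-disk layout produced above mirrors C git: the submodule's real git
# directory lives under the superproject, and the submodule's working tree
# points back at it via a ``.git`` file. For a hypothetical path "vendor/lib":
#
#     .git/modules/vendor/lib/   # submodule git dir, core.worktree set
#     vendor/lib/.git            # file containing:
#                                #   gitdir: ../../.git/modules/vendor/lib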


def tag_create(
    repo: RepoPath,
    tag: Union[str, bytes],
    author: Optional[Union[str, bytes]] = None,
    message: Optional[Union[str, bytes]] = None,
    annotated: bool = False,
    objectish: Union[str, bytes] = "HEAD",
    tag_time: Optional[int] = None,
    tag_timezone: Optional[int] = None,
    sign: Optional[bool] = None,
    encoding: str = DEFAULT_ENCODING,
) -> None:
    """Creates a tag in git via dulwich calls.

    Args:
      repo: Path to repository
      tag: tag string
      author: tag author (optional, if annotated is set)
      message: tag message (optional)
      annotated: whether to create an annotated tag
      objectish: object the tag should point at, defaults to HEAD
      tag_time: Optional time for annotated tag
      tag_timezone: Optional timezone for annotated tag
      sign: GPG Sign the tag (bool, defaults to False,
        pass True to use default GPG key,
        pass a str containing Key ID to use a specific GPG key)
      encoding: Encoding to use for tag messages
    """
    with open_repo_closing(repo) as r:
        object = parse_object(r, objectish)

        if isinstance(tag, str):
            tag = tag.encode(encoding)
        if annotated:
            # Create the tag object
            tag_obj = Tag()
            if author is None:
                author = get_user_identity(r.get_config_stack())
            elif isinstance(author, str):
                author = author.encode(encoding)
            else:
                assert isinstance(author, bytes)
            tag_obj.tagger = author
            if isinstance(message, str):
                message = message.encode(encoding)
            elif isinstance(message, bytes):
                pass
            else:
                message = b""
            tag_obj.message = message + "\n".encode(encoding)
            tag_obj.name = tag
            tag_obj.object = (type(object), object.id)
            if tag_time is None:
                tag_time = int(time.time())
            tag_obj.tag_time = tag_time
            if tag_timezone is None:
                tag_timezone = get_user_timezones()[1]
            elif isinstance(tag_timezone, str):
                tag_timezone = parse_timezone(tag_timezone.encode())
            tag_obj.tag_timezone = tag_timezone

            # Check if we should sign the tag
            config = r.get_config_stack()
            if sign is None:
                # Check tag.gpgSign configuration when sign is not explicitly set
                try:
                    should_sign = config.get_boolean(
                        (b"tag",), b"gpgsign", default=False
                    )
                except KeyError:
                    should_sign = False  # Default to not signing if no config
            else:
                should_sign = sign
            # Get the signing key from config if signing is enabled
            keyid = None
            if should_sign:
                try:
                    keyid_bytes = config.get((b"user",), b"signingkey")
                    keyid = keyid_bytes.decode() if keyid_bytes else None
                except KeyError:
                    keyid = None
                tag_obj.sign(keyid)

            r.object_store.add_object(tag_obj)
            tag_id = tag_obj.id
        else:
            tag_id = object.id

        r.refs[_make_tag_ref(tag)] = tag_id
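
# Example (illustrative sketch; tag name and message are hypothetical):
#
#     tag_create(".", "v1.0", message="First release", annotated=True)
#
# A lightweight tag (annotated=False) merely writes refs/tags/v1.0 pointing
# at the object; the annotated form stores a Tag object with tagger, date
# and an optional GPG signature.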


def verify_commit(
    repo: RepoPath,
    committish: Union[str, bytes] = "HEAD",
    keyids: Optional[list[str]] = None,
) -> None:
    """Verify GPG signature on a commit.

    Args:
      repo: Path to repository
      committish: Commit to verify (defaults to HEAD)
      keyids: Optional list of trusted key IDs. If provided, the commit
        must be signed by one of these keys. If not provided, just verifies
        that the commit has a valid signature.

    Raises:
      gpg.errors.BadSignatures: if GPG signature verification fails
      gpg.errors.MissingSignatures: if commit was not signed by a key
        specified in keyids
    """
    with open_repo_closing(repo) as r:
        commit = parse_commit(r, committish)
        commit.verify(keyids)


def verify_tag(
    repo: RepoPath,
    tagname: Union[str, bytes],
    keyids: Optional[list[str]] = None,
) -> None:
    """Verify GPG signature on a tag.

    Args:
      repo: Path to repository
      tagname: Name of tag to verify
      keyids: Optional list of trusted key IDs. If provided, the tag
        must be signed by one of these keys. If not provided, just verifies
        that the tag has a valid signature.

    Raises:
      gpg.errors.BadSignatures: if GPG signature verification fails
      gpg.errors.MissingSignatures: if tag was not signed by a key
        specified in keyids
    """
    with open_repo_closing(repo) as r:
        if isinstance(tagname, str):
            tagname = tagname.encode()
        tag_ref = _make_tag_ref(tagname)
        tag_id = r.refs[tag_ref]
        tag_obj = r[tag_id]
        if not isinstance(tag_obj, Tag):
            raise Error(f"{tagname!r} does not point to a tag object")
        tag_obj.verify(keyids)


def tag_list(repo: RepoPath, outstream: TextIO = sys.stdout) -> list[bytes]:
    """List all tags.

    Args:
      repo: Path to repository
      outstream: Stream to write tags to

    Returns:
      Sorted list of tag names as bytes
    """
    with open_repo_closing(repo) as r:
        tags = sorted(r.refs.as_dict(b"refs/tags"))
        return tags


def tag_delete(repo: RepoPath, name: Union[str, bytes]) -> None:
    """Remove a tag.

    Args:
      repo: Path to repository
      name: Name of tag to remove
    """
    with open_repo_closing(repo) as r:
        if isinstance(name, str):
            # Accept str names as well, matching the declared signature
            names = [name.encode(DEFAULT_ENCODING)]
        elif isinstance(name, bytes):
            names = [name]
        elif isinstance(name, list):
            names = name
        else:
            raise Error(f"Unexpected tag name type {name!r}")
        for name in names:
            del r.refs[_make_tag_ref(name)]


def _make_notes_ref(name: bytes) -> bytes:
    """Make a notes ref name."""
    if name.startswith(b"refs/notes/"):
        return name
    return LOCAL_NOTES_PREFIX + name


def notes_add(
    repo: RepoPath,
    object_sha: bytes,
    note: bytes,
    ref: bytes = b"commits",
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
    message: Optional[bytes] = None,
) -> bytes:
    """Add or update a note for an object.

    Args:
      repo: Path to repository
      object_sha: SHA of the object to annotate
      note: Note content
      ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
      author: Author identity (defaults to committer)
      committer: Committer identity (defaults to config)
      message: Commit message for the notes update

    Returns:
      SHA of the new notes commit
    """
    with open_repo_closing(repo) as r:
        # Parse the object to get its SHA
        obj = parse_object(r, object_sha)
        object_sha = obj.id

        if isinstance(note, str):
            note = note.encode(DEFAULT_ENCODING)
        if isinstance(ref, str):
            ref = ref.encode(DEFAULT_ENCODING)

        notes_ref = _make_notes_ref(ref)
        config = r.get_config_stack()
        return r.notes.set_note(
            object_sha,
            note,
            notes_ref,
            author=author,
            committer=committer,
            message=message,
            config=config,
        )


def notes_remove(
    repo: RepoPath,
    object_sha: bytes,
    ref: bytes = b"commits",
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
    message: Optional[bytes] = None,
) -> Optional[bytes]:
    """Remove a note for an object.

    Args:
      repo: Path to repository
      object_sha: SHA of the object to remove notes from
      ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
      author: Author identity (defaults to committer)
      committer: Committer identity (defaults to config)
      message: Commit message for the notes removal

    Returns:
      SHA of the new notes commit, or None if no note existed
    """
    with open_repo_closing(repo) as r:
        # Parse the object to get its SHA
        obj = parse_object(r, object_sha)
        object_sha = obj.id

        if isinstance(ref, str):
            ref = ref.encode(DEFAULT_ENCODING)

        notes_ref = _make_notes_ref(ref)
        config = r.get_config_stack()
        return r.notes.remove_note(
            object_sha,
            notes_ref,
            author=author,
            committer=committer,
            message=message,
            config=config,
        )


def notes_show(
    repo: Union[str, os.PathLike[str], Repo], object_sha: bytes, ref: bytes = b"commits"
) -> Optional[bytes]:
    """Show the note for an object.

    Args:
      repo: Path to repository
      object_sha: SHA of the object
      ref: Notes ref to use (defaults to "commits" for refs/notes/commits)

    Returns:
      Note content as bytes, or None if no note exists
    """
    with open_repo_closing(repo) as r:
        # Parse the object to get its SHA
        obj = parse_object(r, object_sha)
        object_sha = obj.id

        if isinstance(ref, str):
            ref = ref.encode(DEFAULT_ENCODING)

        notes_ref = _make_notes_ref(ref)
        config = r.get_config_stack()
        return r.notes.get_note(object_sha, notes_ref, config=config)


def notes_list(repo: RepoPath, ref: bytes = b"commits") -> list[tuple[bytes, bytes]]:
    """List all notes in a notes ref.

    Args:
      repo: Path to repository
      ref: Notes ref to use (defaults to "commits" for refs/notes/commits)

    Returns:
      List of tuples of (object_sha, note_content)
    """
    with open_repo_closing(repo) as r:
        if isinstance(ref, str):
            ref = ref.encode(DEFAULT_ENCODING)

        notes_ref = _make_notes_ref(ref)
        config = r.get_config_stack()
        return r.notes.list_notes(notes_ref, config=config)
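
# Example (illustrative sketch; the note text is hypothetical): a minimal
# notes round trip against HEAD, all defaulting to refs/notes/commits:
#
#     sha = notes_add(".", b"HEAD", b"reviewed by alice")
#     print(notes_show(".", b"HEAD"))   # -> b"reviewed by alice"
#     notes_remove(".", b"HEAD")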


def reset(
    repo: Union[str, os.PathLike[str], Repo],
    mode: str,
    treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD",
) -> None:
    """Reset current HEAD to the specified state.

    Args:
      repo: Path to repository
      mode: Mode ("hard", "soft", "mixed")
      treeish: Treeish to reset to
    """
    with open_repo_closing(repo) as r:
        # Parse the target tree
        tree = parse_tree(r, treeish)

        # Only parse as commit if treeish is not a Tree object
        if isinstance(treeish, Tree):
            # For Tree objects, we can't determine the commit, skip updating HEAD
            target_commit = None
        else:
            target_commit = parse_commit(r, treeish)

        # Update HEAD to point to the target commit
        if target_commit is not None:
            r.refs[b"HEAD"] = target_commit.id

        if mode == "soft":
            # Soft reset: only update HEAD, leave index and working tree unchanged
            return

        elif mode == "mixed":
            # Mixed reset: update HEAD and index, but leave working tree unchanged
            from .object_store import iter_tree_contents

            # Open the index
            index = r.open_index()

            # Clear the current index
            index.clear()

            # Populate index from the target tree. Use zeros for the
            # filesystem-specific fields since we're not touching the
            # working tree.
            for entry in iter_tree_contents(r.object_store, tree.id):
                assert (
                    entry.mode is not None
                    and entry.sha is not None
                    and entry.path is not None
                )
                index_entry = IndexEntry(
                    ctime=(0, 0),
                    mtime=(0, 0),
                    dev=0,
                    ino=0,
                    mode=entry.mode,
                    uid=0,
                    gid=0,
                    size=0,  # Size will be 0 since we're not reading from disk
                    sha=entry.sha,
                    flags=0,
                )
                index[entry.path] = index_entry

            # Write the updated index
            index.write()

        elif mode == "hard":
            # Hard reset: update HEAD, index, and working tree
            # Get configuration for working directory update
            config = r.get_config()
            honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")

            if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
                validate_path_element = validate_path_element_ntfs
            elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
                validate_path_element = validate_path_element_hfs
            else:
                validate_path_element = validate_path_element_default

            if config.get_boolean(b"core", b"symlinks", True):

                def symlink_wrapper(
                    source: Union[str, bytes, os.PathLike[str]],
                    target: Union[str, bytes, os.PathLike[str]],
                ) -> None:
                    symlink(source, target)  # type: ignore[arg-type,unused-ignore]

                symlink_fn = symlink_wrapper
            else:

                def symlink_fallback(
                    source: Union[str, bytes, os.PathLike[str]],
                    target: Union[str, bytes, os.PathLike[str]],
                ) -> None:
                    mode = "w" + ("b" if isinstance(source, bytes) else "")
                    with open(target, mode) as f:
                        f.write(source)

                symlink_fn = symlink_fallback

            # Update working tree and index
            blob_normalizer = r.get_blob_normalizer()
            # For reset --hard, use the current index tree as the old tree so
            # that deletions are computed properly
            index = r.open_index()
            if len(index) > 0:
                index_tree_id = index.commit(r.object_store)
            else:
                # Empty index
                index_tree_id = None
            changes = tree_changes(
                r.object_store, index_tree_id, tree.id, want_unchanged=True
            )
            update_working_tree(
                r,
                index_tree_id,
                tree.id,
                change_iterator=changes,
                honor_filemode=honor_filemode,
                validate_path_element=validate_path_element,
                symlink_fn=symlink_fn,
                force_remove_untracked=True,
                blob_normalizer=blob_normalizer,
                allow_overwrite_modified=True,  # Allow overwriting modified files
            )
        else:
            raise Error(f"Invalid reset mode: {mode}")


def get_remote_repo(
    repo: Repo, remote_location: Optional[Union[str, bytes]] = None
) -> tuple[Optional[str], str]:
    """Get the remote repository information.

    Args:
      repo: Local repository object
      remote_location: Optional remote name or URL; defaults to branch remote

    Returns:
      Tuple of (remote_name, remote_url) where remote_name may be None
      if remote_location is a URL rather than a configured remote
    """
    config = repo.get_config()
    if remote_location is None:
        remote_location = get_branch_remote(repo)
    if isinstance(remote_location, str):
        encoded_location = remote_location.encode()
    else:
        encoded_location = remote_location

    section = (b"remote", encoded_location)

    remote_name: Optional[str] = None

    if config.has_section(section):
        remote_name = encoded_location.decode()
        encoded_location = config.get(section, "url")
    else:
        remote_name = None

    return (remote_name, encoded_location.decode())


def push(
    repo: RepoPath,
    remote_location: Optional[Union[str, bytes]] = None,
    refspecs: Optional[Union[Union[str, bytes], Sequence[Union[str, bytes]]]] = None,
    outstream: BinaryIO = default_bytes_out_stream,
    errstream: Union[BinaryIO, RawIOBase] = default_bytes_err_stream,
    force: bool = False,
    **kwargs: object,
) -> SendPackResult:
    """Remote push with dulwich via dulwich.client.

    Args:
      repo: Path to repository
      remote_location: Location of the remote
      refspecs: Refs to push to remote
      outstream: A stream file to write output
      errstream: A stream file to write errors
      force: Force overwriting refs
      **kwargs: Additional keyword arguments for the client

    Returns:
      SendPackResult for the push operation
    """
    # Open the repo
    with open_repo_closing(repo) as r:
        (remote_name, remote_location) = get_remote_repo(r, remote_location)

        # Check if mirror mode is enabled
        mirror_mode = False
        if remote_name:
            try:
                mirror_mode_val = r.get_config_stack().get_boolean(
                    (b"remote", remote_name.encode()), b"mirror"
                )
                if mirror_mode_val is not None:
                    mirror_mode = mirror_mode_val
            except KeyError:
                pass

        if mirror_mode:
            # Mirror mode: push all refs and delete non-existent ones
            refspecs = []
            for ref in r.refs.keys():
                # Push all refs to the same name on remote
                refspecs.append(ref + b":" + ref)
        elif refspecs is None:
            refspecs = [active_branch(r)]

        # Normalize refspecs to bytes
        if isinstance(refspecs, str):
            refspecs_bytes: Union[bytes, list[bytes]] = refspecs.encode()
        elif isinstance(refspecs, bytes):
            refspecs_bytes = refspecs
        else:
            refspecs_bytes = []
            for spec in refspecs:
                if isinstance(spec, str):
                    refspecs_bytes.append(spec.encode())
                else:
                    refspecs_bytes.append(spec)

        # Get the client and path
        client, path = get_transport_and_path(
            remote_location,
            config=r.get_config_stack(),
            **kwargs,  # type: ignore[arg-type]
        )

        selected_refs = []
        remote_changed_refs: dict[bytes, Optional[bytes]] = {}

        def update_refs(refs: dict[bytes, bytes]) -> dict[bytes, bytes]:
            from .refs import DictRefsContainer

            remote_refs = DictRefsContainer(refs)
            selected_refs.extend(
                parse_reftuples(r.refs, remote_refs, refspecs_bytes, force=force)
            )
            new_refs = {}

            # In mirror mode, delete remote refs that don't exist locally
            if mirror_mode:
                local_refs = set(r.refs.keys())
                for remote_ref in refs.keys():
                    if remote_ref not in local_refs:
                        new_refs[remote_ref] = ZERO_SHA
                        remote_changed_refs[remote_ref] = None

            # TODO: Handle selected_refs == {None: None}
            for lh, rh, force_ref in selected_refs:
                if lh is None:
                    assert rh is not None
                    new_refs[rh] = ZERO_SHA
                    remote_changed_refs[rh] = None
                else:
                    try:
                        localsha = r.refs[lh]
                    except KeyError as exc:
                        raise Error(
                            f"No valid ref {lh.decode() if isinstance(lh, bytes) else lh} in local repository"
                        ) from exc
                    assert rh is not None
                    if not force_ref and rh in refs:
                        check_diverged(r, refs[rh], localsha)
                    new_refs[rh] = localsha
                    remote_changed_refs[rh] = localsha
            return new_refs

        err_encoding = getattr(errstream, "encoding", None) or DEFAULT_ENCODING
        remote_location = client.get_url(path)
        try:

            def generate_pack_data_wrapper(
                have: AbstractSet[bytes],
                want: AbstractSet[bytes],
                ofs_delta: bool = False,
            ) -> tuple[int, Iterator[UnpackedObject]]:
                # Wrap to match the expected signature. Convert AbstractSet
                # to set since generate_pack_data expects a set.
                return r.generate_pack_data(
                    set(have), set(want), progress=None, ofs_delta=ofs_delta
                )

            result = client.send_pack(
                path.encode(),
                update_refs,
                generate_pack_data=generate_pack_data_wrapper,
                progress=lambda data: (errstream.write(data), None)[1],
            )
        except SendPackError as exc:
            raise Error(
                "Push to " + remote_location + " failed -> " + exc.args[0].decode(),
            ) from exc
        else:
            errstream.write(
                b"Push to " + remote_location.encode(err_encoding) + b" successful.\n"
            )

        for ref, error in (result.ref_status or {}).items():
            if error is not None:
                errstream.write(
                    b"Push of ref %s failed: %s\n" % (ref, error.encode(err_encoding))
                )
            else:
                errstream.write(b"Ref %s updated\n" % ref)

        if remote_name is not None:
            _import_remote_refs(r.refs, remote_name, remote_changed_refs)

    # Trigger auto GC if needed. Returning only afterwards keeps this code
    # reachable; returning from inside the block above would skip it.
    from .gc import maybe_auto_gc

    with open_repo_closing(repo) as r:
        maybe_auto_gc(r)

    return result
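
# Example (illustrative sketch; remote name and branch are hypothetical):
#
#     push(".", "origin", "refs/heads/main")
#
# Deleting a remote branch uses an empty left-hand side, as in C git:
#
#     push(".", "origin", ":refs/heads/old-topic")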


def pull(
    repo: RepoPath,
    remote_location: Optional[Union[str, bytes]] = None,
    refspecs: Optional[Union[Union[str, bytes], Sequence[Union[str, bytes]]]] = None,
    outstream: BinaryIO = default_bytes_out_stream,
    errstream: Union[BinaryIO, RawIOBase] = default_bytes_err_stream,
    fast_forward: bool = True,
    ff_only: bool = False,
    force: bool = False,
    filter_spec: Optional[str] = None,
    protocol_version: Optional[int] = None,
    **kwargs: object,
) -> None:
    """Pull from remote via dulwich.client.

    Args:
      repo: Path to repository
      remote_location: Location of the remote
      refspecs: refspecs to fetch. Can be a bytestring, a string, or a list of
        bytestring/string.
      outstream: A stream file to write to output
      errstream: A stream file to write to errors
      fast_forward: If True, raise an exception when fast-forward is not possible
      ff_only: If True, only allow fast-forward merges. Raises DivergedBranches
        when branches have diverged rather than performing a merge.
      force: If True, allow overwriting local changes in the working tree.
        If False, pull will abort if it would overwrite uncommitted changes.
      filter_spec: A git-rev-list-style object filter spec, as an ASCII string.
        Only used if the server supports the Git protocol-v2 'filter'
        feature, and ignored otherwise.
      protocol_version: desired Git protocol version. By default the highest
        mutually supported protocol version will be used.
      **kwargs: Additional keyword arguments for the client
    """
    # Open the repo
    with open_repo_closing(repo) as r:
        (remote_name, remote_location) = get_remote_repo(r, remote_location)

        selected_refs = []

        if refspecs is None:
            refspecs_normalized: Union[bytes, list[bytes]] = [b"HEAD"]
        elif isinstance(refspecs, str):
            refspecs_normalized = refspecs.encode()
        elif isinstance(refspecs, bytes):
            refspecs_normalized = refspecs
        else:
            refspecs_normalized = []
            for spec in refspecs:
                if isinstance(spec, str):
                    refspecs_normalized.append(spec.encode())
                else:
                    refspecs_normalized.append(spec)

        def determine_wants(
            remote_refs: dict[bytes, bytes], depth: Optional[int] = None
        ) -> list[bytes]:
            from .refs import DictRefsContainer

            remote_refs_container = DictRefsContainer(remote_refs)
            selected_refs.extend(
                parse_reftuples(
                    remote_refs_container, r.refs, refspecs_normalized, force=force
                )
            )
            return [
                remote_refs[lh]
                for (lh, rh, force_ref) in selected_refs
                if lh is not None
                and lh in remote_refs
                and remote_refs[lh] not in r.object_store
            ]

        client, path = get_transport_and_path(
            remote_location,
            config=r.get_config_stack(),
            **kwargs,  # type: ignore[arg-type]
        )
        if filter_spec:
            filter_spec_bytes: Optional[bytes] = filter_spec.encode("ascii")
        else:
            filter_spec_bytes = None

        def progress(data: bytes) -> None:
            errstream.write(data)

        fetch_result = client.fetch(
            path.encode(),
            r,
            progress=progress,
            determine_wants=determine_wants,  # type: ignore[arg-type] # Function matches protocol but mypy can't verify
            filter_spec=filter_spec_bytes,
            protocol_version=protocol_version,
        )

        # Store the old HEAD tree before making changes
        try:
            old_head = r.refs[b"HEAD"]
            old_commit = r[old_head]
            assert isinstance(old_commit, Commit)
            old_tree_id = old_commit.tree
        except KeyError:
            old_tree_id = None

        merged = False
        for lh, rh, force_ref in selected_refs:
            if not force_ref and rh is not None and rh in r.refs:
                try:
                    assert lh is not None
                    followed_ref = r.refs.follow(rh)[1]
                    assert followed_ref is not None
                    lh_ref = fetch_result.refs[lh]
                    assert lh_ref is not None
                    check_diverged(r, followed_ref, lh_ref)
                except DivergedBranches as exc:
                    if ff_only or fast_forward:
                        raise
                    else:
                        # Perform merge
                        assert lh is not None
                        merge_ref = fetch_result.refs[lh]
                        assert merge_ref is not None
                        _merge_result, conflicts = _do_merge(r, merge_ref)
                        if conflicts:
                            raise Error(
                                f"Merge conflicts occurred: {conflicts}"
                            ) from exc
                        merged = True
                        # Skip updating ref since merge already updated HEAD
                        continue
            if rh is not None and lh is not None:
                lh_value = fetch_result.refs[lh]
                if lh_value is not None:
                    r.refs[rh] = lh_value

        # Only update HEAD if we didn't perform a merge
        if selected_refs and not merged:
            lh, rh, _ = selected_refs[0]
            if lh is not None:
                ref_value = fetch_result.refs[lh]
                if ref_value is not None:
                    r[b"HEAD"] = ref_value

        # Update working tree to match the new HEAD
        # Skip if merge was performed as merge already updates the working tree
        if not merged and old_tree_id is not None:
            head_commit = r[b"HEAD"]
            assert isinstance(head_commit, Commit)
            new_tree_id = head_commit.tree
            blob_normalizer = r.get_blob_normalizer()
            changes = tree_changes(r.object_store, old_tree_id, new_tree_id)
            update_working_tree(
                r,
                old_tree_id,
                new_tree_id,
                change_iterator=changes,
                blob_normalizer=blob_normalizer,
                allow_overwrite_modified=force,
            )
        if remote_name is not None:
            _import_remote_refs(r.refs, remote_name, fetch_result.refs)

    # Trigger auto GC if needed
    from .gc import maybe_auto_gc

    with open_repo_closing(repo) as r:
        maybe_auto_gc(r)
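

# Usage sketch (illustrative editorial addition, not part of the upstream API
# surface): a fast-forward-only pull. The repository path and remote name are
# hypothetical placeholders.
def _example_pull_usage() -> None:  # pragma: no cover - documentation only
    # ff_only=True raises DivergedBranches instead of merging when the
    # local and remote branches have diverged.
    pull("/path/to/clone", remote_location="origin", ff_only=True)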


def status(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    ignored: bool = False,
    untracked_files: str = "normal",
) -> GitStatus:
    """Returns staged, unstaged, and untracked changes relative to the HEAD.

    Args:
      repo: Path to repository or repository object
      ignored: Whether to include ignored files in untracked
      untracked_files: How to handle untracked files, defaults to "normal":
          "no": do not return untracked files
          "normal": return untracked directories, not their contents
          "all": include all files in untracked directories
        Using untracked_files="no" can be faster than "all" when the worktree
          contains many untracked files/directories.
        Using untracked_files="normal" provides a good balance, only showing
          directories that are entirely untracked without listing all their contents.

    Returns: GitStatus tuple,
        staged -    dict with lists of staged paths (diff index/HEAD)
        unstaged -  list of unstaged paths (diff index/working-tree)
        untracked - list of untracked, un-ignored & non-.git paths
    """
    with open_repo_closing(repo) as r:
        # 1. Get status of staged
        tracked_changes = get_tree_changes(r)
        # 2. Get status of unstaged
        index = r.open_index()
        normalizer = r.get_blob_normalizer()

        # Create a wrapper that handles the bytes -> Blob conversion
        if normalizer is not None:

            def filter_callback(data: bytes, path: bytes) -> bytes:
                from dulwich.objects import Blob

                blob = Blob()
                blob.data = data
                normalized_blob = normalizer.checkin_normalize(blob, path)
                result_data: bytes = normalized_blob.data
                return result_data

        else:
            filter_callback = None

        # Check if core.preloadIndex is enabled
        config = r.get_config_stack()
        preload_index = config.get_boolean(b"core", b"preloadIndex", False)

        unstaged_changes = list(
            get_unstaged_changes(index, r.path, filter_callback, preload_index)
        )

        untracked_paths = get_untracked_paths(
            r.path,
            r.path,
            index,
            exclude_ignored=not ignored,
            untracked_files=untracked_files,
        )
        if sys.platform == "win32":
            untracked_changes = [
                path.replace(os.path.sep, "/") for path in untracked_paths
            ]
        else:
            untracked_changes = list(untracked_paths)

        return GitStatus(tracked_changes, unstaged_changes, untracked_changes)
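

# Usage sketch (illustrative editorial addition): reading the GitStatus tuple
# returned by status(). The repository path "." is a placeholder.
def _example_status_usage() -> None:  # pragma: no cover - documentation only
    st = status(".", untracked_files="normal")
    for path in st.staged["modify"]:
        print("staged modification:", path)
    for path in st.unstaged:
        print("unstaged:", path)
    for path in st.untracked:
        print("untracked:", path)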


def shortlog(
    repo: Union[str, os.PathLike[str], Repo],
    summary_only: bool = False,
    sort_by_commits: bool = False,
) -> list[dict[str, str]]:
    """Summarize commits by author, like git shortlog.

    Args:
      repo: Path to repository or Repo object.
      summary_only: If True, only show counts per author.
      sort_by_commits: If True, sort authors by number of commits.

    Returns:
      A list of dictionaries, each containing:
        - "author": the author's name as a string
        - "messages": all commit messages concatenated into a single string
    """
    with open_repo_closing(repo) as r:
        walker = r.get_walker()
        authors: dict[str, list[str]] = {}

        for entry in walker:
            commit = entry.commit
            # commit.encoding is stored as bytes (e.g. b"ISO-8859-1"), but
            # bytes.decode() expects the codec name as a str.
            encoding = (commit.encoding or b"utf-8").decode("ascii")
            author = commit.author.decode(encoding)
            message = commit.message.decode(encoding).strip()
            authors.setdefault(author, []).append(message)

        # Convert messages to single string per author
        items: list[dict[str, str]] = [
            {"author": author, "messages": "\n".join(msgs)}
            for author, msgs in authors.items()
        ]

        if sort_by_commits:
            # Sort by number of commits (lines in messages)
            items.sort(key=lambda x: len(x["messages"].splitlines()), reverse=True)

        return items
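

# Usage sketch (illustrative editorial addition): printing a git-shortlog-like
# summary, deriving per-author commit counts from the concatenated messages.
def _example_shortlog_usage() -> None:  # pragma: no cover - documentation only
    for entry in shortlog(".", sort_by_commits=True):
        count = len(entry["messages"].splitlines())
        print(f"{count:6d}\t{entry['author']}")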


def _walk_working_dir_paths(
    frompath: Union[str, bytes, os.PathLike[str]],
    basepath: Union[str, bytes, os.PathLike[str]],
    prune_dirnames: Optional[Callable[[str, list[str]], list[str]]] = None,
) -> Iterator[tuple[Union[str, bytes], bool]]:
    """Get path, is_dir for files in working dir from frompath.

    Args:
      frompath: Path to begin walk
      basepath: Path to compare to
      prune_dirnames: Optional callback to prune dirnames during os.walk
        dirnames will be set to result of prune_dirnames(dirpath, dirnames)
    """
    # Convert paths to strings for os.walk compatibility
    for dirpath, dirnames, filenames in os.walk(frompath):  # type: ignore[type-var]
        # Skip .git and below.
        if ".git" in dirnames:
            dirnames.remove(".git")
            if dirpath != basepath:
                continue

        if ".git" in filenames:
            filenames.remove(".git")
            if dirpath != basepath:
                continue

        if dirpath != frompath:
            yield dirpath, True  # type: ignore[misc]

        for filename in filenames:
            filepath = os.path.join(dirpath, filename)  # type: ignore[call-overload]
            yield filepath, False

        if prune_dirnames:
            dirnames[:] = prune_dirnames(dirpath, dirnames)  # type: ignore[arg-type]


def get_untracked_paths(
    frompath: Union[str, bytes, os.PathLike[str]],
    basepath: Union[str, bytes, os.PathLike[str]],
    index: Index,
    exclude_ignored: bool = False,
    untracked_files: str = "all",
) -> Iterator[str]:
    """Get untracked paths.

    Args:
      frompath: Path to walk
      basepath: Path to compare to
      index: Index to check against
      exclude_ignored: Whether to exclude ignored paths
      untracked_files: How to handle untracked files:
        - "no": return an empty list
        - "all": return all files in untracked directories
        - "normal": return untracked directories without listing their contents

    Note: ignored directories will never be walked for performance reasons.
      If exclude_ignored is False, only the path to an ignored directory will
      be yielded, no files inside the directory will be returned
    """
    if untracked_files not in ("no", "all", "normal"):
        raise ValueError("untracked_files must be one of (no, all, normal)")

    if untracked_files == "no":
        return

    # Normalize paths to str
    frompath_str = os.fsdecode(os.fspath(frompath))
    basepath_str = os.fsdecode(os.fspath(basepath))

    with open_repo_closing(basepath_str) as r:
        ignore_manager = IgnoreFilterManager.from_repo(r)

    ignored_dirs = []
    # List to store untracked directories found during traversal
    untracked_dir_list = []

    def directory_has_non_ignored_files(dir_path: str, base_rel_path: str) -> bool:
        """Recursively check if directory contains any non-ignored files."""
        try:
            for entry in os.listdir(dir_path):
                entry_path = os.path.join(dir_path, entry)
                rel_entry = os.path.join(base_rel_path, entry)
                if os.path.isfile(entry_path):
                    if ignore_manager.is_ignored(rel_entry) is not True:
                        return True
                elif os.path.isdir(entry_path):
                    if directory_has_non_ignored_files(entry_path, rel_entry):
                        return True
            return False
        except OSError:
            # If we can't read the directory, assume it has non-ignored files
            return True

    def prune_dirnames(dirpath: str, dirnames: list[str]) -> list[str]:
        for i in range(len(dirnames) - 1, -1, -1):
            path = os.path.join(dirpath, dirnames[i])
            ip = os.path.join(os.path.relpath(path, basepath_str), "")

            # Check if directory is ignored
            if ignore_manager.is_ignored(ip) is True:
                if not exclude_ignored:
                    ignored_dirs.append(
                        os.path.join(os.path.relpath(path, frompath_str), "")
                    )
                del dirnames[i]
                continue

            # For "normal" mode, check if the directory is entirely untracked
            if untracked_files == "normal":
                # Convert directory path to tree path for index lookup
                dir_tree_path = path_to_tree_path(basepath_str, path)
                # Check if any file in this directory is tracked
                dir_prefix = dir_tree_path + b"/" if dir_tree_path else b""
                has_tracked_files = any(name.startswith(dir_prefix) for name in index)

                if not has_tracked_files:
                    # This directory is entirely untracked
                    rel_path_base = os.path.relpath(path, basepath_str)
                    rel_path_from = os.path.join(
                        os.path.relpath(path, frompath_str), ""
                    )

                    # If excluding ignored, check if directory contains any non-ignored files
                    if exclude_ignored:
                        if not directory_has_non_ignored_files(path, rel_path_base):
                            # Directory only contains ignored files, skip it
                            del dirnames[i]
                            continue

                    # Check if it should be excluded due to ignore rules
                    is_ignored = ignore_manager.is_ignored(rel_path_base)
                    if not exclude_ignored or not is_ignored:
                        untracked_dir_list.append(rel_path_from)
                    del dirnames[i]

        return dirnames

    # For "all" mode, use the original behavior
    if untracked_files == "all":
        for ap, is_dir in _walk_working_dir_paths(
            frompath_str, basepath_str, prune_dirnames=prune_dirnames
        ):
            # frompath_str and basepath_str are both str, so ap must be str
            assert isinstance(ap, str)
            if not is_dir:
                ip = path_to_tree_path(basepath_str, ap)
                if ip not in index:
                    if not exclude_ignored or not ignore_manager.is_ignored(
                        os.path.relpath(ap, basepath_str)
                    ):
                        yield os.path.relpath(ap, frompath_str)
    else:  # "normal" mode
        # Walk directories, handling both files and directories
        for ap, is_dir in _walk_working_dir_paths(
            frompath_str, basepath_str, prune_dirnames=prune_dirnames
        ):
            # frompath_str and basepath_str are both str, so ap must be str
            assert isinstance(ap, str)
            # This part won't be reached for pruned directories
            if is_dir:
                # Check if this directory is entirely untracked
                dir_tree_path = path_to_tree_path(basepath_str, ap)
                dir_prefix = dir_tree_path + b"/" if dir_tree_path else b""
                has_tracked_files = any(name.startswith(dir_prefix) for name in index)
                if not has_tracked_files:
                    if not exclude_ignored or not ignore_manager.is_ignored(
                        os.path.relpath(ap, basepath_str)
                    ):
                        yield os.path.join(os.path.relpath(ap, frompath_str), "")
            else:
                # Check individual files in directories that contain tracked files
                ip = path_to_tree_path(basepath_str, ap)
                if ip not in index:
                    if not exclude_ignored or not ignore_manager.is_ignored(
                        os.path.relpath(ap, basepath_str)
                    ):
                        yield os.path.relpath(ap, frompath_str)

    # Yield any untracked directories found during pruning
    yield from untracked_dir_list
    yield from ignored_dirs
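

# Usage sketch (illustrative editorial addition): listing untracked paths the
# same way status() does, against the repository's own index. The path "."
# is a placeholder.
def _example_get_untracked_paths_usage() -> None:  # pragma: no cover
    with open_repo_closing(".") as r:
        for path in get_untracked_paths(
            r.path,
            r.path,
            r.open_index(),
            exclude_ignored=True,
            untracked_files="normal",
        ):
            print(path)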


def get_tree_changes(repo: RepoPath) -> dict[str, list[Union[str, bytes]]]:
    """Return add/delete/modify changes to tree by comparing index to HEAD.

    Args:
      repo: repo path or object

    Returns: dict with lists for each type of change
    """
    with open_repo_closing(repo) as r:
        index = r.open_index()

        # Compares the Index to the HEAD & determines changes
        # Iterate through the changes and report add/delete/modify
        # TODO: call out to dulwich.diff_tree somehow.
        tracked_changes: dict[str, list[Union[str, bytes]]] = {
            "add": [],
            "delete": [],
            "modify": [],
        }
        try:
            head_commit = r[b"HEAD"]
            assert isinstance(head_commit, Commit)
            tree_id = head_commit.tree
        except KeyError:
            tree_id = None

        for change in index.changes_from_tree(r.object_store, tree_id):
            if not change[0][0]:
                assert change[0][1] is not None
                tracked_changes["add"].append(change[0][1])
            elif not change[0][1]:
                assert change[0][0] is not None
                tracked_changes["delete"].append(change[0][0])
            elif change[0][0] == change[0][1]:
                assert change[0][0] is not None
                tracked_changes["modify"].append(change[0][0])
            else:
                raise NotImplementedError("git mv ops not yet supported")
        return tracked_changes
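

# Usage sketch (illustrative editorial addition): iterating the staged-change
# buckets returned by get_tree_changes().
def _example_get_tree_changes_usage() -> None:  # pragma: no cover
    changes = get_tree_changes(".")
    for kind in ("add", "delete", "modify"):
        for path in changes[kind]:
            print(kind, path)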


def daemon(
    path: Union[str, os.PathLike[str]] = ".",
    address: Optional[str] = None,
    port: Optional[int] = None,
) -> None:
    """Run a daemon serving Git requests over TCP/IP.

    Args:
      path: Path to the directory to serve.
      address: Optional address to listen on (defaults to localhost)
      port: Optional port to listen on (defaults to TCP_GIT_PORT, 9418)
    """
    # TODO(jelmer): Support git-daemon-export-ok and --export-all.
    backend = FileSystemBackend(os.fspath(path))
    server = TCPGitServer(backend, address or "localhost", port or 9418)
    server.serve_forever()


def web_daemon(
    path: Union[str, os.PathLike[str]] = ".",
    address: Optional[str] = None,
    port: Optional[int] = None,
) -> None:
    """Run a daemon serving Git requests over HTTP.

    Args:
      path: Path to the directory to serve
      address: Optional address to listen on (defaults to ::)
      port: Optional port to listen on (defaults to 80)
    """
    from .web import (
        WSGIRequestHandlerLogger,
        WSGIServerLogger,
        make_server,
        make_wsgi_chain,
    )

    backend = FileSystemBackend(os.fspath(path))
    app = make_wsgi_chain(backend)
    server = make_server(
        address or "::",
        port or 80,
        app,
        handler_class=WSGIRequestHandlerLogger,
        server_class=WSGIServerLogger,
    )
    server.serve_forever()
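

# Usage sketch (illustrative editorial addition): serving a directory of
# repositories over HTTP. The path and port are hypothetical placeholders;
# the call blocks until the process is interrupted.
def _example_web_daemon_usage() -> None:  # pragma: no cover
    web_daemon("/srv/git", address="127.0.0.1", port=8000)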


def upload_pack(
    path: Union[str, os.PathLike[str]] = ".",
    inf: Optional[BinaryIO] = None,
    outf: Optional[BinaryIO] = None,
) -> int:
    """Upload a pack file after negotiating its contents using smart protocol.

    Args:
      path: Path to the repository
      inf: Input stream to communicate with client
      outf: Output stream to communicate with client
    """
    if outf is None:
        outf = sys.stdout.buffer
    if inf is None:
        inf = sys.stdin.buffer
    assert outf is not None
    assert inf is not None
    path = os.path.expanduser(path)
    backend = FileSystemBackend(path)

    def send_fn(data: bytes) -> None:
        outf.write(data)
        outf.flush()

    proto = Protocol(inf.read, send_fn)
    handler = UploadPackHandler(backend, [path], proto)
    # FIXME: Catch exceptions and write a single-line summary to outf.
    handler.handle()
    return 0


def receive_pack(
    path: Union[str, os.PathLike[str]] = ".",
    inf: Optional[BinaryIO] = None,
    outf: Optional[BinaryIO] = None,
) -> int:
    """Receive a pack file after negotiating its contents using smart protocol.

    Args:
      path: Path to the repository
      inf: Input stream to communicate with client
      outf: Output stream to communicate with client
    """
    if outf is None:
        outf = sys.stdout.buffer
    if inf is None:
        inf = sys.stdin.buffer
    assert outf is not None
    assert inf is not None
    path = os.path.expanduser(path)
    backend = FileSystemBackend(path)

    def send_fn(data: bytes) -> None:
        outf.write(data)
        outf.flush()

    proto = Protocol(inf.read, send_fn)
    handler = ReceivePackHandler(backend, [path], proto)
    # FIXME: Catch exceptions and write a single-line summary to outf.
    handler.handle()
    return 0


def _make_branch_ref(name: Union[str, bytes]) -> Ref:
    if isinstance(name, str):
        name = name.encode(DEFAULT_ENCODING)
    return LOCAL_BRANCH_PREFIX + name


def _make_tag_ref(name: Union[str, bytes]) -> Ref:
    if isinstance(name, str):
        name = name.encode(DEFAULT_ENCODING)
    return LOCAL_TAG_PREFIX + name


def branch_delete(
    repo: RepoPath, name: Union[str, bytes, Sequence[Union[str, bytes]]]
) -> None:
    """Delete a branch.

    Args:
      repo: Path to the repository
      name: Name of the branch
    """
    with open_repo_closing(repo) as r:
        if isinstance(name, (list, tuple)):
            names = name
        else:
            names = [name]
        for branch_name in names:
            del r.refs[_make_branch_ref(branch_name)]


def branch_create(
    repo: Union[str, os.PathLike[str], Repo],
    name: Union[str, bytes],
    objectish: Optional[Union[str, bytes]] = None,
    force: bool = False,
) -> None:
    """Create a branch.

    Args:
      repo: Path to the repository
      name: Name of the new branch
      objectish: Target object to point new branch at (defaults to HEAD)
      force: Force creation of branch, even if it already exists
    """
    with open_repo_closing(repo) as r:
        if objectish is None:
            objectish = "HEAD"

        # Try to expand branch shorthand before parsing
        original_objectish = objectish
        objectish_bytes = (
            objectish.encode(DEFAULT_ENCODING)
            if isinstance(objectish, str)
            else objectish
        )
        if b"refs/remotes/" + objectish_bytes in r.refs:
            objectish = b"refs/remotes/" + objectish_bytes
        elif b"refs/heads/" + objectish_bytes in r.refs:
            objectish = b"refs/heads/" + objectish_bytes

        object = parse_object(r, objectish)
        refname = _make_branch_ref(name)
        ref_message = (
            b"branch: Created from " + original_objectish.encode(DEFAULT_ENCODING)
            if isinstance(original_objectish, str)
            else b"branch: Created from " + original_objectish
        )
        if force:
            r.refs.set_if_equals(refname, None, object.id, message=ref_message)
        else:
            if not r.refs.add_if_new(refname, object.id, message=ref_message):
                name_str = name.decode() if isinstance(name, bytes) else name
                raise Error(f"Branch with name {name_str} already exists.")

        # Check if we should set up tracking
        config = r.get_config_stack()
        try:
            auto_setup_merge = config.get((b"branch",), b"autoSetupMerge").decode()
        except KeyError:
            auto_setup_merge = "true"  # Default value

        # Determine if the objectish refers to a remote-tracking branch
        objectish_ref = None
        if original_objectish != "HEAD":
            # Try to resolve objectish as a ref
            objectish_bytes = (
                original_objectish.encode(DEFAULT_ENCODING)
                if isinstance(original_objectish, str)
                else original_objectish
            )
            if objectish_bytes in r.refs:
                objectish_ref = objectish_bytes
            elif b"refs/remotes/" + objectish_bytes in r.refs:
                objectish_ref = b"refs/remotes/" + objectish_bytes
            elif b"refs/heads/" + objectish_bytes in r.refs:
                objectish_ref = b"refs/heads/" + objectish_bytes
        else:
            # HEAD might point to a remote-tracking branch
            head_ref = r.refs.follow(b"HEAD")[0][1]
            if head_ref.startswith(b"refs/remotes/"):
                objectish_ref = head_ref

        # Set up tracking if appropriate
        if objectish_ref and (
            (auto_setup_merge == "always")
            or (
                auto_setup_merge == "true"
                and objectish_ref.startswith(b"refs/remotes/")
            )
        ):
            # Extract remote name and branch from the ref
            if objectish_ref.startswith(b"refs/remotes/"):
                parts = objectish_ref[len(b"refs/remotes/") :].split(b"/", 1)
                if len(parts) == 2:
                    remote_name = parts[0]
                    remote_branch = b"refs/heads/" + parts[1]
                    # Set up tracking
                    repo_config = r.get_config()
                    branch_name_bytes = (
                        name.encode(DEFAULT_ENCODING) if isinstance(name, str) else name
                    )
                    repo_config.set(
                        (b"branch", branch_name_bytes), b"remote", remote_name
                    )
                    repo_config.set(
                        (b"branch", branch_name_bytes), b"merge", remote_branch
                    )
                    repo_config.write_to_path()


def filter_branches_by_pattern(branches: Iterable[bytes], pattern: str) -> list[bytes]:
    """Filter branches by fnmatch pattern.

    Args:
      branches: Iterable of branch names as bytes
      pattern: Pattern to match against

    Returns:
      List of filtered branch names
    """
    return [
        branch for branch in branches if fnmatch.fnmatchcase(branch.decode(), pattern)
    ]


def branch_list(repo: RepoPath) -> list[bytes]:
    """List all branches.

    Args:
      repo: Path to the repository

    Returns:
      List of branch names (without refs/heads/ prefix)
    """
    with open_repo_closing(repo) as r:
        branches = list(r.refs.keys(base=LOCAL_BRANCH_PREFIX))

        # Check for branch.sort configuration
        config = r.get_config_stack()
        try:
            sort_key = config.get((b"branch",), b"sort").decode()
        except KeyError:
            # Default is refname (alphabetical)
            sort_key = "refname"

        # Parse sort key
        reverse = False
        if sort_key.startswith("-"):
            reverse = True
            sort_key = sort_key[1:]

        # Apply sorting
        if sort_key == "refname":
            # Simple alphabetical sort (default)
            branches.sort(reverse=reverse)
        elif sort_key in ("committerdate", "authordate"):
            # Sort by date
            def get_commit_date(branch_name: bytes) -> int:
                ref = LOCAL_BRANCH_PREFIX + branch_name
                sha = r.refs[ref]
                commit = r.object_store[sha]
                assert isinstance(commit, Commit)
                if sort_key == "committerdate":
                    return cast(int, commit.commit_time)
                else:  # authordate
                    return cast(int, commit.author_time)

            # Sort branches by date
            # Note: Python's sort naturally orders smaller values first (ascending)
            # For dates, this means oldest first by default
            # Use a stable sort with branch name as secondary key for consistent ordering
            if reverse:
                # For reverse sort, we want newest dates first but alphabetical names second
                branches.sort(key=lambda b: (-get_commit_date(b), b))
            else:
                branches.sort(key=lambda b: (get_commit_date(b), b))
        else:
            # Unknown sort key, fall back to default
            branches.sort()

        return branches
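

# Usage sketch (illustrative editorial addition): a create/list/delete
# round-trip. Branch names are returned as bytes without the refs/heads/
# prefix; "feature-x" is a hypothetical placeholder.
def _example_branch_usage() -> None:  # pragma: no cover - documentation only
    branch_create(".", "feature-x")  # points at HEAD by default
    assert b"feature-x" in branch_list(".")
    branch_delete(".", "feature-x")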


def branch_remotes_list(repo: RepoPath) -> list[bytes]:
    """List the short names of all remote-tracking branches.

    Args:
      repo: Path to the repository

    Returns:
      List of branch names (without the refs/remotes/ prefix, but including
      the remote name; e.g. b'origin/main')
    """
    with open_repo_closing(repo) as r:
        branches = list(r.refs.keys(base=LOCAL_REMOTE_PREFIX))
        config = r.get_config_stack()
        try:
            sort_key = config.get((b"branch",), b"sort").decode()
        except KeyError:
            # Default is refname (alphabetical)
            sort_key = "refname"

        # Parse sort key
        reverse = False
        if sort_key.startswith("-"):
            reverse = True
            sort_key = sort_key[1:]

        # Apply sorting
        if sort_key == "refname":
            # Simple alphabetical sort (default)
            branches.sort(reverse=reverse)
        elif sort_key in ("committerdate", "authordate"):
            # Sort by date
            def get_commit_date(branch_name: bytes) -> int:
                ref = LOCAL_REMOTE_PREFIX + branch_name
                sha = r.refs[ref]
                commit = r.object_store[sha]
                assert isinstance(commit, Commit)
                if sort_key == "committerdate":
                    return cast(int, commit.commit_time)
                else:  # authordate
                    return cast(int, commit.author_time)

            # Sort branches by date
            # Note: Python's sort naturally orders smaller values first (ascending)
            # For dates, this means oldest first by default
            # Use a stable sort with branch name as secondary key for consistent ordering
            if reverse:
                # For reverse sort, we want newest dates first but alphabetical names second
                branches.sort(key=lambda b: (-get_commit_date(b), b))
            else:
                branches.sort(key=lambda b: (get_commit_date(b), b))
        else:
            # Unknown sort key
            raise ValueError(f"Unknown sort key: {sort_key}")

        return branches


def _get_branch_merge_status(repo: RepoPath) -> Iterator[tuple[bytes, bool]]:
    """Get merge status for all branches relative to current HEAD.

    Args:
      repo: Path to the repository

    Yields:
      Tuple of (branch_name, is_merged) where:
        - branch_name: Branch name without refs/heads/ prefix
        - is_merged: True if branch is merged into HEAD, False otherwise
    """
    with open_repo_closing(repo) as r:
        current_sha = r.refs[b"HEAD"]

        for branch_ref, branch_sha in r.refs.as_dict(base=b"refs/heads/").items():
            # Check if branch is an ancestor of HEAD (fully merged)
            is_merged = can_fast_forward(r, branch_sha, current_sha)
            yield branch_ref, is_merged


def merged_branches(repo: RepoPath) -> Iterator[bytes]:
    """List branches that have been merged into the current branch.

    Args:
      repo: Path to the repository

    Yields:
      Branch names (without refs/heads/ prefix) that are merged
      into the current HEAD
    """
    for branch_name, is_merged in _get_branch_merge_status(repo):
        if is_merged:
            yield branch_name


def no_merged_branches(repo: RepoPath) -> Iterator[bytes]:
    """List branches that have not been merged into the current branch.

    Args:
      repo: Path to the repository

    Yields:
      Branch names (without refs/heads/ prefix) that are not merged
      into the current HEAD
    """
    for branch_name, is_merged in _get_branch_merge_status(repo):
        if not is_merged:
            yield branch_name


def branches_containing(repo: RepoPath, commit: str) -> Iterator[bytes]:
    """List branches that contain the specified commit.

    Args:
      repo: Path to the repository
      commit: Commit-ish string (SHA, branch name, tag, etc.)

    Yields:
      Branch names (without refs/heads/ prefix) that contain the commit

    Raises:
      ValueError: If the commit reference is malformed
      KeyError: If the commit reference does not exist
    """
    with open_repo_closing(repo) as r:
        commit_obj = parse_commit(r, commit)
        commit_sha = commit_obj.id

        for branch_ref, branch_sha in r.refs.as_dict(base=LOCAL_BRANCH_PREFIX).items():
            if can_fast_forward(r, commit_sha, branch_sha):
                yield branch_ref


def active_branch(repo: RepoPath) -> bytes:
    """Return the active branch in the repository, if any.

    Args:
      repo: Repository to open

    Returns:
      branch name

    Raises:
      KeyError: if the repository does not have a working tree
      IndexError: if HEAD is floating
    """
    with open_repo_closing(repo) as r:
        active_ref = r.refs.follow(b"HEAD")[0][1]
        if not active_ref.startswith(LOCAL_BRANCH_PREFIX):
            raise ValueError(active_ref)
        return active_ref[len(LOCAL_BRANCH_PREFIX) :]


def get_branch_remote(repo: Union[str, os.PathLike[str], Repo]) -> bytes:
    """Return the active branch's remote name, if any.

    Args:
      repo: Repository to open

    Returns:
      remote name

    Raises:
      KeyError: if the repository does not have a working tree
    """
    with open_repo_closing(repo) as r:
        branch_name = active_branch(r.path)
        config = r.get_config()
        try:
            remote_name = config.get((b"branch", branch_name), b"remote")
        except KeyError:
            remote_name = b"origin"
    return remote_name


def get_branch_merge(repo: RepoPath, branch_name: Optional[bytes] = None) -> bytes:
    """Return the branch's merge reference (upstream branch), if any.

    Args:
      repo: Repository to open
      branch_name: Name of the branch (defaults to active branch)

    Returns:
      merge reference name (e.g. b"refs/heads/main")

    Raises:
      KeyError: if the branch does not have a merge configuration
    """
    with open_repo_closing(repo) as r:
        if branch_name is None:
            branch_name = active_branch(r.path)
        config = r.get_config()
        return config.get((b"branch", branch_name), b"merge")


def set_branch_tracking(
    repo: Union[str, os.PathLike[str], Repo],
    branch_name: bytes,
    remote_name: bytes,
    remote_ref: bytes,
) -> None:
    """Set up branch tracking configuration.

    Args:
      repo: Repository to open
      branch_name: Name of the local branch
      remote_name: Name of the remote (e.g. b"origin")
      remote_ref: Remote reference to track (e.g. b"refs/heads/main")
    """
    with open_repo_closing(repo) as r:
        config = r.get_config()
        config.set((b"branch", branch_name), b"remote", remote_name)
        config.set((b"branch", branch_name), b"merge", remote_ref)
        config.write_to_path()


def fetch(
    repo: RepoPath,
    remote_location: Optional[Union[str, bytes]] = None,
    outstream: TextIO = sys.stdout,
    errstream: Union[BinaryIO, RawIOBase] = default_bytes_err_stream,
    message: Optional[bytes] = None,
    depth: Optional[int] = None,
    prune: bool = False,
    prune_tags: bool = False,
    force: bool = False,
    operation: Optional[str] = None,
    thin_packs: bool = True,
    report_activity: Optional[Callable[[int, str], None]] = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: Optional[str] = None,
    password: Optional[str] = None,
    key_filename: Optional[str] = None,
    ssh_command: Optional[str] = None,
) -> FetchPackResult:
    """Fetch objects from a remote server.

    Args:
      repo: Path to the repository
      remote_location: String identifying a remote server
      outstream: Output stream (defaults to stdout)
      errstream: Error stream (defaults to stderr)
      message: Reflog message (defaults to b"fetch: from <remote_name>")
      depth: Depth to fetch at
      prune: Prune remote removed refs
      prune_tags: Prune remote removed tags
      force: Force fetching even if it would overwrite local changes
      operation: Git operation for authentication (e.g., "fetch")
      thin_packs: Whether to use thin packs
      report_activity: Optional callback for reporting transport activity
      quiet: Whether to suppress progress output
      include_tags: Whether to include tags
      username: Username for authentication
      password: Password for authentication
      key_filename: SSH key filename
      ssh_command: SSH command to use

    Returns:
      FetchPackResult with the refs advertised by the remote
    """
    with open_repo_closing(repo) as r:
        (remote_name, remote_location) = get_remote_repo(r, remote_location)
        if message is None:
            message = b"fetch: from " + remote_location.encode(DEFAULT_ENCODING)
        client, path = get_transport_and_path(
            remote_location,
            config=r.get_config_stack(),
            operation=operation,
            thin_packs=thin_packs,
            report_activity=report_activity,
            quiet=quiet,
            include_tags=include_tags,
            username=username,
            password=password,
            key_filename=key_filename,
            ssh_command=ssh_command,
        )

        def progress(data: bytes) -> None:
            errstream.write(data)

        fetch_result = client.fetch(path.encode(), r, progress=progress, depth=depth)
        if remote_name is not None:
            _import_remote_refs(
                r.refs,
                remote_name,
                fetch_result.refs,
                message,
                prune=prune,
                prune_tags=prune_tags,
            )

    # Trigger auto GC if needed
    from .gc import maybe_auto_gc

    with open_repo_closing(repo) as r:
        maybe_auto_gc(r)

    return fetch_result
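

# Usage sketch (illustrative editorial addition): fetching from a configured
# remote and inspecting the refs it advertised. "origin" is a placeholder.
def _example_fetch_usage() -> None:  # pragma: no cover - documentation only
    result = fetch(".", "origin", prune=True)
    for ref, sha in result.refs.items():
        if sha is not None:
            print(ref.decode(), sha.decode())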


def for_each_ref(
    repo: Union[Repo, str] = ".",
    pattern: Optional[Union[str, bytes]] = None,
) -> list[tuple[bytes, bytes, bytes]]:
    """Iterate over all refs that match the (optional) pattern.

    Args:
      repo: Path to the repository
      pattern: Optional glob (7) patterns to filter the refs with

    Returns: List of bytes tuples with: (sha, object_type, ref_name)
    """
    if isinstance(pattern, str):
        pattern = os.fsencode(pattern)

    with open_repo_closing(repo) as r:
        refs = r.get_refs()

        if pattern:
            matching_refs: dict[bytes, bytes] = {}
            pattern_parts = pattern.split(b"/")
            for ref, sha in refs.items():
                matches = False

                # git for-each-ref uses glob (7) style patterns, but fnmatch
                # is greedy and also matches slashes, unlike glob.glob.
                # We have to check parts of the pattern individually.
                # See https://github.com/python/cpython/issues/72904
                ref_parts = ref.split(b"/")
                if len(ref_parts) > len(pattern_parts):
                    continue
                for pat, ref_part in zip(pattern_parts, ref_parts):
                    matches = fnmatch.fnmatchcase(ref_part, pat)
                    if not matches:
                        break

                if matches:
                    matching_refs[ref] = sha

            refs = matching_refs

        ret: list[tuple[bytes, bytes, bytes]] = [
            (sha, r.get_object(sha).type_name, ref)
            for ref, sha in sorted(
                refs.items(),
                key=lambda ref_sha: ref_sha[0],
            )
            if ref != b"HEAD"
        ]

    return ret
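

# Usage sketch (illustrative editorial addition): enumerating tag refs with a
# glob(7)-style pattern, mirroring `git for-each-ref refs/tags/*`.
def _example_for_each_ref_usage() -> None:  # pragma: no cover
    for sha, object_type, ref in for_each_ref(".", "refs/tags/*"):
        print(sha.decode(), object_type.decode(), ref.decode())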


def ls_remote(
    remote: Union[str, bytes],
    config: Optional[Config] = None,
    operation: Optional[str] = None,
    thin_packs: bool = True,
    report_activity: Optional[Callable[[int, str], None]] = None,
    quiet: bool = False,
    include_tags: bool = False,
    username: Optional[str] = None,
    password: Optional[str] = None,
    key_filename: Optional[str] = None,
    ssh_command: Optional[str] = None,
) -> LsRemoteResult:
    """List the refs in a remote.

    Args:
      remote: Remote repository location
      config: Configuration to use
      operation: Operation type
      thin_packs: Whether to use thin packs
      report_activity: Function to report activity
      quiet: Whether to suppress output
      include_tags: Whether to include tags
      username: Username for authentication
      password: Password for authentication
      key_filename: SSH key filename
      ssh_command: SSH command to use

    Returns:
      LsRemoteResult object with refs and symrefs
    """
    if config is None:
        config = StackedConfig.default()
    remote_str = remote.decode() if isinstance(remote, bytes) else remote
    client, host_path = get_transport_and_path(
        remote_str,
        config=config,
        operation=operation,
        thin_packs=thin_packs,
        report_activity=report_activity,
        quiet=quiet,
        include_tags=include_tags,
        username=username,
        password=password,
        key_filename=key_filename,
        ssh_command=ssh_command,
    )
    return client.get_refs(
        host_path.encode() if isinstance(host_path, str) else host_path
    )
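

# Usage sketch (illustrative editorial addition): listing the refs advertised
# by a remote without cloning it. The URL is a hypothetical placeholder.
def _example_ls_remote_usage() -> None:  # pragma: no cover
    result = ls_remote("https://example.com/repo.git")
    for ref, sha in sorted(result.refs.items()):
        print(sha.decode() if sha else "-", ref.decode())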


def repack(repo: RepoPath) -> None:
    """Repack loose files in a repository.

    Currently this only packs loose objects.

    Args:
      repo: Path to the repository
    """
    with open_repo_closing(repo) as r:
        r.object_store.pack_loose_objects()


def pack_objects(
    repo: RepoPath,
    object_ids: Sequence[bytes],
    packf: BinaryIO,
    idxf: Optional[BinaryIO],
    delta_window_size: Optional[int] = None,
    deltify: Optional[bool] = None,
    reuse_deltas: bool = True,
    pack_index_version: Optional[int] = None,
) -> None:
    """Pack objects into a file.

    Args:
      repo: Path to the repository
      object_ids: List of object ids to write
      packf: File-like object to write to
      idxf: File-like object to write to (can be None)
      delta_window_size: Sliding window size for searching for deltas;
        Set to None for default window size.
      deltify: Whether to deltify objects
      reuse_deltas: Allow reuse of existing deltas while deltifying
      pack_index_version: Pack index version to use (1, 2, or 3). If None, uses default version.
    """
    with open_repo_closing(repo) as r:
        entries, data_sum = write_pack_from_container(
            packf.write,
            r.object_store,
            [(oid, None) for oid in object_ids],
            deltify=deltify,
            delta_window_size=delta_window_size,
            reuse_deltas=reuse_deltas,
        )
        if idxf is not None:
            index_entries = sorted([(k, v[0], v[1]) for (k, v) in entries.items()])
            write_pack_index(idxf, index_entries, data_sum, version=pack_index_version)
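

# Usage sketch (illustrative editorial addition): writing every object in the
# store into a pack plus its index. File names are hypothetical placeholders;
# this assumes iterating the object store yields hex object ids.
def _example_pack_objects_usage() -> None:  # pragma: no cover
    with open_repo_closing(".") as r:
        object_ids = list(r.object_store)
    with open("objects.pack", "wb") as packf, open("objects.idx", "wb") as idxf:
        pack_objects(".", object_ids, packf, idxf)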


def ls_tree(
    repo: RepoPath,
    treeish: Union[str, bytes, Commit, Tree, Tag] = b"HEAD",
    outstream: Union[TextIO, BinaryIO] = sys.stdout,
    recursive: bool = False,
    name_only: bool = False,
) -> None:
    """List contents of a tree.

    Args:
      repo: Path to the repository
      treeish: Tree id to list
      outstream: Output stream (defaults to stdout)
      recursive: Whether to recursively list files
      name_only: Only print item name
    """

    def list_tree(store: BaseObjectStore, treeid: bytes, base: bytes) -> None:
        tree = store[treeid]
        assert isinstance(tree, Tree)
        for name, mode, sha in tree.iteritems():
            assert name is not None
            assert mode is not None
            assert sha is not None
            if base:
                name = posixpath.join(base, name)
            if name_only:
                if isinstance(outstream, BinaryIO):
                    outstream.write(name + b"\n")
                else:
                    outstream.write(name.decode("utf-8", "replace") + "\n")
            else:
                formatted = pretty_format_tree_entry(name, mode, sha)
                if isinstance(outstream, BinaryIO):
                    outstream.write(formatted.encode("utf-8"))
                else:
                    outstream.write(formatted)
            if stat.S_ISDIR(mode) and recursive:
                list_tree(store, sha, name)

    with open_repo_closing(repo) as r:
        tree = parse_tree(r, treeish)
        list_tree(r.object_store, tree.id, b"")
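

# Usage sketch (illustrative editorial addition): capturing a recursive
# listing of HEAD's tree into a text buffer instead of stdout.
def _example_ls_tree_usage() -> None:  # pragma: no cover - documentation only
    import io

    buf = io.StringIO()
    ls_tree(".", b"HEAD", outstream=buf, recursive=True)
    print(buf.getvalue(), end="")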


def remote_add(
    repo: RepoPath,
    name: Union[bytes, str],
    url: Union[bytes, str],
) -> None:
    """Add a remote.

    Args:
      repo: Path to the repository
      name: Remote name
      url: Remote URL
    """
    if not isinstance(name, bytes):
        name = name.encode(DEFAULT_ENCODING)
    if not isinstance(url, bytes):
        url = url.encode(DEFAULT_ENCODING)
    with open_repo_closing(repo) as r:
        c = r.get_config()
        section = (b"remote", name)
        if c.has_section(section):
            raise RemoteExists(f"Remote {name.decode()} already exists")
        c.set(section, b"url", url)
        c.write_to_path()


def remote_remove(repo: Repo, name: Union[bytes, str]) -> None:
    """Remove a remote.

    Args:
      repo: Path to the repository
      name: Remote name
    """
    if not isinstance(name, bytes):
        name = name.encode(DEFAULT_ENCODING)
    with open_repo_closing(repo) as r:
        c = r.get_config()
        section = (b"remote", name)
        del c[section]
        c.write_to_path()
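

# Usage sketch (illustrative editorial addition): registering and removing a
# remote. The remote name and URL are hypothetical placeholders; remote_add
# raises RemoteExists if the name is already configured.
def _example_remote_usage() -> None:  # pragma: no cover - documentation only
    remote_add(".", "upstream", "https://example.com/repo.git")
    remote_remove(".", "upstream")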


def _quote_path(path: str) -> str:
    """Quote a path using C-style quoting similar to git's core.quotePath.

    Args:
      path: Path to quote

    Returns:
      Quoted path string
    """
    # Check if path needs quoting (non-ASCII or special characters)
    needs_quoting = False
    for char in path:
        if ord(char) > 127 or char in '"\\':
            needs_quoting = True
            break

    if not needs_quoting:
        return path

    # Apply C-style quoting
    quoted = '"'
    for char in path:
        if ord(char) > 127:
            # Non-ASCII character, encode as octal escape
            utf8_bytes = char.encode("utf-8")
            for byte in utf8_bytes:
                quoted += f"\\{byte:03o}"
        elif char == '"':
            quoted += '\\"'
        elif char == "\\":
            quoted += "\\\\"
        else:
            quoted += char
    quoted += '"'
    return quoted


def check_ignore(
    repo: RepoPath,
    paths: Sequence[Union[str, bytes, os.PathLike[str]]],
    no_index: bool = False,
    quote_path: bool = True,
) -> Iterator[str]:
    r"""Debug gitignore files.

    Args:
      repo: Path to the repository
      paths: List of paths to check for
      no_index: Don't check index
      quote_path: If True, quote non-ASCII characters in returned paths using
        C-style octal escapes (e.g. "тест.txt" becomes
        "\\321\\202\\320\\265\\321\\201\\321\\202.txt").
        If False, return raw unicode paths.

    Returns: List of ignored files
    """
    with open_repo_closing(repo) as r:
        index = r.open_index()
        ignore_manager = IgnoreFilterManager.from_repo(r)
        for original_path in paths:
            # Convert path to string for consistent handling
            original_path_fspath = os.fspath(original_path)
            # Normalize to str
            original_path_str = os.fsdecode(original_path_fspath)
            if not no_index and path_to_tree_path(r.path, original_path_str) in index:
                continue

            # Preserve whether the original path had a trailing slash
            had_trailing_slash = original_path_str.endswith(("/", os.path.sep))

            if os.path.isabs(original_path_str):
                path = os.path.relpath(original_path_str, r.path)
                # Normalize Windows paths to use forward slashes
                if os.path.sep != "/":
                    path = path.replace(os.path.sep, "/")
            else:
                path = original_path_str

            # Restore trailing slash if it was in the original
            if had_trailing_slash and not path.endswith("/"):
                path = path + "/"

            # For directories, check with trailing slash to get correct ignore behavior
            test_path = path
            path_without_slash = path.rstrip("/")
            is_directory = os.path.isdir(os.path.join(r.path, path_without_slash))

            # If this is a directory path, ensure we test it correctly
            if is_directory and not path.endswith("/"):
                test_path = path + "/"

            if ignore_manager.is_ignored(test_path):
                # Return relative path (like git does) when absolute path was provided
                if os.path.isabs(original_path):
                    output_path = path
                else:
                    output_path = original_path  # type: ignore[assignment]
                yield _quote_path(output_path) if quote_path else output_path
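

# Usage sketch (illustrative editorial addition): asking which of a set of
# candidate paths the ignore rules would exclude, like `git check-ignore`.
# The candidate paths are hypothetical placeholders.
def _example_check_ignore_usage() -> None:  # pragma: no cover
    for ignored in check_ignore(".", ["build/", "docs/notes.txt"]):
        print(ignored)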


def update_head(
    repo: RepoPath,
    target: Union[str, bytes],
    detached: bool = False,
    new_branch: Optional[Union[str, bytes]] = None,
) -> None:
    """Update HEAD to point at a new branch/commit.

    Note that this does not actually update the working tree.

    Args:
      repo: Path to the repository
      detached: Create a detached head
      target: Branch or committish to switch to
      new_branch: New branch to create
    """
    with open_repo_closing(repo) as r:
        if new_branch is not None:
            to_set = _make_branch_ref(new_branch)
        else:
            to_set = b"HEAD"
        if detached:
            # TODO(jelmer): Provide some way so that the actual ref gets
            # updated rather than what it points to, so the delete isn't
            # necessary.
            del r.refs[to_set]
            r.refs[to_set] = parse_commit(r, target).id
        else:
            r.refs.set_symbolic_ref(to_set, parse_ref(r, target))
        if new_branch is not None:
            r.refs.set_symbolic_ref(b"HEAD", to_set)


def checkout(
    repo: Union[str, os.PathLike[str], Repo],
    target: Optional[Union[str, bytes, Commit, Tag]] = None,
    force: bool = False,
    new_branch: Optional[Union[bytes, str]] = None,
    paths: Optional[list[Union[bytes, str]]] = None,
) -> None:
    """Switch to a branch or commit, updating both HEAD and the working tree.

    This is similar to 'git checkout', allowing you to switch to a branch,
    tag, or specific commit. Unlike update_head, this function also updates
    the working tree to match the target.

    Args:
      repo: Path to repository or repository object
      target: Branch name, tag, or commit SHA to checkout. If None and paths is specified,
        restores files from HEAD
      force: Force checkout even if there are local changes
      new_branch: Create a new branch at target (like git checkout -b)
      paths: List of specific paths to checkout. If specified, only these paths are updated
        and HEAD is not changed

    Raises:
      CheckoutError: If checkout cannot be performed due to conflicts
      KeyError: If the target reference cannot be found
    """
    with open_repo_closing(repo) as r:
        # Store the original target for later reference checks
        original_target = target
        worktree = r.get_worktree()
        # Handle path-specific checkout (like git checkout -- <paths>)
        if paths is not None:
            # Convert paths to bytes
            byte_paths = []
            for path in paths:
                if isinstance(path, str):
                    byte_paths.append(path.encode(DEFAULT_ENCODING))
                else:
                    byte_paths.append(path)

            # If no target specified, use HEAD
            if target is None:
                try:
                    target = r.refs[b"HEAD"]
                except KeyError:
                    raise CheckoutError("No HEAD reference found")
            else:
                if isinstance(target, str):
                    target = target.encode(DEFAULT_ENCODING)

            # Get the target commit and tree
            target_tree = parse_tree(r, target)

            # Get blob normalizer for line ending conversion
            blob_normalizer = r.get_blob_normalizer()

            # Restore specified paths from target tree
            for path in byte_paths:
                try:
                    # Look up the path in the target tree
                    mode, sha = target_tree.lookup_path(
                        r.object_store.__getitem__, path
                    )
                    obj = r[sha]
                    assert isinstance(obj, Blob), "Expected a Blob object"
                except KeyError:
                    # Path doesn't exist in target tree
                    pass
                else:
                    # Create directories if needed
                    # Handle path as string
                    if isinstance(path, bytes):
                        path_str = path.decode(DEFAULT_ENCODING)
                    else:
                        path_str = path
                    file_path = os.path.join(r.path, path_str)
                    os.makedirs(os.path.dirname(file_path), exist_ok=True)

                    # Write the file content
                    if stat.S_ISREG(mode):
                        # Apply checkout filters (smudge)
                        if blob_normalizer:
                            obj = blob_normalizer.checkout_normalize(obj, path)

                        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                        if sys.platform == "win32":
                            flags |= os.O_BINARY

                        with os.fdopen(os.open(file_path, flags, mode), "wb") as f:
                            f.write(obj.data)

                    # Update the index
                    worktree.stage(path)
            return

        # Normal checkout (switching branches/commits)
        if target is None:
            raise ValueError("Target must be specified for branch/commit checkout")

        if isinstance(target, str):
            target_bytes = target.encode(DEFAULT_ENCODING)
        elif isinstance(target, bytes):
            target_bytes = target
        else:
            # For Commit/Tag objects, we'll use their SHA
            target_bytes = target.id

        if isinstance(new_branch, str):
            new_branch = new_branch.encode(DEFAULT_ENCODING)

        # Parse the target to get the commit
        assert (
            original_target is not None
        )  # Guaranteed by earlier check for normal checkout
        target_commit = parse_commit(r, original_target)
        target_tree_id = target_commit.tree

        # Get current HEAD tree for comparison
        try:
            current_head = r.refs[b"HEAD"]
            current_commit = r[current_head]
            assert isinstance(current_commit, Commit), "Expected a Commit object"
            current_tree_id = current_commit.tree
        except KeyError:
            # No HEAD yet (empty repo)
            current_tree_id = None

        # Check for uncommitted changes if not forcing
        if not force and current_tree_id is not None:
            status_report = status(r)
            changes = []
            # staged is a dict with 'add', 'delete', 'modify' keys
            if isinstance(status_report.staged, dict):
                changes.extend(status_report.staged.get("add", []))
                changes.extend(status_report.staged.get("delete", []))
                changes.extend(status_report.staged.get("modify", []))
            # unstaged is a list
            changes.extend(status_report.unstaged)
            if changes:
                # Check if any changes would conflict with checkout
                target_tree_obj = r[target_tree_id]
                assert isinstance(target_tree_obj, Tree), "Expected a Tree object"
                target_tree = target_tree_obj
                for change in changes:
                    if isinstance(change, str):
                        change = change.encode(DEFAULT_ENCODING)
                    try:
                        target_tree.lookup_path(r.object_store.__getitem__, change)
                    except KeyError:
                        # File doesn't exist in target tree - change can be preserved
                        pass
                    else:
                        # File exists in target tree - would overwrite local changes
                        raise CheckoutError(
                            f"Your local changes to '{change.decode()}' would be "
                            "overwritten by checkout. Please commit or stash before switching."
                        )

        # Get configuration for working directory update
        config = r.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")

        # Note: the key is "protectNTFS" within the [core] section; the
        # original spelled it b"core.protectNTFS", which would never match.
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        else:
            validate_path_element = validate_path_element_default

        if config.get_boolean(b"core", b"symlinks", True):

            def symlink_wrapper(
                source: Union[str, bytes, os.PathLike[str]],
                target: Union[str, bytes, os.PathLike[str]],
            ) -> None:
                symlink(source, target)  # type: ignore[arg-type,unused-ignore]

            symlink_fn = symlink_wrapper
        else:

            def symlink_fallback(
                source: Union[str, bytes, os.PathLike[str]],
                target: Union[str, bytes, os.PathLike[str]],
            ) -> None:
                mode = "w" + ("b" if isinstance(source, bytes) else "")
                with open(target, mode) as f:
                    f.write(source)

            symlink_fn = symlink_fallback

        # Get blob normalizer for line ending conversion
        blob_normalizer = r.get_blob_normalizer()

        # Update working tree
        tree_change_iterator: Iterator[TreeChange] = tree_changes(
            r.object_store, current_tree_id, target_tree_id
        )
        update_working_tree(
            r,
            current_tree_id,
            target_tree_id,
            change_iterator=tree_change_iterator,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,
            force_remove_untracked=force,
            blob_normalizer=blob_normalizer,
            allow_overwrite_modified=force,
        )

        # Update HEAD
        if new_branch:
            # Create new branch and switch to it
            branch_create(r, new_branch, objectish=target_commit.id.decode("ascii"))
            update_head(r, new_branch)

            # Set up tracking if creating from a remote branch
            from .refs import LOCAL_REMOTE_PREFIX, parse_remote_ref

            if isinstance(original_target, bytes) and target_bytes.startswith(
                LOCAL_REMOTE_PREFIX
            ):
                try:
                    remote_name, branch_name = parse_remote_ref(target_bytes)
                    # Set tracking to refs/heads/<branch> on the remote
                    set_branch_tracking(
                        r, new_branch, remote_name, b"refs/heads/" + branch_name
                    )
                except ValueError:
                    # Invalid remote ref format, skip tracking setup
                    pass
        else:
            # Check if target is a branch name (with or without refs/heads/ prefix)
            branch_ref = None
            if (
                isinstance(original_target, (str, bytes))
                and target_bytes in r.refs.keys()
            ):
                if target_bytes.startswith(LOCAL_BRANCH_PREFIX):
                    branch_ref = target_bytes
            else:
                # Try adding refs/heads/ prefix
                potential_branch = (
                    _make_branch_ref(target_bytes)
                    if isinstance(original_target, (str, bytes))
                    else None
                )
                if potential_branch in r.refs.keys():
                    branch_ref = potential_branch

            if branch_ref:
                # It's a branch - update HEAD symbolically
                update_head(r, branch_ref)
            else:
                # It's a tag, other ref, or commit SHA - detached HEAD
                update_head(r, target_commit.id.decode("ascii"), detached=True)


def reset_file(
    repo: Repo,
    file_path: str,
    target: Union[str, bytes, Commit, Tree, Tag] = b"HEAD",
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
) -> None:
    """Reset a file to a specific commit or branch.

    Args:
      repo: dulwich Repo object
      file_path: file to reset, relative to the repository path
      target: branch or commit or b'HEAD' to reset to
      symlink_fn: Function to use for creating symlinks
    """
    tree = parse_tree(repo, treeish=target)
    tree_path = _fs_to_tree_path(file_path)

    file_entry = tree.lookup_path(repo.object_store.__getitem__, tree_path)
    full_path = os.path.join(os.fsencode(repo.path), tree_path)
    blob = repo.object_store[file_entry[1]]
    assert isinstance(blob, Blob)
    mode = file_entry[0]
    build_file_from_blob(blob, mode, full_path, symlink_fn=symlink_fn)
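
# Illustrative usage (a sketch only, not executed at import time; the
# repository path and file name below are hypothetical):
#
#     from dulwich.repo import Repo
#
#     r = Repo("/path/to/repo")
#     reset_file(r, "docs/README.md")            # restore HEAD's version
#     reset_file(r, "docs/README.md", b"main")   # restore another branch's version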


@replace_me(since="0.22.9", remove_in="0.24.0")
def checkout_branch(
    repo: Union[str, os.PathLike[str], Repo],
    target: Union[bytes, str],
    force: bool = False,
) -> None:
    """Switch branches or restore working tree files.

    This is now a wrapper around the general checkout() function.
    Preserved for backward compatibility.

    Args:
      repo: dulwich Repo object
      target: branch name or commit sha to checkout
      force: Whether to force the checkout
    """
    # Simply delegate to the new checkout function
    return checkout(repo, target, force=force)


def sparse_checkout(
    repo: Union[str, os.PathLike[str], Repo],
    patterns: Optional[list[str]] = None,
    force: bool = False,
    cone: Optional[bool] = None,
) -> None:
    """Perform a sparse checkout in the repository (either 'full' or 'cone mode').

    Perform sparse checkout in either 'cone' (directory-based) mode or
    'full pattern' (.gitignore) mode, depending on the ``cone`` parameter.
    If ``cone`` is ``None``, the mode is inferred from the repository's
    ``core.sparseCheckoutCone`` config setting.

    Steps:
      1) If ``patterns`` is provided, write them to ``.git/info/sparse-checkout``.
      2) Determine which paths in the index are included vs. excluded.
         - If ``cone=True``, use "cone-compatible" directory-based logic.
         - If ``cone=False``, use standard .gitignore-style matching.
      3) Update the index's skip-worktree bits and add/remove files in
         the working tree accordingly.
      4) If ``force=False``, refuse to remove files that have local modifications.

    Args:
      repo: Path to the repository or a Repo object.
      patterns: Optional list of sparse-checkout patterns to write.
      force: Whether to force removal of locally modified files (default False).
      cone: Boolean indicating cone mode (True/False). If None, read from config.

    Returns:
      None
    """
    with open_repo_closing(repo) as repo_obj:
        # --- 0) Possibly infer 'cone' from config ---
        if cone is None:
            cone = repo_obj.get_worktree().infer_cone_mode()

        # --- 1) Read or write patterns ---
        if patterns is None:
            lines = repo_obj.get_worktree().get_sparse_checkout_patterns()
            if lines is None:
                raise Error("No sparse checkout patterns found.")
        else:
            lines = patterns
            repo_obj.get_worktree().set_sparse_checkout_patterns(patterns)

        # --- 2) Determine the set of included paths ---
        index = repo_obj.open_index()
        included_paths = determine_included_paths(index, lines, cone)

        # --- 3) Apply those results to the index & working tree ---
        try:
            apply_included_paths(repo_obj, included_paths, force=force)
        except SparseCheckoutConflictError as exc:
            raise CheckoutError(*exc.args) from exc
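
# Illustrative usage (a sketch; the repository path and patterns are
# hypothetical). Full-pattern mode takes .gitignore-style lines; cone mode
# takes directory-based pattern lines like those cone_mode_init writes:
#
#     sparse_checkout("/path/to/repo", ["docs/", "!docs/internal/"], cone=False)
#     sparse_checkout("/path/to/repo", ["/*", "!/*/", "/src/"], cone=True)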


def cone_mode_init(repo: Union[str, os.PathLike[str], Repo]) -> None:
    """Initialize a repository to use sparse checkout in 'cone' mode.

    Sets ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` in the config.
    Writes an initial ``.git/info/sparse-checkout`` file that includes only
    top-level files (and excludes all subdirectories), e.g. ``["/*", "!/*/"]``.
    Then performs a sparse checkout to update the working tree accordingly.

    If no directories are specified, then only top-level files are included:
    https://git-scm.com/docs/git-sparse-checkout#_internalscone_mode_handling

    Args:
      repo: Path to the repository or a Repo object.

    Returns:
      None
    """
    with open_repo_closing(repo) as repo_obj:
        repo_obj.get_worktree().configure_for_cone_mode()
        patterns = ["/*", "!/*/"]  # root-level files only
        sparse_checkout(repo_obj, patterns, force=True, cone=True)


def cone_mode_set(
    repo: Union[str, os.PathLike[str], Repo], dirs: Sequence[str], force: bool = False
) -> None:
    """Overwrite the existing 'cone-mode' sparse patterns with a new set of directories.

    Ensures ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` are enabled.
    Writes new patterns so that only the specified directories (and top-level files)
    remain in the working tree, and applies the sparse checkout update.

    Args:
      repo: Path to the repository or a Repo object.
      dirs: List of directory names to include.
      force: Whether to forcibly discard local modifications (default False).

    Returns:
      None
    """
    with open_repo_closing(repo) as repo_obj:
        repo_obj.get_worktree().configure_for_cone_mode()
        repo_obj.get_worktree().set_cone_mode_patterns(dirs=dirs)
        new_patterns = repo_obj.get_worktree().get_sparse_checkout_patterns()
        # Finally, apply the patterns and update the working tree
        sparse_checkout(repo_obj, new_patterns, force=force, cone=True)


def cone_mode_add(
    repo: Union[str, os.PathLike[str], Repo], dirs: Sequence[str], force: bool = False
) -> None:
    """Add new directories to the existing 'cone-mode' sparse-checkout patterns.

    Reads the current patterns from ``.git/info/sparse-checkout``, adds pattern
    lines to include the specified directories, and then performs a sparse
    checkout to update the working tree accordingly.

    Args:
      repo: Path to the repository or a Repo object.
      dirs: List of directory names to add to the sparse-checkout.
      force: Whether to forcibly discard local modifications (default False).

    Returns:
      None
    """
    with open_repo_closing(repo) as repo_obj:
        repo_obj.get_worktree().configure_for_cone_mode()
        # Do not pass base patterns as dirs
        base_patterns = ["/*", "!/*/"]
        existing_dirs = [
            pat.strip("/")
            for pat in repo_obj.get_worktree().get_sparse_checkout_patterns()
            if pat not in base_patterns
        ]
        added_dirs = existing_dirs + list(dirs or [])
        repo_obj.get_worktree().set_cone_mode_patterns(dirs=added_dirs)
        new_patterns = repo_obj.get_worktree().get_sparse_checkout_patterns()
        sparse_checkout(repo_obj, patterns=new_patterns, force=force, cone=True)
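
# Typical cone-mode workflow (illustrative sketch; the path and directory
# names are hypothetical):
#
#     cone_mode_init("/path/to/repo")            # top-level files only
#     cone_mode_set("/path/to/repo", ["src"])    # keep only src/ plus root files
#     cone_mode_add("/path/to/repo", ["docs"])   # additionally include docs/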


def check_mailmap(repo: RepoPath, contact: Union[str, bytes]) -> bytes:
    """Check canonical name and email of contact.

    Args:
      repo: Path to the repository
      contact: Contact name and/or email

    Returns: Canonical contact data
    """
    with open_repo_closing(repo) as r:
        from .mailmap import Mailmap

        try:
            mailmap = Mailmap.from_path(os.path.join(r.path, ".mailmap"))
        except FileNotFoundError:
            mailmap = Mailmap()
        contact_bytes = (
            contact.encode(DEFAULT_ENCODING) if isinstance(contact, str) else contact
        )
        result = mailmap.lookup(contact_bytes)
        if isinstance(result, bytes):
            return result
        else:
            # Convert tuple back to bytes format
            name, email = result
            if name is None:
                name = b""
            if email is None:
                email = b""
            return name + b" <" + email + b">"
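
# Illustrative usage (a sketch; the contact string is hypothetical). The
# result is always bytes in "Name <email>" form:
#
#     canonical = check_mailmap("/path/to/repo", "Jane Doe <jane@example.com>")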


def fsck(repo: RepoPath) -> Iterator[tuple[bytes, Exception]]:
    """Check a repository.

    Args:
      repo: A path to the repository

    Returns: Iterator over errors/warnings
    """
    with open_repo_closing(repo) as r:
        # TODO(jelmer): check pack files
        # TODO(jelmer): check graph
        # TODO(jelmer): check refs
        for sha in r.object_store:
            o = r.object_store[sha]
            try:
                o.check()
            except Exception as e:
                yield (sha, e)
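
# Illustrative usage (a sketch): fsck is a generator, so problems are only
# detected as the iterator is consumed:
#
#     for sha, err in fsck("/path/to/repo"):
#         print(f"{sha.decode('ascii')}: {err}")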


def stash_list(
    repo: Union[str, os.PathLike[str], Repo],
) -> Iterator[tuple[int, tuple[bytes, bytes]]]:
    """List all stashes in a repository."""
    with open_repo_closing(repo) as r:
        from .stash import Stash

        stash = Stash.from_repo(r)
        entries = stash.stashes()
        # Convert Entry objects to (old_sha, new_sha) tuples
        return enumerate([(entry.old_sha, entry.new_sha) for entry in entries])


def stash_push(repo: Union[str, os.PathLike[str], Repo]) -> None:
    """Push a new stash onto the stack."""
    with open_repo_closing(repo) as r:
        from .stash import Stash

        stash = Stash.from_repo(r)
        stash.push()


def stash_pop(repo: Union[str, os.PathLike[str], Repo]) -> None:
    """Pop a stash from the stack."""
    with open_repo_closing(repo) as r:
        from .stash import Stash

        stash = Stash.from_repo(r)
        stash.pop(0)


def stash_drop(repo: Union[str, os.PathLike[str], Repo], index: int) -> None:
    """Drop a stash from the stack."""
    with open_repo_closing(repo) as r:
        from .stash import Stash

        stash = Stash.from_repo(r)
        stash.drop(index)


def ls_files(repo: RepoPath) -> list[bytes]:
    """List all files in an index."""
    with open_repo_closing(repo) as r:
        return sorted(r.open_index())


def find_unique_abbrev(
    object_store: BaseObjectStore, object_id: Union[str, bytes], min_length: int = 7
) -> str:
    """Find the shortest unique abbreviation for an object ID.

    Args:
      object_store: Object store to search in
      object_id: The full object ID to abbreviate
      min_length: Minimum length of abbreviation (default 7)

    Returns:
      The shortest unique prefix of the object ID (at least min_length chars)
    """
    if isinstance(object_id, bytes):
        hex_id = object_id.decode("ascii")
    else:
        hex_id = object_id

    # Start with minimum length
    for length in range(min_length, len(hex_id) + 1):
        prefix = hex_id[:length]
        matches = 0

        # Check if this prefix is unique
        for obj_id in object_store:
            if obj_id.decode("ascii").startswith(prefix):
                matches += 1
                if matches > 1:
                    # Not unique, need more characters
                    break

        if matches == 1:
            # Found unique prefix
            return prefix

    # If we get here, return the full ID
    return hex_id
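
# Illustrative usage (a sketch). Note that each candidate length scans the
# whole object store, so the search is linear in repository size:
#
#     with open_repo_closing("/path/to/repo") as r:
#         short = find_unique_abbrev(r.object_store, r.head())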


def describe(
    repo: Union[str, os.PathLike[str], Repo], abbrev: Optional[int] = None
) -> str:
    """Describe the repository version.

    Args:
      repo: git repository
      abbrev: number of characters of commit to take, default is 7

    Returns: a string description of the current git revision

    Examples: "gabcdefh", "v0.1" or "v0.1-5-gabcdefh".
    """
    abbrev_slice = slice(0, abbrev if abbrev is not None else 7)
    # Get the repository
    with open_repo_closing(repo) as r:
        # Get a list of all tags
        refs = r.get_refs()
        tags = {}
        for key, value in refs.items():
            key_str = key.decode()
            obj = r.get_object(value)
            if "tags" not in key_str:
                continue

            _, tag = key_str.rsplit("/", 1)

            if isinstance(obj, Tag):
                # Annotated tag case
                commit = r.get_object(obj.object[1])
            else:
                # Lightweight tag case - obj is already the commit
                commit = obj
            if not isinstance(commit, Commit):
                raise AssertionError(
                    f"Expected Commit object, got {type(commit).__name__}"
                )
            tag_info: list[Any] = [
                datetime.datetime(*time.gmtime(commit.commit_time)[:6]),
                commit.id.decode("ascii"),
            ]
            tags[tag] = tag_info

        # Sort tags by datetime (first element of the value list)
        sorted_tags = sorted(
            tags.items(), key=lambda tag_item: tag_item[1][0], reverse=True
        )

        # Get the latest commit
        latest_commit = r[r.head()]

        # If there are no tags, return the latest commit
        if len(sorted_tags) == 0:
            if abbrev is not None:
                return "g{}".format(latest_commit.id.decode("ascii")[abbrev_slice])
            return f"g{find_unique_abbrev(r.object_store, latest_commit.id)}"

        # We're now 0 commits from the top
        commit_count = 0

        # Walk through all commits
        walker = r.get_walker()
        for entry in walker:
            # Check if tag
            commit_id = entry.commit.id.decode("ascii")
            for tag_item in sorted_tags:
                tag_name = tag_item[0]
                tag_commit = tag_item[1][1]
                if commit_id == tag_commit:
                    if commit_count == 0:
                        return tag_name
                    else:
                        if abbrev is not None:
                            abbrev_hash = latest_commit.id.decode("ascii")[abbrev_slice]
                        else:
                            abbrev_hash = find_unique_abbrev(
                                r.object_store, latest_commit.id
                            )
                        return f"{tag_name}-{commit_count}-g{abbrev_hash}"

            commit_count += 1

        # Return plain commit if no parent tag can be found
        if abbrev is not None:
            return "g{}".format(latest_commit.id.decode("ascii")[abbrev_slice])
        return f"g{find_unique_abbrev(r.object_store, latest_commit.id)}"
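
# Illustrative usage (a sketch): on a repository whose most recent tag is
# v0.1, five commits behind HEAD, this returns something like "v0.1-5-gabcdef1":
#
#     version = describe("/path/to/repo")
#     longer = describe("/path/to/repo", abbrev=10)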


def get_object_by_path(
    repo: RepoPath,
    path: Union[str, bytes],
    committish: Optional[Union[str, bytes, Commit, Tag]] = None,
) -> Union[Blob, Tree, Commit, Tag]:
    """Get an object by path.

    Args:
      repo: A path to the repository
      path: Path to look up
      committish: Commit to look up path in

    Returns: A `ShaFile` object
    """
    if committish is None:
        committish = "HEAD"
    # Get the repository
    with open_repo_closing(repo) as r:
        commit = parse_commit(r, committish)
        base_tree = commit.tree
        if not isinstance(path, bytes):
            path = commit_encode(commit, path)
        (_mode, sha) = tree_lookup_path(r.object_store.__getitem__, base_tree, path)
        obj = r[sha]
        assert isinstance(obj, (Blob, Tree, Commit, Tag))
        return obj


def write_tree(repo: RepoPath) -> bytes:
    """Write a tree object from the index.

    Args:
      repo: Repository for which to write tree

    Returns: tree id for the tree that was written
    """
    with open_repo_closing(repo) as r:
        return r.open_index().commit(r.object_store)


def _do_merge(
    r: Repo,
    merge_commit_id: bytes,
    no_commit: bool = False,
    no_ff: bool = False,
    message: Optional[bytes] = None,
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
) -> tuple[Optional[bytes], list[bytes]]:
    """Internal merge implementation that operates on an open repository.

    Args:
      r: Open repository object
      merge_commit_id: SHA of commit to merge
      no_commit: If True, do not create a merge commit
      no_ff: If True, force creation of a merge commit
      message: Optional merge commit message
      author: Optional author for merge commit
      committer: Optional committer for merge commit

    Returns:
      Tuple of (merge_commit_sha, conflicts) where merge_commit_sha is None
      if no_commit=True or there were conflicts
    """
    from .graph import find_merge_base
    from .merge import three_way_merge

    # Get HEAD commit
    try:
        head_commit_id = r.refs[b"HEAD"]
    except KeyError:
        raise Error("No HEAD reference found")

    head_commit = r[head_commit_id]
    assert isinstance(head_commit, Commit), "Expected a Commit object"
    merge_commit = r[merge_commit_id]
    assert isinstance(merge_commit, Commit), "Expected a Commit object"

    # Check if fast-forward is possible
    merge_bases = find_merge_base(r, [head_commit_id, merge_commit_id])
    if not merge_bases:
        raise Error("No common ancestor found")

    # Use the first merge base
    base_commit_id = merge_bases[0]

    # Check if we're trying to merge the same commit
    if head_commit_id == merge_commit_id:
        # Already up to date
        return (None, [])

    # Check for fast-forward
    if base_commit_id == head_commit_id and not no_ff:
        # Fast-forward merge
        r.refs[b"HEAD"] = merge_commit_id

        # Update the working directory
        changes = tree_changes(r.object_store, head_commit.tree, merge_commit.tree)
        update_working_tree(
            r, head_commit.tree, merge_commit.tree, change_iterator=changes
        )
        return (merge_commit_id, [])

    if base_commit_id == merge_commit_id:
        # Already up to date
        return (None, [])

    # Perform three-way merge
    base_commit = r[base_commit_id]
    assert isinstance(base_commit, Commit), "Expected a Commit object"
    gitattributes = r.get_gitattributes()
    config = r.get_config()
    merged_tree, conflicts = three_way_merge(
        r.object_store, base_commit, head_commit, merge_commit, gitattributes, config
    )

    # Add merged tree to object store
    r.object_store.add_object(merged_tree)

    # Update index and working directory
    changes = tree_changes(r.object_store, head_commit.tree, merged_tree.id)
    update_working_tree(r, head_commit.tree, merged_tree.id, change_iterator=changes)

    if conflicts or no_commit:
        # Don't create a commit if there are conflicts or no_commit is True
        return (None, conflicts)

    # Create merge commit
    merge_commit_obj = Commit()
    merge_commit_obj.tree = merged_tree.id
    merge_commit_obj.parents = [head_commit_id, merge_commit_id]

    # Set author/committer
    if author is None:
        author = get_user_identity(r.get_config_stack())
    if committer is None:
        committer = author
    merge_commit_obj.author = author
    merge_commit_obj.committer = committer

    # Set timestamps
    timestamp = int(time.time())
    timezone = 0  # UTC
    merge_commit_obj.author_time = timestamp
    merge_commit_obj.author_timezone = timezone
    merge_commit_obj.commit_time = timestamp
    merge_commit_obj.commit_timezone = timezone

    # Set commit message
    if message is None:
        message = f"Merge commit '{merge_commit_id.decode()[:7]}'\n".encode()
    merge_commit_obj.message = message.encode() if isinstance(message, str) else message

    # Add commit to object store
    r.object_store.add_object(merge_commit_obj)

    # Update HEAD
    r.refs[b"HEAD"] = merge_commit_obj.id

    return (merge_commit_obj.id, [])


def merge(
    repo: Union[str, os.PathLike[str], Repo],
    committish: Union[str, bytes, Commit, Tag],
    no_commit: bool = False,
    no_ff: bool = False,
    message: Optional[bytes] = None,
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
) -> tuple[Optional[bytes], list[bytes]]:
    """Merge a commit into the current branch.

    Args:
      repo: Repository to merge into
      committish: Commit to merge
      no_commit: If True, do not create a merge commit
      no_ff: If True, force creation of a merge commit
      message: Optional merge commit message
      author: Optional author for merge commit
      committer: Optional committer for merge commit

    Returns:
      Tuple of (merge_commit_sha, conflicts) where merge_commit_sha is None
      if no_commit=True or there were conflicts

    Raises:
      Error: If there is no HEAD reference or commit cannot be found
    """
    with open_repo_closing(repo) as r:
        # Parse the commit to merge
        try:
            merge_commit_id = parse_commit(r, committish).id
        except KeyError:
            raise Error(
                f"Cannot find commit '{committish.decode() if isinstance(committish, bytes) else committish}'"
            )

        result = _do_merge(
            r, merge_commit_id, no_commit, no_ff, message, author, committer
        )

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(r)

        return result
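
# Illustrative usage (a sketch; the branch name is hypothetical). A clean
# merge returns the new commit id and an empty conflict list; a conflicted
# merge returns (None, [paths...]) with markers left in the working tree:
#
#     sha, conflicts = merge("/path/to/repo", "feature")
#     if conflicts:
#         print("resolve:", [c.decode() for c in conflicts])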


def unpack_objects(
    pack_path: Union[str, os.PathLike[str]], target: Union[str, os.PathLike[str]] = "."
) -> int:
    """Unpack objects from a pack file into the repository.

    Args:
      pack_path: Path to the pack file to unpack
      target: Path to the repository to unpack into

    Returns:
      Number of objects unpacked
    """
    from .pack import Pack

    with open_repo_closing(target) as r:
        pack_basename = os.path.splitext(pack_path)[0]

        with Pack(pack_basename) as pack:
            count = 0
            for unpacked in pack.iter_unpacked():
                obj = unpacked.sha_file()
                r.object_store.add_object(obj)
                count += 1
            return count


def merge_tree(
    repo: RepoPath,
    base_tree: Optional[Union[str, bytes, Tree, Commit, Tag]],
    our_tree: Union[str, bytes, Tree, Commit, Tag],
    their_tree: Union[str, bytes, Tree, Commit, Tag],
) -> tuple[bytes, list[bytes]]:
    """Perform a three-way tree merge without touching the working directory.

    This is similar to git merge-tree, performing a merge at the tree level
    without creating commits or updating any references.

    Args:
      repo: Repository containing the trees
      base_tree: Tree-ish of the common ancestor (or None for no common ancestor)
      our_tree: Tree-ish of our side of the merge
      their_tree: Tree-ish of their side of the merge

    Returns:
      tuple: A tuple of (merged_tree_id, conflicts) where:
        - merged_tree_id is the SHA-1 of the merged tree
        - conflicts is a list of paths (as bytes) that had conflicts

    Raises:
      KeyError: If any of the tree-ish arguments cannot be resolved
    """
    from .merge import Merger

    with open_repo_closing(repo) as r:
        # Resolve tree-ish arguments to actual trees
        base = parse_tree(r, base_tree) if base_tree else None
        ours = parse_tree(r, our_tree)
        theirs = parse_tree(r, their_tree)

        # Perform the merge
        gitattributes = r.get_gitattributes()
        config = r.get_config()
        merger = Merger(r.object_store, gitattributes, config)
        merged_tree, conflicts = merger.merge_trees(base, ours, theirs)

        # Add the merged tree to the object store
        r.object_store.add_object(merged_tree)

        return merged_tree.id, conflicts
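
# Illustrative usage (a sketch; the tree-ish names are hypothetical). Unlike
# merge(), this never touches the index or working tree:
#
#     tree_id, conflicts = merge_tree(
#         "/path/to/repo", b"merge-base", b"HEAD", b"feature"
#     )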


def cherry_pick(  # noqa: D417
    repo: Union[str, os.PathLike[str], Repo],
    committish: Union[str, bytes, Commit, Tag, None],
    no_commit: bool = False,
    continue_: bool = False,
    abort: bool = False,
) -> Optional[bytes]:
    r"""Cherry-pick a commit onto the current branch.

    Args:
      repo: Repository to cherry-pick into
      committish: Commit to cherry-pick (can be None only when resuming or aborting)
      no_commit: If True, do not create a commit after applying changes
      ``continue_``: Resume an in-progress cherry-pick after resolving conflicts if True
      abort: Abort an in-progress cherry-pick

    Returns:
      The SHA of the newly created commit, or None if no_commit=True or there were conflicts

    Raises:
      Error: If there is no HEAD reference, commit cannot be found, or operation fails
    """
    from .merge import three_way_merge

    # Validate that committish is provided when needed
    if not (continue_ or abort) and committish is None:
        raise ValueError("committish is required when not using --continue or --abort")

    with open_repo_closing(repo) as r:
        # Handle abort
        if abort:
            # Clean up any cherry-pick state
            try:
                os.remove(os.path.join(r.controldir(), "CHERRY_PICK_HEAD"))
            except FileNotFoundError:
                pass
            try:
                os.remove(os.path.join(r.controldir(), "MERGE_MSG"))
            except FileNotFoundError:
                pass

            # Reset index to HEAD
            head_commit = r[b"HEAD"]
            assert isinstance(head_commit, Commit)
            r.get_worktree().reset_index(head_commit.tree)
            return None

        # Handle continue
        if continue_:
            # Check if there's a cherry-pick in progress
            cherry_pick_head_path = os.path.join(r.controldir(), "CHERRY_PICK_HEAD")
            try:
                with open(cherry_pick_head_path, "rb") as f:
                    cherry_pick_commit_id = f.read().strip()
                cherry_pick_commit = r[cherry_pick_commit_id]
            except FileNotFoundError:
                raise Error("No cherry-pick in progress")

            # Check for unresolved conflicts
            if r.open_index().has_conflicts():
                raise Error("Unresolved conflicts remain")

            # Create the commit
            tree_id = r.open_index().commit(r.object_store)

            # Read saved message if any
            merge_msg_path = os.path.join(r.controldir(), "MERGE_MSG")
            try:
                with open(merge_msg_path, "rb") as f:
                    message = f.read()
            except FileNotFoundError:
                assert isinstance(cherry_pick_commit, Commit)
                message = cherry_pick_commit.message

            assert isinstance(cherry_pick_commit, Commit)
            new_commit = r.get_worktree().commit(
                message=message,
                tree=tree_id,
                author=cherry_pick_commit.author,
                author_timestamp=cherry_pick_commit.author_time,
                author_timezone=cherry_pick_commit.author_timezone,
            )

            # Clean up state files
            try:
                os.remove(cherry_pick_head_path)
            except FileNotFoundError:
                pass
            try:
                os.remove(merge_msg_path)
            except FileNotFoundError:
                pass

            return new_commit

        # Normal cherry-pick operation
        # Get current HEAD
        try:
            head_commit = r[b"HEAD"]
        except KeyError:
            raise Error("No HEAD reference found")

        # Parse the commit to cherry-pick
        # committish cannot be None here due to validation above
        assert committish is not None
        try:
            cherry_pick_commit = parse_commit(r, committish)
        except KeyError:
            raise Error(
                f"Cannot find commit '{committish.decode() if isinstance(committish, bytes) else committish}'"
            )

        # Check if commit has parents
        assert isinstance(cherry_pick_commit, Commit)
        if not cherry_pick_commit.parents:
            raise Error("Cannot cherry-pick root commit")

        # Get parent of cherry-pick commit
        parent_commit = r[cherry_pick_commit.parents[0]]
        assert isinstance(parent_commit, Commit)

        # Perform three-way merge
        assert isinstance(head_commit, Commit)
        merged_tree, conflicts = three_way_merge(
            r.object_store, parent_commit, head_commit, cherry_pick_commit
        )

        # Add merged tree to object store
        r.object_store.add_object(merged_tree)

        # Update working tree and index:
        # reset the index to match the merged tree
        r.get_worktree().reset_index(merged_tree.id)

        # Update working tree from the new index.
        # Allow overwriting because we're applying the merge result
        changes = tree_changes(r.object_store, head_commit.tree, merged_tree.id)
        update_working_tree(
            r,
            head_commit.tree,
            merged_tree.id,
            change_iterator=changes,
            allow_overwrite_modified=True,
        )

        if conflicts:
            # Save state for later continuation
            with open(os.path.join(r.controldir(), "CHERRY_PICK_HEAD"), "wb") as f:
                f.write(cherry_pick_commit.id + b"\n")

            # Save commit message
            with open(os.path.join(r.controldir(), "MERGE_MSG"), "wb") as f:
                f.write(cherry_pick_commit.message)

            raise Error(
                f"Conflicts in: {', '.join(c.decode('utf-8', 'replace') for c in conflicts)}\n"
                f"Fix conflicts and run 'dulwich cherry-pick --continue'"
            )

        if no_commit:
            return None

        # Create the commit
        new_commit = r.get_worktree().commit(
            message=cherry_pick_commit.message,
            tree=merged_tree.id,
            author=cherry_pick_commit.author,
            author_timestamp=cherry_pick_commit.author_time,
            author_timezone=cherry_pick_commit.author_timezone,
        )

        return new_commit
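
# Illustrative conflict flow (a sketch; the commit id is hypothetical):
#
#     try:
#         cherry_pick("/path/to/repo", "0a1b2c3")
#     except Error:
#         # fix the conflicted files, then resume:
#         cherry_pick("/path/to/repo", None, continue_=True)
#     # or give up and restore the previous state:
#     # cherry_pick("/path/to/repo", None, abort=True)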


def revert(
    repo: Union[str, os.PathLike[str], Repo],
    commits: Union[str, bytes, Commit, Tag, Sequence[Union[str, bytes, Commit, Tag]]],
    no_commit: bool = False,
    message: Optional[Union[str, bytes]] = None,
    author: Optional[bytes] = None,
    committer: Optional[bytes] = None,
) -> Optional[bytes]:
    """Revert one or more commits.

    This creates a new commit that undoes the changes introduced by the
    specified commits. Unlike reset, revert creates a new commit that
    preserves history.

    Args:
      repo: Path to repository or repository object
      commits: List of commit-ish (SHA, ref, etc.) to revert, or a single commit-ish
      no_commit: If True, apply changes to index/working tree but don't commit
      message: Optional commit message (default: "Revert <original subject>")
      author: Optional author for revert commit
      committer: Optional committer for revert commit

    Returns:
      SHA1 of the new revert commit, or None if no_commit=True

    Raises:
      Error: If revert fails due to conflicts or other issues
    """
    from .merge import three_way_merge

    # Normalize commits to a list
    if isinstance(commits, (str, bytes, Commit, Tag)):
        commits = [commits]

    with open_repo_closing(repo) as r:
        # Convert string refs to bytes
        commits_to_revert = []
        for commit_ref in commits:
            if isinstance(commit_ref, str):
                commit_ref = commit_ref.encode("utf-8")
            commit = parse_commit(r, commit_ref)
            commits_to_revert.append(commit)

        # Get current HEAD
        try:
            head_commit_id = r.refs[b"HEAD"]
        except KeyError:
            raise Error("No HEAD reference found")

        head_commit = r[head_commit_id]
        assert isinstance(head_commit, Commit)
        current_tree = head_commit.tree

        # Process commits in order
        for commit_to_revert in commits_to_revert:
            # For revert, we want to apply the inverse of the commit.
            # This means using the commit's tree as "base" and its parent as "theirs"
            if not commit_to_revert.parents:
                raise Error(
                    f"Cannot revert commit {commit_to_revert.id.decode() if isinstance(commit_to_revert.id, bytes) else commit_to_revert.id} - it has no parents"
                )

            # For simplicity, we only handle commits with one parent (no merge commits)
            if len(commit_to_revert.parents) > 1:
                raise Error(
                    f"Cannot revert merge commit {commit_to_revert.id.decode() if isinstance(commit_to_revert.id, bytes) else commit_to_revert.id} - not yet implemented"
                )

            parent_commit = r[commit_to_revert.parents[0]]
            assert isinstance(parent_commit, Commit)

            # Perform three-way merge:
            # - base: the commit we're reverting (what we want to remove)
            # - ours: current HEAD (what we have now)
            # - theirs: parent of commit being reverted (what we want to go back to)
            assert isinstance(commit_to_revert, Commit)
            head_for_merge = r[head_commit_id]
            assert isinstance(head_for_merge, Commit)
            merged_tree, conflicts = three_way_merge(
                r.object_store,
                commit_to_revert,  # base
                head_for_merge,  # ours
                parent_commit,  # theirs
            )

            if conflicts:
                # Update working tree with conflicts
                changes = tree_changes(r.object_store, current_tree, merged_tree.id)
                update_working_tree(
                    r, current_tree, merged_tree.id, change_iterator=changes
                )
                conflicted_paths = [c.decode("utf-8", "replace") for c in conflicts]
                raise Error(f"Conflicts while reverting: {', '.join(conflicted_paths)}")

            # Add merged tree to object store
            r.object_store.add_object(merged_tree)

            # Update working tree
            changes = tree_changes(r.object_store, current_tree, merged_tree.id)
            update_working_tree(
                r, current_tree, merged_tree.id, change_iterator=changes
            )
            current_tree = merged_tree.id

            if not no_commit:
                # Create revert commit
                revert_commit = Commit()
                revert_commit.tree = merged_tree.id
                revert_commit.parents = [head_commit_id]

                # Set author/committer
                if author is None:
                    author = get_user_identity(r.get_config_stack())
                if committer is None:
                    committer = author
                revert_commit.author = author
                revert_commit.committer = committer

                # Set timestamps
                timestamp = int(time.time())
                timezone = 0  # UTC
                revert_commit.author_time = timestamp
                revert_commit.author_timezone = timezone
                revert_commit.commit_time = timestamp
                revert_commit.commit_timezone = timezone

                # Set message
                if message is None:
                    # Extract original commit subject
                    original_message = commit_to_revert.message
                    if isinstance(original_message, bytes):
                        original_message = original_message.decode("utf-8", "replace")
                    subject = original_message.split("\n")[0]
                    message = f'Revert "{subject}"\n\nThis reverts commit {commit_to_revert.id.decode("ascii")}.'.encode()
                elif isinstance(message, str):
                    message = message.encode("utf-8")
                revert_commit.message = message

                # Add commit to object store
                r.object_store.add_object(revert_commit)

                # Update HEAD
                r.refs[b"HEAD"] = revert_commit.id
                head_commit_id = revert_commit.id

        return head_commit_id if not no_commit else None
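
# Illustrative usage (a sketch; the refs are hypothetical). Reverting several
# commits creates one revert commit per input commit:
#
#     new_head = revert("/path/to/repo", "HEAD")
#     revert("/path/to/repo", [b"abc1234", b"def5678"], no_commit=True)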


def gc(
    repo: RepoPath,
    auto: bool = False,
    aggressive: bool = False,
    prune: bool = True,
    grace_period: Optional[int] = 1209600,  # 2 weeks default
    dry_run: bool = False,
    progress: Optional[Callable[[str], None]] = None,
) -> "GCStats":
    """Run garbage collection on a repository.

    Args:
      repo: Path to the repository or a Repo object
      auto: If True, only run gc if needed
      aggressive: If True, use more aggressive settings
      prune: If True, prune unreachable objects
      grace_period: Grace period in seconds for pruning (default 2 weeks)
      dry_run: If True, only report what would be done
      progress: Optional progress callback

    Returns:
      GCStats object with garbage collection statistics
    """
    from .gc import garbage_collect

    with open_repo_closing(repo) as r:
        return garbage_collect(
            r,
            auto=auto,
            aggressive=aggressive,
            prune=prune,
            grace_period=grace_period,
            dry_run=dry_run,
            progress=progress,
        )


def prune(
    repo: RepoPath,
    grace_period: Optional[int] = None,
    dry_run: bool = False,
    progress: Optional[Callable[[str], None]] = None,
) -> None:
    """Prune/clean up a repository's object store.

    This removes temporary files that were left behind by interrupted
    pack operations.

    Args:
      repo: Path to the repository or a Repo object
      grace_period: Grace period in seconds for removing temporary files
        (default 2 weeks)
      dry_run: If True, only report what would be done
      progress: Optional progress callback
    """
    with open_repo_closing(repo) as r:
        if progress:
            progress("Pruning temporary files")
        if not dry_run:
            r.object_store.prune(grace_period=grace_period)


def count_objects(repo: RepoPath = ".", verbose: bool = False) -> CountObjectsResult:
    """Count unpacked objects and their disk usage.

    Args:
      repo: Path to repository or repository object
      verbose: Whether to return verbose information

    Returns:
      CountObjectsResult object with detailed statistics
    """
    from .object_store import DiskObjectStore

    with open_repo_closing(repo) as r:
        object_store = r.object_store
        assert isinstance(object_store, DiskObjectStore)

        # Count loose objects
        loose_count = 0
        loose_size = 0
        for sha in object_store._iter_loose_objects():
            loose_count += 1
            path = object_store._get_shafile_path(sha)
            try:
                stat_info = os.stat(path)
                # Git uses disk usage, not file size. st_blocks is always in
                # 512-byte blocks per POSIX standard
                st_blocks = getattr(stat_info, "st_blocks", None)
                if st_blocks is not None:
                    # Available on Linux and macOS
                    loose_size += st_blocks * 512
                else:
                    # Fallback for Windows
                    loose_size += stat_info.st_size
            except FileNotFoundError:
                # Object may have been removed between iteration and stat
                pass

        if not verbose:
            return CountObjectsResult(count=loose_count, size=loose_size)

        # Count pack information
        pack_count = len(object_store.packs)
        in_pack_count = 0
        pack_size = 0

        for pack in object_store.packs:
            in_pack_count += len(pack)
            # Get pack file size
            pack_path = pack._data_path
            try:
                pack_size += os.path.getsize(pack_path)
            except FileNotFoundError:
                pass
            # Get index file size
            idx_path = pack._idx_path
            try:
                pack_size += os.path.getsize(idx_path)
            except FileNotFoundError:
                pass

        return CountObjectsResult(
            count=loose_count,
            size=loose_size,
            in_pack=in_pack_count,
            packs=pack_count,
            size_pack=pack_size,
        )
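
# Illustrative usage (a sketch). The verbose form also reports pack statistics:
#
#     stats = count_objects("/path/to/repo", verbose=True)
#     print(stats.count, stats.size, stats.in_pack, stats.packs, stats.size_pack)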


def is_interactive_rebase(repo: Union[Repo, str]) -> bool:
    """Check if an interactive rebase is in progress.

    Args:
      repo: Repository to check

    Returns:
      True if interactive rebase is in progress, False otherwise
    """
    with open_repo_closing(repo) as r:
        state_manager = r.get_rebase_state_manager()
        if not state_manager.exists():
            return False
        # Check if todo file exists
        todo = state_manager.load_todo()
        return todo is not None


def rebase(
    repo: Union[Repo, str],
    upstream: Union[bytes, str],
    onto: Optional[Union[bytes, str]] = None,
    branch: Optional[Union[bytes, str]] = None,
    abort: bool = False,
    continue_rebase: bool = False,
    skip: bool = False,
    interactive: bool = False,
    edit_todo: bool = False,
) -> list[bytes]:
    """Rebase commits onto another branch.

    Args:
      repo: Repository to rebase in
      upstream: Upstream branch/commit to rebase onto
      onto: Specific commit to rebase onto (defaults to upstream)
      branch: Branch to rebase (defaults to current branch)
      abort: Abort an in-progress rebase
      continue_rebase: Continue an in-progress rebase
      skip: Skip current commit and continue rebase
      interactive: Start an interactive rebase
      edit_todo: Edit the todo list of an interactive rebase

    Returns:
      List of new commit SHAs created by rebase

    Raises:
      Error: If rebase fails or conflicts occur
    """
    from .cli import launch_editor
    from .rebase import (
        RebaseConflict,
        RebaseError,
        Rebaser,
        process_interactive_rebase,
        start_interactive,
    )
    from .rebase import (
        edit_todo as edit_todo_func,
    )

    with open_repo_closing(repo) as r:
        rebaser = Rebaser(r)

        if abort:
            try:
                rebaser.abort()
                return []
            except RebaseError as e:
                raise Error(str(e))

        if edit_todo:
            # Edit the todo list of an interactive rebase
            try:
                edit_todo_func(r, launch_editor)
                print("Todo list updated. Continue with 'rebase --continue'")
                return []
            except RebaseError as e:
                raise Error(str(e))

        if continue_rebase:
            try:
                if interactive:
                    # Continue interactive rebase
                    is_complete, pause_reason = process_interactive_rebase(
                        r, editor_callback=launch_editor
                    )
                    if is_complete:
                        return [c.id for c in rebaser._done]
                    else:
                        if pause_reason == "conflict":
                            raise Error("Conflicts detected. Resolve and continue.")
                        elif pause_reason == "edit":
                            print("Stopped for editing. Make changes and continue.")
                        elif pause_reason == "break":
                            print("Rebase paused at break. Continue when ready.")
                        else:
                            print(f"Rebase paused: {pause_reason}")
                        return []
                else:
                    # Continue regular rebase
                    result = rebaser.continue_()
                    if result is None:
                        # Rebase complete
                        return [c.id for c in rebaser._done]
                    elif isinstance(result, tuple) and result[1]:
                        # Still have conflicts
                        raise Error(
                            f"Conflicts in: {', '.join(f.decode('utf-8', 'replace') for f in result[1])}"
                        )
            except RebaseError as e:
                raise Error(str(e))

        # Convert string refs to bytes
        if isinstance(upstream, str):
            upstream = upstream.encode("utf-8")
        if isinstance(onto, str):
            onto = onto.encode("utf-8") if onto else None
        if isinstance(branch, str):
            branch = branch.encode("utf-8") if branch else None

        try:
            if interactive:
                # Start interactive rebase
                todo = start_interactive(r, upstream, onto, branch, launch_editor)

                # Process the todo list
                is_complete, pause_reason = process_interactive_rebase(
                    r, todo, editor_callback=launch_editor
                )
                if is_complete:
                    return [c.id for c in rebaser._done]
                else:
                    if pause_reason == "conflict":
                        raise Error("Conflicts detected. Resolve and continue.")
                    elif pause_reason == "edit":
                        print("Stopped for editing. Make changes and continue.")
                    elif pause_reason == "break":
                        print("Rebase paused at break. Continue when ready.")
                    else:
                        print(f"Rebase paused: {pause_reason}")
                    return []
            else:
                # Regular rebase
                rebaser.start(upstream, onto, branch)

                # Continue rebase automatically
                result = rebaser.continue_()
                if result is not None:
                    # Conflicts
                    raise RebaseConflict(result[1])

                # Return the SHAs of the rebased commits
                return [c.id for c in rebaser._done]
        except RebaseConflict as e:
            raise Error(str(e))
        except RebaseError as e:
            raise Error(str(e))
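
# Illustrative usage (a sketch; the branch name is hypothetical). On conflict
# an Error is raised; resolve the files, then resume with continue_rebase=True:
#
#     try:
#         new_shas = rebase("/path/to/repo", upstream=b"main")
#     except Error:
#         new_shas = rebase("/path/to/repo", upstream=b"main", continue_rebase=True)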


def annotate(
    repo: RepoPath,
    path: Union[str, bytes],
    committish: Optional[Union[str, bytes, Commit, Tag]] = None,
) -> list[tuple[tuple[Commit, TreeEntry], bytes]]:
    """Annotate the history of a file.

    Args:
      repo: Path to the repository
      path: Path to annotate
      committish: Commit id to find path in

    Returns: List of ((Commit, TreeChange), line) tuples
    """
    if committish is None:
        committish = "HEAD"
    from dulwich.annotate import annotate_lines

    with open_repo_closing(repo) as r:
        commit_id = parse_commit(r, committish).id
        # Ensure path is bytes
        if isinstance(path, str):
            path = path.encode()
        return annotate_lines(r.object_store, commit_id, path)


blame = annotate
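
# Illustrative usage (a sketch; the file name is hypothetical):
#
#     for (commit, entry), line in annotate("/path/to/repo", "setup.py"):
#         print(commit.id[:7].decode("ascii"), line.decode(errors="replace"))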


def filter_branch(
    repo: RepoPath = ".",
    branch: Union[str, bytes] = "HEAD",
    *,
    filter_fn: Optional[Callable[[Commit], Optional["CommitData"]]] = None,
    filter_author: Optional[Callable[[bytes], Optional[bytes]]] = None,
    filter_committer: Optional[Callable[[bytes], Optional[bytes]]] = None,
    filter_message: Optional[Callable[[bytes], Optional[bytes]]] = None,
    tree_filter: Optional[Callable[[bytes, str], Optional[bytes]]] = None,
    index_filter: Optional[Callable[[bytes, str], Optional[bytes]]] = None,
    parent_filter: Optional[Callable[[Sequence[bytes]], list[bytes]]] = None,
    commit_filter: Optional[Callable[[Commit, bytes], Optional[bytes]]] = None,
    subdirectory_filter: Optional[Union[str, bytes]] = None,
    prune_empty: bool = False,
    tag_name_filter: Optional[Callable[[bytes], Optional[bytes]]] = None,
    force: bool = False,
    keep_original: bool = True,
    refs: Optional[list[bytes]] = None,
) -> dict[bytes, bytes]:
    """Rewrite branch history by creating new commits with filtered properties.

    This is similar to git filter-branch, allowing you to rewrite commit
    history by modifying trees, parents, author, committer, or commit messages.

    Args:
      repo: Path to repository
      branch: Branch to rewrite (defaults to HEAD)
      filter_fn: Optional callable that takes a Commit object and returns
        a dict of updated fields (author, committer, message, etc.)
      filter_author: Optional callable that takes author bytes and returns
        updated author bytes or None to keep unchanged
      filter_committer: Optional callable that takes committer bytes and returns
        updated committer bytes or None to keep unchanged
      filter_message: Optional callable that takes commit message bytes
        and returns updated message bytes
      tree_filter: Optional callable that takes (tree_sha, temp_dir) and returns
        new tree SHA after modifying working directory
      index_filter: Optional callable that takes (tree_sha, temp_index_path) and
        returns new tree SHA after modifying index
      parent_filter: Optional callable that takes parent list and returns
        modified parent list
      commit_filter: Optional callable that takes (Commit, tree_sha) and returns
        new commit SHA or None to skip commit
      subdirectory_filter: Optional subdirectory path to extract as new root
      prune_empty: Whether to prune commits that become empty
      tag_name_filter: Optional callable to rename tags
      force: Force operation even if branch has been filtered before
      keep_original: Keep original refs under refs/original/
      refs: List of refs to rewrite (defaults to [branch])

    Returns:
      Dict mapping old commit SHAs to new commit SHAs

    Raises:
      Error: If branch is already filtered and force is False
    """
    from .filter_branch import CommitFilter, filter_refs

    with open_repo_closing(repo) as r:
        # Parse branch/committish
        if isinstance(branch, str):
            branch = branch.encode()

        # Determine which refs to process
        if refs is None:
            if branch == b"HEAD":
                # Resolve HEAD to actual branch
                try:
                    resolved = r.refs.follow(b"HEAD")
                    if resolved and resolved[0]:
                        # resolved is a list of (refname, sha) tuples
                        resolved_ref = resolved[0][-1]
                        if resolved_ref and resolved_ref != b"HEAD":
                            refs = [resolved_ref]
                        else:
                            # HEAD points directly to a commit
                            refs = [b"HEAD"]
                    else:
                        refs = [b"HEAD"]
                except SymrefLoop:
                    refs = [b"HEAD"]
            else:
                # Convert branch name to full ref if needed
                if not branch.startswith(b"refs/"):
                    branch = b"refs/heads/" + branch
                refs = [branch]

        # Convert subdirectory filter to bytes if needed
        if subdirectory_filter:
            if isinstance(subdirectory_filter, str):
                subdirectory_filter = subdirectory_filter.encode()
        else:
            subdirectory_filter = None

        # Create commit filter
        filter_obj = CommitFilter(
            r.object_store,
            filter_fn=filter_fn,
            filter_author=filter_author,
            filter_committer=filter_committer,
            filter_message=filter_message,
            tree_filter=tree_filter,
            index_filter=index_filter,
            parent_filter=parent_filter,
            commit_filter=commit_filter,
            subdirectory_filter=subdirectory_filter,
            prune_empty=prune_empty,
            tag_name_filter=tag_name_filter,
        )

        # Tag callback for renaming tags
        def rename_tag(old_ref: bytes, new_ref: bytes) -> None:
            # Copy tag to new name
            r.refs[new_ref] = r.refs[old_ref]
            # Delete old tag
            del r.refs[old_ref]

        # Filter refs
        try:
            return filter_refs(
                r.refs,
                r.object_store,
                refs,
                filter_obj,
                keep_original=keep_original,
                force=force,
                tag_callback=rename_tag if tag_name_filter else None,
            )
        except ValueError as e:
            raise Error(str(e)) from e


def format_patch(
    repo: RepoPath = ".",
    committish: Optional[Union[bytes, tuple[bytes, bytes]]] = None,
    outstream: TextIO = sys.stdout,
    outdir: Optional[Union[str, os.PathLike[str]]] = None,
    n: int = 1,
    stdout: bool = False,
    version: Optional[str] = None,
) -> list[str]:
    """Generate patches suitable for git am.

    Args:
      repo: Path to repository
      committish: Commit-ish or commit range to generate patches for.
        Can be a single commit id, or a tuple of (start, end) commit ids
        for a range. If None, formats the last n commits from HEAD.
      outstream: Stream to write to if stdout=True
      outdir: Directory to write patch files to (default: current directory)
      n: Number of patches to generate if committish is None
      stdout: Write patches to stdout instead of files
      version: Version string to include in patches (default: Dulwich version)

    Returns:
      List of patch filenames that were created (empty if stdout=True)
    """
    from io import BytesIO

    if outdir is None:
        outdir = "."

    filenames = []
    with open_repo_closing(repo) as r:
        # Determine which commits to format
        commits_to_format = []

        if committish is None:
            # Get the last n commits from HEAD
            try:
                walker = r.get_walker()
                for entry in walker:
                    commits_to_format.append(entry.commit)
                    if len(commits_to_format) >= n:
                        break
                commits_to_format.reverse()
            except KeyError:
                # No HEAD or empty repository
                pass
        elif isinstance(committish, tuple):
            # Handle commit range (start, end)
            start_commit, end_commit = committish
            # Extract commit IDs from commit objects if needed
            start_id = (
                start_commit.id if isinstance(start_commit, Commit) else start_commit
            )
            end_id = end_commit.id if isinstance(end_commit, Commit) else end_commit
            # Walk from end back to start
            walker = r.get_walker(include=[end_id], exclude=[start_id])
            for entry in walker:
                commits_to_format.append(entry.commit)
            commits_to_format.reverse()
        else:
            # Single commit
            commit = r.object_store[committish]
            assert isinstance(commit, Commit)
            commits_to_format.append(commit)

        # Generate patches
        total = len(commits_to_format)
        for i, commit in enumerate(commits_to_format, 1):
            assert isinstance(commit, Commit)
            # Get the parent
            if commit.parents:
                parent_id = commit.parents[0]
                parent = r.object_store[parent_id]
                assert isinstance(parent, Commit)
            else:
                parent = None

            # Generate the diff
            diff_content = BytesIO()
            if parent:
                write_tree_diff(
                    diff_content,
                    r.object_store,
                    parent.tree,
                    commit.tree,
                )
            else:
                # Initial commit - diff against empty tree
                write_tree_diff(
                    diff_content,
                    r.object_store,
                    None,
                    commit.tree,
                )

            # Generate patch with commit metadata
            if stdout:
                # Get binary stream from TextIO
                if hasattr(outstream, "buffer"):
                    binary_out: IO[bytes] = outstream.buffer
                else:
                    # Fallback for non-text streams
                    binary_out = outstream  # type: ignore[assignment]
                write_commit_patch(
                    binary_out,
                    commit,
                    diff_content.getvalue(),
                    (i, total),
                    version=version,
                )
            else:
                # Generate filename
                summary = get_summary(commit)
                filename = os.path.join(outdir, f"{i:04d}-{summary}.patch")
                with open(filename, "wb") as f:
                    write_commit_patch(
                        f,
                        commit,
                        diff_content.getvalue(),
                        (i, total),
                        version=version,
                    )
                filenames.append(filename)

    return filenames
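
# Illustrative usage (a sketch; the commit ids are hypothetical). Write the
# last three commits as files, or a specific range to stdout:
#
#     files = format_patch("/path/to/repo", n=3, outdir="/tmp/patches")
#     format_patch("/path/to/repo", committish=(b"abc1234", b"def5678"), stdout=True)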


def bisect_start(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    bad: Optional[Union[str, bytes, Commit, Tag]] = None,
    good: Optional[
        Union[str, bytes, Commit, Tag, Sequence[Union[str, bytes, Commit, Tag]]]
    ] = None,
    paths: Optional[Sequence[bytes]] = None,
    no_checkout: bool = False,
    term_bad: str = "bad",
    term_good: str = "good",
) -> Optional[bytes]:
    """Start a new bisect session.

    Args:
      repo: Path to repository or a Repo object
      bad: The bad commit (defaults to HEAD)
      good: List of good commits or a single good commit
      paths: Optional paths to limit bisect to
      no_checkout: If True, don't checkout commits during bisect
      term_bad: Term to use for bad commits (default: "bad")
      term_good: Term to use for good commits (default: "good")

    Returns:
      The SHA of the next commit to test, or None if both a good and a bad
      commit have not yet been given
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)

        # Convert single good commit to sequence
        if good is not None and isinstance(good, (str, bytes, Commit, Tag)):
            good = [good]

        # Parse commits
        bad_sha = parse_commit(r, bad).id if bad else None
        good_shas = [parse_commit(r, g).id for g in good] if good else None

        state.start(bad_sha, good_shas, paths, no_checkout, term_bad, term_good)

        # Return the next commit to test if we have both good and bad
        if bad_sha and good_shas:
            next_sha = state._find_next_commit()
            if next_sha and not no_checkout:
                # Checkout the next commit
                old_commit = r[r.head()]
                assert isinstance(old_commit, Commit)
                old_tree = old_commit.tree if r.head() else None
                r.refs[b"HEAD"] = next_sha
                commit = r[next_sha]
                assert isinstance(commit, Commit)
                changes = tree_changes(r.object_store, old_tree, commit.tree)
                update_working_tree(r, old_tree, commit.tree, change_iterator=changes)
            return next_sha

        return None
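

# Usage sketch (illustrative): a typical bisect session driven from Python,
# using bisect_bad()/bisect_good() below; the refs and test_fails() helper
# are hypothetical.
#
#     next_sha = bisect_start("/tmp/repo", bad="HEAD", good="v1.0")
#     while next_sha:
#         if test_fails():
#             next_sha = bisect_bad("/tmp/repo")
#         else:
#             next_sha = bisect_good("/tmp/repo")
#     # When None is returned, the first bad commit has been identified.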


def bisect_bad(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    rev: Optional[Union[str, bytes, Commit, Tag]] = None,
) -> Optional[bytes]:
    """Mark a commit as bad.

    Args:
      repo: Path to repository or a Repo object
      rev: Commit to mark as bad (defaults to HEAD)

    Returns:
      The SHA of the next commit to test, or None if bisect is complete
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)
        rev_sha = parse_commit(r, rev).id if rev else None
        next_sha = state.mark_bad(rev_sha)

        if next_sha:
            # Checkout the next commit
            old_commit = r[r.head()]
            assert isinstance(old_commit, Commit)
            old_tree = old_commit.tree if r.head() else None
            r.refs[b"HEAD"] = next_sha
            commit = r[next_sha]
            assert isinstance(commit, Commit)
            changes = tree_changes(r.object_store, old_tree, commit.tree)
            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)

        return next_sha


def bisect_good(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    rev: Optional[Union[str, bytes, Commit, Tag]] = None,
) -> Optional[bytes]:
    """Mark a commit as good.

    Args:
      repo: Path to repository or a Repo object
      rev: Commit to mark as good (defaults to HEAD)

    Returns:
      The SHA of the next commit to test, or None if bisect is complete
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)
        rev_sha = parse_commit(r, rev).id if rev else None
        next_sha = state.mark_good(rev_sha)

        if next_sha:
            # Checkout the next commit
            old_commit = r[r.head()]
            assert isinstance(old_commit, Commit)
            old_tree = old_commit.tree if r.head() else None
            r.refs[b"HEAD"] = next_sha
            commit = r[next_sha]
            assert isinstance(commit, Commit)
            changes = tree_changes(r.object_store, old_tree, commit.tree)
            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)

        return next_sha


def bisect_skip(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    revs: Optional[
        Union[str, bytes, Commit, Tag, Sequence[Union[str, bytes, Commit, Tag]]]
    ] = None,
) -> Optional[bytes]:
    """Skip one or more commits.

    Args:
      repo: Path to repository or a Repo object
      revs: List of commits to skip (defaults to [HEAD])

    Returns:
      The SHA of the next commit to test, or None if bisect is complete
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)
        if revs is None:
            rev_shas = None
        else:
            # Convert single rev to sequence
            if isinstance(revs, (str, bytes, Commit, Tag)):
                revs = [revs]
            rev_shas = [parse_commit(r, rev).id for rev in revs]
        next_sha = state.skip(rev_shas)

        if next_sha:
            # Checkout the next commit
            old_commit = r[r.head()]
            assert isinstance(old_commit, Commit)
            old_tree = old_commit.tree if r.head() else None
            r.refs[b"HEAD"] = next_sha
            commit = r[next_sha]
            assert isinstance(commit, Commit)
            changes = tree_changes(r.object_store, old_tree, commit.tree)
            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)

        return next_sha


def bisect_reset(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    commit: Optional[Union[str, bytes, Commit, Tag]] = None,
) -> None:
    """Reset bisect state and return to the original branch/commit.

    Args:
      repo: Path to repository or a Repo object
      commit: Optional commit to reset to (defaults to original branch/commit)
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)

        # Get old tree before reset
        try:
            old_commit = r[r.head()]
            assert isinstance(old_commit, Commit)
            old_tree = old_commit.tree
        except KeyError:
            old_tree = None

        commit_sha = parse_commit(r, commit).id if commit else None
        state.reset(commit_sha)

        # Update working tree to new HEAD
        try:
            new_head = r.head()
            if new_head:
                new_commit = r[new_head]
                assert isinstance(new_commit, Commit)
                changes = tree_changes(r.object_store, old_tree, new_commit.tree)
                update_working_tree(
                    r, old_tree, new_commit.tree, change_iterator=changes
                )
        except KeyError:
            # No HEAD after reset
            pass


def bisect_log(repo: Union[str, os.PathLike[str], Repo] = ".") -> str:
    """Get the bisect log.

    Args:
      repo: Path to repository or a Repo object

    Returns:
      The bisect log as a string
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)
        return state.get_log()


def bisect_replay(
    repo: Union[str, os.PathLike[str], Repo],
    log_file: Union[str, os.PathLike[str], BinaryIO],
) -> None:
    """Replay a bisect log.

    Args:
      repo: Path to repository or a Repo object
      log_file: Path to the log file or a file-like object
    """
    with open_repo_closing(repo) as r:
        state = BisectState(r)

        if isinstance(log_file, (str, os.PathLike)):
            with open(log_file) as f:
                log_content = f.read()
        else:
            content = log_file.read()
            log_content = content.decode() if isinstance(content, bytes) else content

        state.replay(log_content)
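

# Usage sketch (illustrative; the paths are hypothetical): save a bisect log
# and replay it in a fresh clone to reproduce the session.
#
#     with open("/tmp/bisect.log", "w") as f:
#         f.write(bisect_log("/tmp/repo"))
#     bisect_replay("/tmp/clone", "/tmp/bisect.log")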


def reflog(
    repo: RepoPath = ".", ref: Union[str, bytes] = b"HEAD", all: bool = False
) -> Iterator[Union[Any, tuple[bytes, Any]]]:
    """Show reflog entries for a reference or all references.

    Args:
      repo: Path to repository or a Repo object
      ref: Reference name (defaults to HEAD)
      all: If True, show reflogs for all refs (ignores ref parameter)

    Yields:
      If all=False: ReflogEntry objects
      If all=True: Tuples of (ref_name, ReflogEntry) for all refs with reflogs
    """
    from .reflog import iter_reflogs

    if isinstance(ref, str):
        ref = ref.encode("utf-8")

    with open_repo_closing(repo) as r:
        if not all:
            yield from r.read_reflog(ref)
        else:
            logs_dir = os.path.join(r.controldir(), "logs")
            # Use iter_reflogs to discover all reflogs
            for ref_bytes in iter_reflogs(logs_dir):
                # Read the reflog entries for this ref
                for entry in r.read_reflog(ref_bytes):
                    yield (ref_bytes, entry)
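

# Usage sketch (illustrative; assumes reflog entries expose new_sha and
# message fields, as dulwich's reflog Entry namedtuple does):
#
#     for entry in reflog("/tmp/repo"):
#         print(entry.new_sha, entry.message)
#     for ref_name, entry in reflog("/tmp/repo", all=True):
#         print(ref_name, entry.new_sha)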


def lfs_track(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    patterns: Optional[Sequence[str]] = None,
) -> list[str]:
    """Track file patterns with Git LFS.

    Args:
      repo: Path to repository
      patterns: List of file patterns to track (e.g., ["*.bin", "*.pdf"]).
        If None, returns current tracked patterns.

    Returns:
      List of tracked patterns
    """
    from .attrs import GitAttributes

    with open_repo_closing(repo) as r:
        gitattributes_path = os.path.join(r.path, ".gitattributes")

        # Load existing GitAttributes
        if os.path.exists(gitattributes_path):
            gitattributes = GitAttributes.from_file(gitattributes_path)
        else:
            gitattributes = GitAttributes()

        if patterns is None:
            # Return current LFS tracked patterns
            tracked = []
            for pattern_obj, attrs in gitattributes:
                if attrs.get(b"filter") == b"lfs":
                    tracked.append(pattern_obj.pattern.decode())
            return tracked

        # Add new patterns
        for pattern in patterns:
            # Ensure pattern is bytes
            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern

            # Set LFS attributes for the pattern
            gitattributes.set_attribute(pattern_bytes, b"filter", b"lfs")
            gitattributes.set_attribute(pattern_bytes, b"diff", b"lfs")
            gitattributes.set_attribute(pattern_bytes, b"merge", b"lfs")
            gitattributes.set_attribute(pattern_bytes, b"text", False)

        # Write updated attributes
        gitattributes.write_to_file(gitattributes_path)

        # Stage the .gitattributes file
        add(r, [".gitattributes"])

        return lfs_track(r)  # Return updated list
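

# Usage sketch (illustrative; the patterns are hypothetical); see
# lfs_untrack() below for the inverse operation:
#
#     lfs_track("/tmp/repo", ["*.bin", "*.iso"])  # start tracking
#     lfs_track("/tmp/repo")                      # -> current patterns
#     lfs_untrack("/tmp/repo", ["*.iso"])         # stop tracking one pattern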


def lfs_untrack(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    patterns: Optional[Sequence[str]] = None,
) -> list[str]:
    """Untrack file patterns from Git LFS.

    Args:
      repo: Path to repository
      patterns: List of file patterns to untrack

    Returns:
      List of remaining tracked patterns
    """
    from .attrs import GitAttributes

    if not patterns:
        return lfs_track(repo)

    with open_repo_closing(repo) as r:
        gitattributes_path = os.path.join(r.path, ".gitattributes")
        if not os.path.exists(gitattributes_path):
            return []

        # Load existing GitAttributes
        gitattributes = GitAttributes.from_file(gitattributes_path)

        # Remove specified patterns
        for pattern in patterns:
            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern

            # Check if pattern is tracked by LFS
            for pattern_obj, attrs in list(gitattributes):
                if (
                    pattern_obj.pattern == pattern_bytes
                    and attrs.get(b"filter") == b"lfs"
                ):
                    gitattributes.remove_pattern(pattern_bytes)
                    break

        # Write updated attributes
        gitattributes.write_to_file(gitattributes_path)

        # Stage the .gitattributes file
        add(r, [".gitattributes"])

        return lfs_track(r)  # Return updated list


def lfs_init(repo: Union[str, os.PathLike[str], Repo] = ".") -> None:
    """Initialize Git LFS in a repository.

    Args:
      repo: Path to repository
    """
    from .lfs import LFSStore

    with open_repo_closing(repo) as r:
        # Create LFS store
        LFSStore.from_repo(r, create=True)

        # Set up Git config for LFS
        config = r.get_config()
        config.set((b"filter", b"lfs"), b"process", b"git-lfs filter-process")
        config.set((b"filter", b"lfs"), b"required", b"true")
        config.set((b"filter", b"lfs"), b"clean", b"git-lfs clean -- %f")
        config.set((b"filter", b"lfs"), b"smudge", b"git-lfs smudge -- %f")
        config.write_to_path()


def lfs_clean(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    path: Optional[Union[str, os.PathLike[str]]] = None,
) -> bytes:
    """Clean a file by converting it to an LFS pointer.

    Args:
      repo: Path to repository
      path: Path to file to clean (relative to repo root)

    Returns:
      LFS pointer content as bytes
    """
    from .lfs import LFSFilterDriver, LFSStore

    if path is None:
        raise ValueError("Path must be specified")

    with open_repo_closing(repo) as r:
        # Get LFS store
        lfs_store = LFSStore.from_repo(r)
        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())

        # Read file content
        full_path = os.path.join(r.path, path)
        with open(full_path, "rb") as f:
            content = f.read()

        # Clean the content (convert to LFS pointer)
        return filter_driver.clean(content)


def lfs_smudge(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    pointer_content: Optional[bytes] = None,
) -> bytes:
    """Smudge an LFS pointer by retrieving the actual content.

    Args:
      repo: Path to repository
      pointer_content: LFS pointer content as bytes

    Returns:
      Actual file content as bytes
    """
    from .lfs import LFSFilterDriver, LFSStore

    if pointer_content is None:
        raise ValueError("Pointer content must be specified")

    with open_repo_closing(repo) as r:
        # Get LFS store
        lfs_store = LFSStore.from_repo(r)
        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())

        # Smudge the pointer (retrieve actual content)
        return filter_driver.smudge(pointer_content)
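

# Usage sketch (illustrative; "big.bin" is a hypothetical file in the
# repository): a clean/smudge round trip through the local LFS store.
#
#     lfs_init("/tmp/repo")
#     pointer = lfs_clean("/tmp/repo", "big.bin")  # file -> pointer bytes
#     data = lfs_smudge("/tmp/repo", pointer)      # pointer -> original bytes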


def lfs_ls_files(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    ref: Optional[Union[str, bytes]] = None,
) -> list[tuple[bytes, str, int]]:
    """List files tracked by Git LFS.

    Args:
      repo: Path to repository
      ref: Git ref to check (defaults to HEAD)

    Returns:
      List of (path, oid, size) tuples for LFS files
    """
    from .lfs import LFSPointer
    from .object_store import iter_tree_contents

    with open_repo_closing(repo) as r:
        if ref is None:
            ref = b"HEAD"
        elif isinstance(ref, str):
            ref = ref.encode()

        # Get the commit and tree
        try:
            commit = r[ref]
            assert isinstance(commit, Commit)
            tree = r[commit.tree]
            assert isinstance(tree, Tree)
        except KeyError:
            return []

        lfs_files = []

        # Walk the tree
        for path, mode, sha in iter_tree_contents(r.object_store, tree.id):
            assert path is not None
            assert mode is not None
            assert sha is not None
            if not stat.S_ISREG(mode):
                continue

            # Check if it's an LFS pointer
            obj = r.object_store[sha]
            if not isinstance(obj, Blob):
                raise AssertionError(f"Expected Blob object, got {type(obj).__name__}")
            pointer = LFSPointer.from_bytes(obj.data)
            if pointer is not None:
                lfs_files.append((path, pointer.oid, pointer.size))

        return lfs_files
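

# Usage sketch (illustrative): list LFS-tracked files reachable from HEAD.
#
#     for path, oid, size in lfs_ls_files("/tmp/repo"):
#         print(path.decode(), oid, size)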


def lfs_migrate(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    include: Optional[list[str]] = None,
    exclude: Optional[list[str]] = None,
    everything: bool = False,
) -> int:
    """Migrate files to Git LFS.

    Args:
      repo: Path to repository
      include: Patterns of files to include
      exclude: Patterns of files to exclude
      everything: Migrate all files above a certain size (100 MB)

    Returns:
      Number of migrated files
    """
    from .lfs import LFSFilterDriver, LFSStore

    with open_repo_closing(repo) as r:
        # Initialize LFS if needed
        lfs_store = LFSStore.from_repo(r, create=True)
        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())

        # Get current index
        index = r.open_index()

        migrated = 0

        # Determine files to migrate
        files_to_migrate = []

        if everything:
            # Migrate all files above 100MB
            for path, entry in index.items():
                full_path = os.path.join(r.path, path.decode())
                if os.path.exists(full_path):
                    size = os.path.getsize(full_path)
                    if size > 100 * 1024 * 1024:  # 100MB
                        files_to_migrate.append(path.decode())
        else:
            # Use include/exclude patterns
            for path, entry in index.items():
                path_str = path.decode()

                # Check include patterns
                if include:
                    matched = any(
                        fnmatch.fnmatch(path_str, pattern) for pattern in include
                    )
                    if not matched:
                        continue

                # Check exclude patterns
                if exclude:
                    excluded = any(
                        fnmatch.fnmatch(path_str, pattern) for pattern in exclude
                    )
                    if excluded:
                        continue

                files_to_migrate.append(path_str)

        # Migrate files
        for path_str in files_to_migrate:
            full_path = os.path.join(r.path, path_str)
            if not os.path.exists(full_path):
                continue

            # Read file content
            with open(full_path, "rb") as f:
                content = f.read()

            # Convert to LFS pointer
            pointer_content = filter_driver.clean(content)

            # Write pointer back to file
            with open(full_path, "wb") as f:
                f.write(pointer_content)

            # Create blob for pointer content and update index
            blob = Blob()
            blob.data = pointer_content
            r.object_store.add_object(blob)

            st = os.stat(full_path)
            index_entry = index_entry_from_stat(st, blob.id, 0)
            path_bytes = path_str.encode() if isinstance(path_str, str) else path_str
            index[path_bytes] = index_entry
            migrated += 1

        # Write updated index
        index.write()

        # Track patterns if include was specified
        if include:
            lfs_track(r, include)

        return migrated
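

# Usage sketch (illustrative; the patterns are hypothetical): convert files
# already in the index/working tree into LFS pointers.
#
#     count = lfs_migrate("/tmp/repo", include=["*.bin"], exclude=["small/*"])
#     count = lfs_migrate("/tmp/repo", everything=True)  # only files > 100 MB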


def lfs_pointer_check(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    paths: Optional[Sequence[str]] = None,
) -> dict[str, Optional[Any]]:
    """Check if files are valid LFS pointers.

    Args:
      repo: Path to repository
      paths: List of file paths to check (if None, check all files)

    Returns:
      Dict mapping paths to LFSPointer objects (or None if not a pointer)
    """
    from .lfs import LFSPointer

    with open_repo_closing(repo) as r:
        results = {}

        if paths is None:
            # Check all files in index
            index = r.open_index()
            paths = [path.decode() for path in index]

        for path in paths:
            full_path = os.path.join(r.path, path)
            if os.path.exists(full_path):
                try:
                    with open(full_path, "rb") as f:
                        content = f.read()
                    pointer = LFSPointer.from_bytes(content)
                    results[path] = pointer
                except OSError:
                    results[path] = None
            else:
                results[path] = None

        return results
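

# Usage sketch (illustrative): distinguish pointer files from regular
# (or unreadable) files.
#
#     for path, pointer in lfs_pointer_check("/tmp/repo").items():
#         print(path, "LFS pointer" if pointer else "not a pointer")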


def lfs_fetch(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    remote: str = "origin",
    refs: Optional[list[Union[str, bytes]]] = None,
) -> int:
    """Fetch LFS objects from remote.

    Args:
      repo: Path to repository
      remote: Remote name (default: origin)
      refs: Specific refs to fetch LFS objects for (default: all refs)

    Returns:
      Number of objects fetched
    """
    from .lfs import LFSClient, LFSPointer, LFSStore

    with open_repo_closing(repo) as r:
        # Get LFS server URL from config
        config = r.get_config()
        lfs_url_bytes = config.get((b"lfs",), b"url")
        if not lfs_url_bytes:
            # Try remote URL
            remote_url = config.get((b"remote", remote.encode()), b"url")
            if remote_url:
                # Append /info/lfs to remote URL
                remote_url_str = remote_url.decode()
                if remote_url_str.endswith(".git"):
                    remote_url_str = remote_url_str[:-4]
                lfs_url = f"{remote_url_str}/info/lfs"
            else:
                raise ValueError(f"No LFS URL configured for remote {remote}")
        else:
            lfs_url = lfs_url_bytes.decode()

        # Get authentication
        auth = None
        # TODO: Support credential helpers and other auth methods

        # Create LFS client and store
        client = LFSClient(lfs_url, auth)
        store = LFSStore.from_repo(r)

        # Find all LFS pointers in the refs
        pointers_to_fetch = []

        if refs is None:
            # Get all refs
            refs = list(r.refs.keys())

        for ref in refs:
            if isinstance(ref, str):
                ref = ref.encode()
            try:
                commit = r[r.refs[ref]]
            except KeyError:
                continue

            # Walk the commit tree
            assert isinstance(commit, Commit)
            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
                assert sha is not None
                try:
                    obj = r.object_store[sha]
                except KeyError:
                    pass
                else:
                    if isinstance(obj, Blob):
                        pointer = LFSPointer.from_bytes(obj.data)
                        if pointer and pointer.is_valid_oid():
                            # Check if we already have it
                            try:
                                store.open_object(pointer.oid)
                            except KeyError:
                                pointers_to_fetch.append((pointer.oid, pointer.size))

        # Fetch missing objects
        fetched = 0
        for oid, size in pointers_to_fetch:
            content = client.download(oid, size)
            store.write_object([content])
            fetched += 1

        return fetched


def lfs_pull(
    repo: Union[str, os.PathLike[str], Repo] = ".", remote: str = "origin"
) -> int:
    """Pull LFS objects for current checkout.

    Args:
      repo: Path to repository
      remote: Remote name (default: origin)

    Returns:
      Number of objects fetched
    """
    from .lfs import LFSPointer, LFSStore

    with open_repo_closing(repo) as r:
        # First do a fetch for HEAD
        fetched = lfs_fetch(repo, remote, [b"HEAD"])

        # Then checkout LFS files in working directory
        store = LFSStore.from_repo(r)
        index = r.open_index()

        for path, entry in index.items():
            full_path = os.path.join(r.path, path.decode())
            if os.path.exists(full_path):
                with open(full_path, "rb") as f:
                    content = f.read()
                pointer = LFSPointer.from_bytes(content)
                if pointer and pointer.is_valid_oid():
                    try:
                        # Replace pointer with actual content
                        with store.open_object(pointer.oid) as lfs_file:
                            lfs_content = lfs_file.read()
                        with open(full_path, "wb") as f:
                            f.write(lfs_content)
                    except KeyError:
                        # Object not available
                        pass

        return fetched


def lfs_push(
    repo: Union[str, os.PathLike[str], Repo] = ".",
    remote: str = "origin",
    refs: Optional[list[Union[str, bytes]]] = None,
) -> int:
    """Push LFS objects to remote.

    Args:
      repo: Path to repository
      remote: Remote name (default: origin)
      refs: Specific refs to push LFS objects for (default: current branch)

    Returns:
      Number of objects pushed
    """
    from .lfs import LFSClient, LFSPointer, LFSStore

    with open_repo_closing(repo) as r:
        # Get LFS server URL from config
        config = r.get_config()
        lfs_url_bytes = config.get((b"lfs",), b"url")
        if not lfs_url_bytes:
            # Try remote URL
            remote_url = config.get((b"remote", remote.encode()), b"url")
            if remote_url:
                # Append /info/lfs to remote URL
                remote_url_str = remote_url.decode()
                if remote_url_str.endswith(".git"):
                    remote_url_str = remote_url_str[:-4]
                lfs_url = f"{remote_url_str}/info/lfs"
            else:
                raise ValueError(f"No LFS URL configured for remote {remote}")
        else:
            lfs_url = lfs_url_bytes.decode()

        # Get authentication
        auth = None
        # TODO: Support credential helpers and other auth methods

        # Create LFS client and store
        client = LFSClient(lfs_url, auth)
        store = LFSStore.from_repo(r)

        # Find all LFS objects to push
        if refs is None:
            # Push current branch
            head_ref = r.refs.read_ref(b"HEAD")
            refs = [head_ref] if head_ref else []

        objects_to_push = set()

        for ref in refs:
            if isinstance(ref, str):
                ref = ref.encode()
            try:
                if ref.startswith(b"refs/"):
                    commit = r[r.refs[ref]]
                else:
                    commit = r[ref]
            except KeyError:
                continue

            # Walk the commit tree
            assert isinstance(commit, Commit)
            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
                assert sha is not None
                try:
                    obj = r.object_store[sha]
                except KeyError:
                    pass
                else:
                    if isinstance(obj, Blob):
                        pointer = LFSPointer.from_bytes(obj.data)
                        if pointer and pointer.is_valid_oid():
                            objects_to_push.add((pointer.oid, pointer.size))

        # Push objects
        pushed = 0
        for oid, size in objects_to_push:
            try:
                with store.open_object(oid) as f:
                    content = f.read()
            except KeyError:
                # Object not in local store
                logging.warning("LFS object %s not found locally", oid)
            else:
                client.upload(oid, size, content)
                pushed += 1

        return pushed
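

# Usage sketch (illustrative): the three transfer helpers above share the
# same URL discovery (the lfs.url config value, else <remote-url>/info/lfs).
#
#     lfs_fetch("/tmp/repo", "origin")  # objects for all refs
#     lfs_pull("/tmp/repo")             # HEAD objects, then checkout
#     lfs_push("/tmp/repo", "origin")   # objects for the current branch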


def lfs_status(repo: Union[str, os.PathLike[str], Repo] = ".") -> dict[str, list[str]]:
    """Show status of LFS files.

    Args:
      repo: Path to repository

    Returns:
      Dict with status information
    """
    from .lfs import LFSPointer, LFSStore

    with open_repo_closing(repo) as r:
        store = LFSStore.from_repo(r)
        index = r.open_index()

        status: dict[str, list[str]] = {
            "tracked": [],
            "not_staged": [],
            "not_committed": [],
            "not_pushed": [],
            "missing": [],
        }

        # Check working directory files
        for path, entry in index.items():
            path_str = path.decode()
            full_path = os.path.join(r.path, path_str)
            if os.path.exists(full_path):
                with open(full_path, "rb") as f:
                    content = f.read()
                pointer = LFSPointer.from_bytes(content)
                if pointer and pointer.is_valid_oid():
                    status["tracked"].append(path_str)

                    # Check if object exists locally
                    try:
                        store.open_object(pointer.oid)
                    except KeyError:
                        status["missing"].append(path_str)

                    # Check if file has been modified
                    if isinstance(entry, ConflictedIndexEntry):
                        continue  # Skip conflicted entries
                    try:
                        staged_obj = r.object_store[entry.sha]
                    except KeyError:
                        pass
                    else:
                        if not isinstance(staged_obj, Blob):
                            raise AssertionError(
                                f"Expected Blob object, got {type(staged_obj).__name__}"
                            )
                        staged_pointer = LFSPointer.from_bytes(staged_obj.data)
                        if staged_pointer and staged_pointer.oid != pointer.oid:
                            status["not_staged"].append(path_str)

        # TODO: Check for not committed and not pushed files
        return status
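

# Usage sketch (illustrative):
#
#     st = lfs_status("/tmp/repo")
#     print("tracked:", st["tracked"])
#     print("missing locally:", st["missing"])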


def worktree_list(repo: RepoPath = ".") -> list[Any]:
    """List all worktrees for a repository.

    Args:
      repo: Path to repository

    Returns:
      List of WorkTreeInfo objects
    """
    from .worktree import list_worktrees

    with open_repo_closing(repo) as r:
        return list_worktrees(r)


def worktree_add(
    repo: RepoPath = ".",
    path: Optional[Union[str, os.PathLike[str]]] = None,
    branch: Optional[Union[str, bytes]] = None,
    commit: Optional[Union[str, bytes]] = None,
    detach: bool = False,
    force: bool = False,
) -> str:
    """Add a new worktree.

    Args:
      repo: Path to repository
      path: Path for new worktree
      branch: Branch to checkout (creates if it doesn't exist)
      commit: Specific commit to checkout
      detach: Create with detached HEAD
      force: Force creation even if branch is already checked out

    Returns:
      Path to the newly created worktree
    """
    from .worktree import add_worktree

    if path is None:
        raise ValueError("Path is required for worktree add")

    with open_repo_closing(repo) as r:
        commit_bytes = commit.encode() if isinstance(commit, str) else commit
        wt_repo = add_worktree(
            r, path, branch=branch, commit=commit_bytes, detach=detach, force=force
        )
        return wt_repo.path
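

# Usage sketch (illustrative; the paths and branch name are hypothetical, and
# it assumes WorkTreeInfo objects expose a path attribute):
#
#     wt_path = worktree_add("/tmp/repo", "/tmp/feature-wt", branch=b"feature")
#     for wt in worktree_list("/tmp/repo"):
#         print(wt.path)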


def worktree_remove(
    repo: RepoPath = ".",
    path: Optional[Union[str, os.PathLike[str]]] = None,
    force: bool = False,
) -> None:
    """Remove a worktree.

    Args:
      repo: Path to repository
      path: Path to worktree to remove
      force: Force removal even if there are local changes
    """
    from .worktree import remove_worktree

    if path is None:
        raise ValueError("Path is required for worktree remove")

    with open_repo_closing(repo) as r:
        remove_worktree(r, path, force=force)


def worktree_prune(
    repo: RepoPath = ".", dry_run: bool = False, expire: Optional[int] = None
) -> list[str]:
    """Prune worktree administrative files.

    Args:
      repo: Path to repository
      dry_run: Only show what would be removed
      expire: Only prune worktrees older than this many seconds

    Returns:
      List of pruned worktree names
    """
    from .worktree import prune_worktrees

    with open_repo_closing(repo) as r:
        return prune_worktrees(r, expire=expire, dry_run=dry_run)


def worktree_lock(
    repo: RepoPath = ".",
    path: Optional[Union[str, os.PathLike[str]]] = None,
    reason: Optional[str] = None,
) -> None:
    """Lock a worktree to prevent it from being pruned.

    Args:
      repo: Path to repository
      path: Path to worktree to lock
      reason: Optional reason for locking
    """
    from .worktree import lock_worktree

    if path is None:
        raise ValueError("Path is required for worktree lock")

    with open_repo_closing(repo) as r:
        lock_worktree(r, path, reason=reason)


def worktree_unlock(
    repo: RepoPath = ".", path: Optional[Union[str, os.PathLike[str]]] = None
) -> None:
    """Unlock a worktree.

    Args:
      repo: Path to repository
      path: Path to worktree to unlock
    """
    from .worktree import unlock_worktree

    if path is None:
        raise ValueError("Path is required for worktree unlock")

    with open_repo_closing(repo) as r:
        unlock_worktree(r, path)
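

# Usage sketch (illustrative; the paths are hypothetical): protect a worktree
# on removable media from pruning, then release it.
#
#     worktree_lock("/tmp/repo", "/mnt/usb/wt", reason="on removable drive")
#     worktree_unlock("/tmp/repo", "/mnt/usb/wt")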


def worktree_move(
    repo: RepoPath = ".",
    old_path: Optional[Union[str, os.PathLike[str]]] = None,
    new_path: Optional[Union[str, os.PathLike[str]]] = None,
) -> None:
    """Move a worktree to a new location.

    Args:
      repo: Path to repository
      old_path: Current path of worktree
      new_path: New path for worktree
    """
    from .worktree import move_worktree

    if old_path is None or new_path is None:
        raise ValueError("Both old_path and new_path are required for worktree move")

    with open_repo_closing(repo) as r:
        move_worktree(r, old_path, new_path)


def worktree_repair(
    repo: RepoPath = ".",
    paths: Optional[list[Union[str, os.PathLike[str]]]] = None,
) -> list[str]:
    """Repair worktree administrative files.

    Args:
      repo: Path to repository
      paths: Optional list of worktree paths to repair. If None, repairs
        connections from the main repository to all linked worktrees.

    Returns:
      List of repaired worktree paths
    """
    from .worktree import repair_worktree

    with open_repo_closing(repo) as r:
        return repair_worktree(r, paths=paths)
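

# Usage sketch (illustrative; the paths are hypothetical):
#
#     worktree_move("/tmp/repo", "/tmp/old-wt", "/tmp/new-wt")
#     worktree_repair("/tmp/repo")               # fix links after a manual move
#     worktree_prune("/tmp/repo", dry_run=True)  # preview stale entries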