test_cli.py 82 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250
  1. #!/usr/bin/env python
  2. # test_cli.py -- tests for dulwich.cli
  3. # vim: expandtab
  4. #
  5. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  6. #
  7. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  8. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  9. # General Public License as published by the Free Software Foundation; version 2.0
  10. # or (at your option) any later version. You can redistribute it and/or
  11. # modify it under the terms of either of these two licenses.
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. #
  19. # You should have received a copy of the licenses; if not, see
  20. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  21. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  22. # License, Version 2.0.
  23. """Tests for dulwich.cli."""
  24. import io
  25. import os
  26. import shutil
  27. import sys
  28. import tempfile
  29. import unittest
  30. from unittest import skipIf
  31. from unittest.mock import MagicMock, patch
  32. from dulwich import cli
  33. from dulwich.cli import format_bytes, parse_relative_time
  34. from dulwich.repo import Repo
  35. from dulwich.tests.utils import (
  36. build_commit_graph,
  37. )
  38. from . import TestCase
  39. class DulwichCliTestCase(TestCase):
  40. """Base class for CLI tests."""
  41. def setUp(self) -> None:
  42. super().setUp()
  43. self.test_dir = tempfile.mkdtemp()
  44. self.addCleanup(shutil.rmtree, self.test_dir)
  45. self.repo_path = os.path.join(self.test_dir, "repo")
  46. os.mkdir(self.repo_path)
  47. self.repo = Repo.init(self.repo_path)
  48. self.addCleanup(self.repo.close)
  49. def _run_cli(self, *args, stdout_stream=None):
  50. """Run CLI command and capture output."""
  51. class MockStream:
  52. def __init__(self):
  53. self._buffer = io.BytesIO()
  54. self.buffer = self._buffer
  55. def write(self, data):
  56. if isinstance(data, bytes):
  57. self._buffer.write(data)
  58. else:
  59. self._buffer.write(data.encode("utf-8"))
  60. def getvalue(self):
  61. value = self._buffer.getvalue()
  62. try:
  63. return value.decode("utf-8")
  64. except UnicodeDecodeError:
  65. return value
  66. def __getattr__(self, name):
  67. return getattr(self._buffer, name)
  68. old_stdout = sys.stdout
  69. old_stderr = sys.stderr
  70. old_cwd = os.getcwd()
  71. try:
  72. # Use custom stdout_stream if provided, otherwise use MockStream
  73. if stdout_stream:
  74. sys.stdout = stdout_stream
  75. if not hasattr(sys.stdout, "buffer"):
  76. sys.stdout.buffer = sys.stdout
  77. else:
  78. sys.stdout = MockStream()
  79. sys.stderr = MockStream()
  80. os.chdir(self.repo_path)
  81. result = cli.main(list(args))
  82. return result, sys.stdout.getvalue(), sys.stderr.getvalue()
  83. finally:
  84. sys.stdout = old_stdout
  85. sys.stderr = old_stderr
  86. os.chdir(old_cwd)
  87. class InitCommandTest(DulwichCliTestCase):
  88. """Tests for init command."""
  89. def test_init_basic(self):
  90. # Create a new directory for init
  91. new_repo_path = os.path.join(self.test_dir, "new_repo")
  92. result, stdout, stderr = self._run_cli("init", new_repo_path)
  93. self.assertTrue(os.path.exists(os.path.join(new_repo_path, ".git")))
  94. def test_init_bare(self):
  95. # Create a new directory for bare repo
  96. bare_repo_path = os.path.join(self.test_dir, "bare_repo")
  97. result, stdout, stderr = self._run_cli("init", "--bare", bare_repo_path)
  98. self.assertTrue(os.path.exists(os.path.join(bare_repo_path, "HEAD")))
  99. self.assertFalse(os.path.exists(os.path.join(bare_repo_path, ".git")))
  100. class AddCommandTest(DulwichCliTestCase):
  101. """Tests for add command."""
  102. def test_add_single_file(self):
  103. # Create a file to add
  104. test_file = os.path.join(self.repo_path, "test.txt")
  105. with open(test_file, "w") as f:
  106. f.write("test content")
  107. result, stdout, stderr = self._run_cli("add", "test.txt")
  108. # Check that file is in index
  109. self.assertIn(b"test.txt", self.repo.open_index())
  110. def test_add_multiple_files(self):
  111. # Create multiple files
  112. for i in range(3):
  113. test_file = os.path.join(self.repo_path, f"test{i}.txt")
  114. with open(test_file, "w") as f:
  115. f.write(f"content {i}")
  116. result, stdout, stderr = self._run_cli(
  117. "add", "test0.txt", "test1.txt", "test2.txt"
  118. )
  119. index = self.repo.open_index()
  120. self.assertIn(b"test0.txt", index)
  121. self.assertIn(b"test1.txt", index)
  122. self.assertIn(b"test2.txt", index)
  123. class RmCommandTest(DulwichCliTestCase):
  124. """Tests for rm command."""
  125. def test_rm_file(self):
  126. # Create, add and commit a file first
  127. test_file = os.path.join(self.repo_path, "test.txt")
  128. with open(test_file, "w") as f:
  129. f.write("test content")
  130. self._run_cli("add", "test.txt")
  131. self._run_cli("commit", "--message=Add test file")
  132. # Now remove it from index and working directory
  133. result, stdout, stderr = self._run_cli("rm", "test.txt")
  134. # Check that file is not in index
  135. self.assertNotIn(b"test.txt", self.repo.open_index())
  136. class CommitCommandTest(DulwichCliTestCase):
  137. """Tests for commit command."""
  138. def test_commit_basic(self):
  139. # Create and add a file
  140. test_file = os.path.join(self.repo_path, "test.txt")
  141. with open(test_file, "w") as f:
  142. f.write("test content")
  143. self._run_cli("add", "test.txt")
  144. # Commit
  145. result, stdout, stderr = self._run_cli("commit", "--message=Initial commit")
  146. # Check that HEAD points to a commit
  147. self.assertIsNotNone(self.repo.head())
  148. def test_commit_all_flag(self):
  149. # Create initial commit
  150. test_file = os.path.join(self.repo_path, "test.txt")
  151. with open(test_file, "w") as f:
  152. f.write("initial content")
  153. self._run_cli("add", "test.txt")
  154. self._run_cli("commit", "--message=Initial commit")
  155. # Modify the file (don't stage it)
  156. with open(test_file, "w") as f:
  157. f.write("modified content")
  158. # Create another file and don't add it (untracked)
  159. untracked_file = os.path.join(self.repo_path, "untracked.txt")
  160. with open(untracked_file, "w") as f:
  161. f.write("untracked content")
  162. # Commit with -a flag should stage and commit the modified file,
  163. # but not the untracked file
  164. result, stdout, stderr = self._run_cli(
  165. "commit", "-a", "--message=Modified commit"
  166. )
  167. self.assertIsNotNone(self.repo.head())
  168. # Check that the modification was committed
  169. with open(test_file) as f:
  170. content = f.read()
  171. self.assertEqual(content, "modified content")
  172. # Check that untracked file is still untracked
  173. self.assertTrue(os.path.exists(untracked_file))
  174. def test_commit_all_flag_no_changes(self):
  175. # Create initial commit
  176. test_file = os.path.join(self.repo_path, "test.txt")
  177. with open(test_file, "w") as f:
  178. f.write("initial content")
  179. self._run_cli("add", "test.txt")
  180. self._run_cli("commit", "--message=Initial commit")
  181. # Try to commit with -a when there are no changes
  182. # This should still work (git allows this)
  183. result, stdout, stderr = self._run_cli(
  184. "commit", "-a", "--message=No changes commit"
  185. )
  186. self.assertIsNotNone(self.repo.head())
  187. def test_commit_all_flag_multiple_files(self):
  188. # Create initial commit with multiple files
  189. file1 = os.path.join(self.repo_path, "file1.txt")
  190. file2 = os.path.join(self.repo_path, "file2.txt")
  191. with open(file1, "w") as f:
  192. f.write("content1")
  193. with open(file2, "w") as f:
  194. f.write("content2")
  195. self._run_cli("add", "file1.txt", "file2.txt")
  196. self._run_cli("commit", "--message=Initial commit")
  197. # Modify both files
  198. with open(file1, "w") as f:
  199. f.write("modified content1")
  200. with open(file2, "w") as f:
  201. f.write("modified content2")
  202. # Create an untracked file
  203. untracked_file = os.path.join(self.repo_path, "untracked.txt")
  204. with open(untracked_file, "w") as f:
  205. f.write("untracked content")
  206. # Commit with -a should stage both modified files but not untracked
  207. result, stdout, stderr = self._run_cli(
  208. "commit", "-a", "--message=Modified both files"
  209. )
  210. self.assertIsNotNone(self.repo.head())
  211. # Verify modifications were committed
  212. with open(file1) as f:
  213. self.assertEqual(f.read(), "modified content1")
  214. with open(file2) as f:
  215. self.assertEqual(f.read(), "modified content2")
  216. # Verify untracked file still exists
  217. self.assertTrue(os.path.exists(untracked_file))
  218. class LogCommandTest(DulwichCliTestCase):
  219. """Tests for log command."""
  220. def test_log_empty_repo(self):
  221. result, stdout, stderr = self._run_cli("log")
  222. # Empty repo should not crash
  223. def test_log_with_commits(self):
  224. # Create some commits
  225. c1, c2, c3 = build_commit_graph(
  226. self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
  227. )
  228. self.repo.refs[b"HEAD"] = c3.id
  229. result, stdout, stderr = self._run_cli("log")
  230. self.assertIn("Commit 3", stdout)
  231. self.assertIn("Commit 2", stdout)
  232. self.assertIn("Commit 1", stdout)
  233. def test_log_reverse(self):
  234. # Create some commits
  235. c1, c2, c3 = build_commit_graph(
  236. self.repo.object_store, [[1], [2, 1], [3, 1, 2]]
  237. )
  238. self.repo.refs[b"HEAD"] = c3.id
  239. result, stdout, stderr = self._run_cli("log", "--reverse")
  240. # Check order - commit 1 should appear before commit 3
  241. pos1 = stdout.index("Commit 1")
  242. pos3 = stdout.index("Commit 3")
  243. self.assertLess(pos1, pos3)
  244. class StatusCommandTest(DulwichCliTestCase):
  245. """Tests for status command."""
  246. def test_status_empty(self):
  247. result, stdout, stderr = self._run_cli("status")
  248. # Should not crash on empty repo
  249. def test_status_with_untracked(self):
  250. # Create an untracked file
  251. test_file = os.path.join(self.repo_path, "untracked.txt")
  252. with open(test_file, "w") as f:
  253. f.write("untracked content")
  254. result, stdout, stderr = self._run_cli("status")
  255. self.assertIn("Untracked files:", stdout)
  256. self.assertIn("untracked.txt", stdout)
  257. class BranchCommandTest(DulwichCliTestCase):
  258. """Tests for branch command."""
  259. def test_branch_create(self):
  260. # Create initial commit
  261. test_file = os.path.join(self.repo_path, "test.txt")
  262. with open(test_file, "w") as f:
  263. f.write("test")
  264. self._run_cli("add", "test.txt")
  265. self._run_cli("commit", "--message=Initial")
  266. # Create branch
  267. result, stdout, stderr = self._run_cli("branch", "test-branch")
  268. self.assertIn(b"refs/heads/test-branch", self.repo.refs.keys())
  269. def test_branch_delete(self):
  270. # Create initial commit and branch
  271. test_file = os.path.join(self.repo_path, "test.txt")
  272. with open(test_file, "w") as f:
  273. f.write("test")
  274. self._run_cli("add", "test.txt")
  275. self._run_cli("commit", "--message=Initial")
  276. self._run_cli("branch", "test-branch")
  277. # Delete branch
  278. result, stdout, stderr = self._run_cli("branch", "-d", "test-branch")
  279. self.assertNotIn(b"refs/heads/test-branch", self.repo.refs.keys())
  280. class CheckoutCommandTest(DulwichCliTestCase):
  281. """Tests for checkout command."""
  282. def test_checkout_branch(self):
  283. # Create initial commit and branch
  284. test_file = os.path.join(self.repo_path, "test.txt")
  285. with open(test_file, "w") as f:
  286. f.write("test")
  287. self._run_cli("add", "test.txt")
  288. self._run_cli("commit", "--message=Initial")
  289. self._run_cli("branch", "test-branch")
  290. # Checkout branch
  291. result, stdout, stderr = self._run_cli("checkout", "test-branch")
  292. self.assertEqual(
  293. self.repo.refs.read_ref(b"HEAD"), b"ref: refs/heads/test-branch"
  294. )
  295. class TagCommandTest(DulwichCliTestCase):
  296. """Tests for tag command."""
  297. def test_tag_create(self):
  298. # Create initial commit
  299. test_file = os.path.join(self.repo_path, "test.txt")
  300. with open(test_file, "w") as f:
  301. f.write("test")
  302. self._run_cli("add", "test.txt")
  303. self._run_cli("commit", "--message=Initial")
  304. # Create tag
  305. result, stdout, stderr = self._run_cli("tag", "v1.0")
  306. self.assertIn(b"refs/tags/v1.0", self.repo.refs.keys())
  307. class DiffCommandTest(DulwichCliTestCase):
  308. """Tests for diff command."""
  309. def test_diff_working_tree(self):
  310. # Create and commit a file
  311. test_file = os.path.join(self.repo_path, "test.txt")
  312. with open(test_file, "w") as f:
  313. f.write("initial content\n")
  314. self._run_cli("add", "test.txt")
  315. self._run_cli("commit", "--message=Initial")
  316. # Modify the file
  317. with open(test_file, "w") as f:
  318. f.write("initial content\nmodified\n")
  319. # Test unstaged diff
  320. result, stdout, stderr = self._run_cli("diff")
  321. self.assertIn("+modified", stdout)
  322. def test_diff_staged(self):
  323. # Create initial commit
  324. test_file = os.path.join(self.repo_path, "test.txt")
  325. with open(test_file, "w") as f:
  326. f.write("initial content\n")
  327. self._run_cli("add", "test.txt")
  328. self._run_cli("commit", "--message=Initial")
  329. # Modify and stage the file
  330. with open(test_file, "w") as f:
  331. f.write("initial content\nnew file\n")
  332. self._run_cli("add", "test.txt")
  333. # Test staged diff
  334. result, stdout, stderr = self._run_cli("diff", "--staged")
  335. self.assertIn("+new file", stdout)
  336. def test_diff_cached(self):
  337. # Create initial commit
  338. test_file = os.path.join(self.repo_path, "test.txt")
  339. with open(test_file, "w") as f:
  340. f.write("initial content\n")
  341. self._run_cli("add", "test.txt")
  342. self._run_cli("commit", "--message=Initial")
  343. # Modify and stage the file
  344. with open(test_file, "w") as f:
  345. f.write("initial content\nnew file\n")
  346. self._run_cli("add", "test.txt")
  347. # Test cached diff (alias for staged)
  348. result, stdout, stderr = self._run_cli("diff", "--cached")
  349. self.assertIn("+new file", stdout)
  350. def test_diff_commit(self):
  351. # Create two commits
  352. test_file = os.path.join(self.repo_path, "test.txt")
  353. with open(test_file, "w") as f:
  354. f.write("first version\n")
  355. self._run_cli("add", "test.txt")
  356. self._run_cli("commit", "--message=First")
  357. with open(test_file, "w") as f:
  358. f.write("first version\nsecond line\n")
  359. self._run_cli("add", "test.txt")
  360. self._run_cli("commit", "--message=Second")
  361. # Add working tree changes
  362. with open(test_file, "a") as f:
  363. f.write("working tree change\n")
  364. # Test single commit diff (should show working tree vs HEAD)
  365. result, stdout, stderr = self._run_cli("diff", "HEAD")
  366. self.assertIn("+working tree change", stdout)
  367. def test_diff_two_commits(self):
  368. # Create two commits
  369. test_file = os.path.join(self.repo_path, "test.txt")
  370. with open(test_file, "w") as f:
  371. f.write("first version\n")
  372. self._run_cli("add", "test.txt")
  373. self._run_cli("commit", "--message=First")
  374. # Get first commit SHA
  375. first_commit = self.repo.refs[b"HEAD"].decode()
  376. with open(test_file, "w") as f:
  377. f.write("first version\nsecond line\n")
  378. self._run_cli("add", "test.txt")
  379. self._run_cli("commit", "--message=Second")
  380. # Get second commit SHA
  381. second_commit = self.repo.refs[b"HEAD"].decode()
  382. # Test diff between two commits
  383. result, stdout, stderr = self._run_cli("diff", first_commit, second_commit)
  384. self.assertIn("+second line", stdout)
  385. def test_diff_commit_vs_working_tree(self):
  386. # Test that diff <commit> shows working tree vs commit (not commit vs parent)
  387. test_file = os.path.join(self.repo_path, "test.txt")
  388. with open(test_file, "w") as f:
  389. f.write("first version\n")
  390. self._run_cli("add", "test.txt")
  391. self._run_cli("commit", "--message=First")
  392. first_commit = self.repo.refs[b"HEAD"].decode()
  393. with open(test_file, "w") as f:
  394. f.write("first version\nsecond line\n")
  395. self._run_cli("add", "test.txt")
  396. self._run_cli("commit", "--message=Second")
  397. # Add changes to working tree
  398. with open(test_file, "w") as f:
  399. f.write("completely different\n")
  400. # diff <first_commit> should show working tree vs first commit
  401. result, stdout, stderr = self._run_cli("diff", first_commit)
  402. self.assertIn("-first version", stdout)
  403. self.assertIn("+completely different", stdout)
  404. def test_diff_with_paths(self):
  405. # Test path filtering
  406. # Create multiple files
  407. file1 = os.path.join(self.repo_path, "file1.txt")
  408. file2 = os.path.join(self.repo_path, "file2.txt")
  409. subdir = os.path.join(self.repo_path, "subdir")
  410. os.makedirs(subdir)
  411. file3 = os.path.join(subdir, "file3.txt")
  412. with open(file1, "w") as f:
  413. f.write("content1\n")
  414. with open(file2, "w") as f:
  415. f.write("content2\n")
  416. with open(file3, "w") as f:
  417. f.write("content3\n")
  418. self._run_cli("add", ".")
  419. self._run_cli("commit", "--message=Initial")
  420. # Modify all files
  421. with open(file1, "w") as f:
  422. f.write("modified1\n")
  423. with open(file2, "w") as f:
  424. f.write("modified2\n")
  425. with open(file3, "w") as f:
  426. f.write("modified3\n")
  427. # Test diff with specific file
  428. result, stdout, stderr = self._run_cli("diff", "--", "file1.txt")
  429. self.assertIn("file1.txt", stdout)
  430. self.assertNotIn("file2.txt", stdout)
  431. self.assertNotIn("file3.txt", stdout)
  432. # Test diff with directory
  433. result, stdout, stderr = self._run_cli("diff", "--", "subdir")
  434. self.assertNotIn("file1.txt", stdout)
  435. self.assertNotIn("file2.txt", stdout)
  436. self.assertIn("file3.txt", stdout)
  437. # Test staged diff with paths
  438. self._run_cli("add", "file1.txt")
  439. result, stdout, stderr = self._run_cli("diff", "--staged", "--", "file1.txt")
  440. self.assertIn("file1.txt", stdout)
  441. self.assertIn("+modified1", stdout)
  442. # Test diff with multiple paths (file2 and file3 are still unstaged)
  443. result, stdout, stderr = self._run_cli(
  444. "diff", "--", "file2.txt", "subdir/file3.txt"
  445. )
  446. self.assertIn("file2.txt", stdout)
  447. self.assertIn("file3.txt", stdout)
  448. self.assertNotIn("file1.txt", stdout)
  449. # Test diff with commit and paths
  450. first_commit = self.repo.refs[b"HEAD"].decode()
  451. with open(file1, "w") as f:
  452. f.write("newer1\n")
  453. result, stdout, stderr = self._run_cli("diff", first_commit, "--", "file1.txt")
  454. self.assertIn("file1.txt", stdout)
  455. self.assertIn("-content1", stdout)
  456. self.assertIn("+newer1", stdout)
  457. self.assertNotIn("file2.txt", stdout)
  458. class FilterBranchCommandTest(DulwichCliTestCase):
  459. """Tests for filter-branch command."""
  460. def setUp(self):
  461. super().setUp()
  462. # Create a more complex repository structure for testing
  463. # Create some files in subdirectories
  464. os.makedirs(os.path.join(self.repo_path, "subdir"))
  465. os.makedirs(os.path.join(self.repo_path, "other"))
  466. # Create files
  467. files = {
  468. "README.md": "# Test Repo",
  469. "subdir/file1.txt": "File in subdir",
  470. "subdir/file2.txt": "Another file in subdir",
  471. "other/file3.txt": "File in other dir",
  472. "root.txt": "File at root",
  473. }
  474. for path, content in files.items():
  475. file_path = os.path.join(self.repo_path, path)
  476. with open(file_path, "w") as f:
  477. f.write(content)
  478. # Add all files and create initial commit
  479. self._run_cli("add", ".")
  480. self._run_cli("commit", "--message=Initial commit")
  481. # Create a second commit modifying subdir
  482. with open(os.path.join(self.repo_path, "subdir/file1.txt"), "a") as f:
  483. f.write("\nModified content")
  484. self._run_cli("add", "subdir/file1.txt")
  485. self._run_cli("commit", "--message=Modify subdir file")
  486. # Create a third commit in other dir
  487. with open(os.path.join(self.repo_path, "other/file3.txt"), "a") as f:
  488. f.write("\nMore content")
  489. self._run_cli("add", "other/file3.txt")
  490. self._run_cli("commit", "--message=Modify other file")
  491. # Create a branch
  492. self._run_cli("branch", "test-branch")
  493. # Create a tag
  494. self._run_cli("tag", "v1.0")
  495. def test_filter_branch_subdirectory_filter(self):
  496. """Test filter-branch with subdirectory filter."""
  497. # Run filter-branch to extract only the subdir
  498. result, stdout, stderr = self._run_cli(
  499. "filter-branch", "--subdirectory-filter", "subdir"
  500. )
  501. # Check that the operation succeeded
  502. self.assertEqual(result, 0)
  503. self.assertIn("Rewrite HEAD", stdout)
  504. # filter-branch rewrites history but doesn't update working tree
  505. # We need to check the commit contents, not the working tree
  506. # Reset to the rewritten HEAD to update working tree
  507. self._run_cli("reset", "--hard", "HEAD")
  508. # Now check that only files from subdir remain at root level
  509. self.assertTrue(os.path.exists(os.path.join(self.repo_path, "file1.txt")))
  510. self.assertTrue(os.path.exists(os.path.join(self.repo_path, "file2.txt")))
  511. self.assertFalse(os.path.exists(os.path.join(self.repo_path, "README.md")))
  512. self.assertFalse(os.path.exists(os.path.join(self.repo_path, "root.txt")))
  513. self.assertFalse(os.path.exists(os.path.join(self.repo_path, "other")))
  514. self.assertFalse(os.path.exists(os.path.join(self.repo_path, "subdir")))
  515. # Check that original refs were backed up
  516. original_refs = [
  517. ref for ref in self.repo.refs.keys() if ref.startswith(b"refs/original/")
  518. ]
  519. self.assertTrue(
  520. len(original_refs) > 0, "No original refs found after filter-branch"
  521. )
  522. @skipIf(sys.platform == "win32", "sed command not available on Windows")
  523. def test_filter_branch_msg_filter(self):
  524. """Test filter-branch with message filter."""
  525. # Run filter-branch to prepend [FILTERED] to commit messages
  526. result, stdout, stderr = self._run_cli(
  527. "filter-branch", "--msg-filter", "sed 's/^/[FILTERED] /'"
  528. )
  529. self.assertEqual(result, 0)
  530. # Check that commit messages were modified
  531. result, stdout, stderr = self._run_cli("log")
  532. self.assertIn("[FILTERED] Modify other file", stdout)
  533. self.assertIn("[FILTERED] Modify subdir file", stdout)
  534. self.assertIn("[FILTERED] Initial commit", stdout)
  535. def test_filter_branch_env_filter(self):
  536. """Test filter-branch with environment filter."""
  537. # Run filter-branch to change author email
  538. env_filter = """
  539. if [ "$GIT_AUTHOR_EMAIL" = "test@example.com" ]; then
  540. export GIT_AUTHOR_EMAIL="filtered@example.com"
  541. fi
  542. """
  543. result, stdout, stderr = self._run_cli(
  544. "filter-branch", "--env-filter", env_filter
  545. )
  546. self.assertEqual(result, 0)
  547. def test_filter_branch_prune_empty(self):
  548. """Test filter-branch with prune-empty option."""
  549. # Create a commit that only touches files outside subdir
  550. with open(os.path.join(self.repo_path, "root.txt"), "a") as f:
  551. f.write("\nNew line")
  552. self._run_cli("add", "root.txt")
  553. self._run_cli("commit", "--message=Modify root file only")
  554. # Run filter-branch to extract subdir with prune-empty
  555. result, stdout, stderr = self._run_cli(
  556. "filter-branch", "--subdirectory-filter", "subdir", "--prune-empty"
  557. )
  558. self.assertEqual(result, 0)
  559. # The last commit should have been pruned
  560. result, stdout, stderr = self._run_cli("log")
  561. self.assertNotIn("Modify root file only", stdout)
  562. @skipIf(sys.platform == "win32", "sed command not available on Windows")
  563. def test_filter_branch_force(self):
  564. """Test filter-branch with force option."""
  565. # Run filter-branch once with a filter that actually changes something
  566. result, stdout, stderr = self._run_cli(
  567. "filter-branch", "--msg-filter", "sed 's/^/[TEST] /'"
  568. )
  569. self.assertEqual(result, 0)
  570. # Check that backup refs were created
  571. # The implementation backs up refs under refs/original/
  572. original_refs = [
  573. ref for ref in self.repo.refs.keys() if ref.startswith(b"refs/original/")
  574. ]
  575. self.assertTrue(len(original_refs) > 0, "No original refs found")
  576. # Run again without force - should fail
  577. result, stdout, stderr = self._run_cli(
  578. "filter-branch", "--msg-filter", "sed 's/^/[TEST2] /'"
  579. )
  580. self.assertEqual(result, 1)
  581. self.assertIn("Cannot create a new backup", stdout)
  582. self.assertIn("refs/original", stdout)
  583. # Run with force - should succeed
  584. result, stdout, stderr = self._run_cli(
  585. "filter-branch", "--force", "--msg-filter", "sed 's/^/[TEST3] /'"
  586. )
  587. self.assertEqual(result, 0)
  588. @skipIf(sys.platform == "win32", "sed command not available on Windows")
  589. def test_filter_branch_specific_branch(self):
  590. """Test filter-branch on a specific branch."""
  591. # Switch to test-branch and add a commit
  592. self._run_cli("checkout", "test-branch")
  593. with open(os.path.join(self.repo_path, "branch-file.txt"), "w") as f:
  594. f.write("Branch specific file")
  595. self._run_cli("add", "branch-file.txt")
  596. self._run_cli("commit", "--message=Branch commit")
  597. # Run filter-branch on the test-branch
  598. result, stdout, stderr = self._run_cli(
  599. "filter-branch", "--msg-filter", "sed 's/^/[BRANCH] /'", "test-branch"
  600. )
  601. self.assertEqual(result, 0)
  602. self.assertIn("Ref 'refs/heads/test-branch' was rewritten", stdout)
  603. # Check that only test-branch was modified
  604. result, stdout, stderr = self._run_cli("log")
  605. self.assertIn("[BRANCH] Branch commit", stdout)
  606. # Switch to master and check it wasn't modified
  607. self._run_cli("checkout", "master")
  608. result, stdout, stderr = self._run_cli("log")
  609. self.assertNotIn("[BRANCH]", stdout)
  610. def test_filter_branch_tree_filter(self):
  611. """Test filter-branch with tree filter."""
  612. # Use a tree filter to remove a specific file
  613. tree_filter = "rm -f root.txt"
  614. result, stdout, stderr = self._run_cli(
  615. "filter-branch", "--tree-filter", tree_filter
  616. )
  617. self.assertEqual(result, 0)
  618. # Check that the file was removed from the latest commit
  619. # We need to check the commit tree, not the working directory
  620. result, stdout, stderr = self._run_cli("ls-tree", "HEAD")
  621. self.assertNotIn("root.txt", stdout)
  622. def test_filter_branch_index_filter(self):
  623. """Test filter-branch with index filter."""
  624. # Use an index filter to remove a file from the index
  625. index_filter = "git rm --cached --ignore-unmatch root.txt"
  626. result, stdout, stderr = self._run_cli(
  627. "filter-branch", "--index-filter", index_filter
  628. )
  629. self.assertEqual(result, 0)
  630. def test_filter_branch_parent_filter(self):
  631. """Test filter-branch with parent filter."""
  632. # Create a merge commit first
  633. self._run_cli("checkout", "HEAD", "-b", "feature")
  634. with open(os.path.join(self.repo_path, "feature.txt"), "w") as f:
  635. f.write("Feature")
  636. self._run_cli("add", "feature.txt")
  637. self._run_cli("commit", "--message=Feature commit")
  638. self._run_cli("checkout", "master")
  639. self._run_cli("merge", "feature", "--message=Merge feature")
  640. # Use parent filter to linearize history (remove second parent)
  641. parent_filter = "cut -d' ' -f1"
  642. result, stdout, stderr = self._run_cli(
  643. "filter-branch", "--parent-filter", parent_filter
  644. )
  645. self.assertEqual(result, 0)
  646. def test_filter_branch_commit_filter(self):
  647. """Test filter-branch with commit filter."""
  648. # Use commit filter to skip commits with certain messages
  649. commit_filter = """
  650. if grep -q "Modify other" <<< "$GIT_COMMIT_MESSAGE"; then
  651. skip_commit "$@"
  652. else
  653. git commit-tree "$@"
  654. fi
  655. """
  656. result, stdout, stderr = self._run_cli(
  657. "filter-branch", "--commit-filter", commit_filter
  658. )
  659. # Note: This test may fail because the commit filter syntax is simplified
  660. # In real Git, skip_commit is a function, but our implementation may differ
  661. def test_filter_branch_tag_name_filter(self):
  662. """Test filter-branch with tag name filter."""
  663. # Run filter-branch with tag name filter to rename tags
  664. result, stdout, stderr = self._run_cli(
  665. "filter-branch",
  666. "--tag-name-filter",
  667. "sed 's/^v/version-/'",
  668. "--msg-filter",
  669. "cat",
  670. )
  671. self.assertEqual(result, 0)
  672. # Check that tag was renamed
  673. self.assertIn(b"refs/tags/version-1.0", self.repo.refs.keys())
  674. def test_filter_branch_errors(self):
  675. """Test filter-branch error handling."""
  676. # Test with invalid subdirectory
  677. result, stdout, stderr = self._run_cli(
  678. "filter-branch", "--subdirectory-filter", "nonexistent"
  679. )
  680. # Should still succeed but produce empty history
  681. self.assertEqual(result, 0)
  682. def test_filter_branch_no_args(self):
  683. """Test filter-branch with no arguments."""
  684. # Should work as no-op
  685. result, stdout, stderr = self._run_cli("filter-branch")
  686. self.assertEqual(result, 0)
  687. class ShowCommandTest(DulwichCliTestCase):
  688. """Tests for show command."""
  689. def test_show_commit(self):
  690. # Create a commit
  691. test_file = os.path.join(self.repo_path, "test.txt")
  692. with open(test_file, "w") as f:
  693. f.write("test content")
  694. self._run_cli("add", "test.txt")
  695. self._run_cli("commit", "--message=Test commit")
  696. result, stdout, stderr = self._run_cli("show", "HEAD")
  697. self.assertIn("Test commit", stdout)
  698. class FormatPatchCommandTest(DulwichCliTestCase):
  699. """Tests for format-patch command."""
  700. def test_format_patch_single_commit(self):
  701. # Create a commit with actual content
  702. from dulwich.objects import Blob, Tree
  703. # Initial commit
  704. tree1 = Tree()
  705. self.repo.object_store.add_object(tree1)
  706. self.repo.do_commit(
  707. message=b"Initial commit",
  708. tree=tree1.id,
  709. )
  710. # Second commit with a file
  711. blob = Blob.from_string(b"Hello, World!\n")
  712. self.repo.object_store.add_object(blob)
  713. tree2 = Tree()
  714. tree2.add(b"hello.txt", 0o100644, blob.id)
  715. self.repo.object_store.add_object(tree2)
  716. self.repo.do_commit(
  717. message=b"Add hello.txt",
  718. tree=tree2.id,
  719. )
  720. # Test format-patch for last commit
  721. result, stdout, stderr = self._run_cli("format-patch", "-n", "1")
  722. self.assertEqual(result, None)
  723. self.assertIn("0001-Add-hello.txt.patch", stdout)
  724. # Check patch contents
  725. patch_file = os.path.join(self.repo_path, "0001-Add-hello.txt.patch")
  726. with open(patch_file, "rb") as f:
  727. content = f.read()
  728. # Check header
  729. self.assertIn(b"Subject: [PATCH 1/1] Add hello.txt", content)
  730. self.assertIn(b"From:", content)
  731. self.assertIn(b"Date:", content)
  732. # Check diff content
  733. self.assertIn(b"diff --git a/hello.txt b/hello.txt", content)
  734. self.assertIn(b"new file mode", content)
  735. self.assertIn(b"+Hello, World!", content)
  736. # Check footer
  737. self.assertIn(b"-- \nDulwich", content)
  738. # Clean up
  739. os.remove(patch_file)
  740. def test_format_patch_multiple_commits(self):
  741. from dulwich.objects import Blob, Tree
  742. # Initial commit
  743. tree1 = Tree()
  744. self.repo.object_store.add_object(tree1)
  745. self.repo.do_commit(
  746. message=b"Initial commit",
  747. tree=tree1.id,
  748. )
  749. # Second commit
  750. blob1 = Blob.from_string(b"File 1 content\n")
  751. self.repo.object_store.add_object(blob1)
  752. tree2 = Tree()
  753. tree2.add(b"file1.txt", 0o100644, blob1.id)
  754. self.repo.object_store.add_object(tree2)
  755. self.repo.do_commit(
  756. message=b"Add file1.txt",
  757. tree=tree2.id,
  758. )
  759. # Third commit
  760. blob2 = Blob.from_string(b"File 2 content\n")
  761. self.repo.object_store.add_object(blob2)
  762. tree3 = Tree()
  763. tree3.add(b"file1.txt", 0o100644, blob1.id)
  764. tree3.add(b"file2.txt", 0o100644, blob2.id)
  765. self.repo.object_store.add_object(tree3)
  766. self.repo.do_commit(
  767. message=b"Add file2.txt",
  768. tree=tree3.id,
  769. )
  770. # Test format-patch for last 2 commits
  771. result, stdout, stderr = self._run_cli("format-patch", "-n", "2")
  772. self.assertEqual(result, None)
  773. self.assertIn("0001-Add-file1.txt.patch", stdout)
  774. self.assertIn("0002-Add-file2.txt.patch", stdout)
  775. # Check first patch
  776. with open(os.path.join(self.repo_path, "0001-Add-file1.txt.patch"), "rb") as f:
  777. content = f.read()
  778. self.assertIn(b"Subject: [PATCH 1/2] Add file1.txt", content)
  779. self.assertIn(b"+File 1 content", content)
  780. # Check second patch
  781. with open(os.path.join(self.repo_path, "0002-Add-file2.txt.patch"), "rb") as f:
  782. content = f.read()
  783. self.assertIn(b"Subject: [PATCH 2/2] Add file2.txt", content)
  784. self.assertIn(b"+File 2 content", content)
  785. # Clean up
  786. os.remove(os.path.join(self.repo_path, "0001-Add-file1.txt.patch"))
  787. os.remove(os.path.join(self.repo_path, "0002-Add-file2.txt.patch"))
  788. def test_format_patch_output_directory(self):
  789. from dulwich.objects import Blob, Tree
  790. # Create a commit
  791. blob = Blob.from_string(b"Test content\n")
  792. self.repo.object_store.add_object(blob)
  793. tree = Tree()
  794. tree.add(b"test.txt", 0o100644, blob.id)
  795. self.repo.object_store.add_object(tree)
  796. self.repo.do_commit(
  797. message=b"Test commit",
  798. tree=tree.id,
  799. )
  800. # Create output directory
  801. output_dir = os.path.join(self.test_dir, "patches")
  802. os.makedirs(output_dir)
  803. # Test format-patch with output directory
  804. result, stdout, stderr = self._run_cli(
  805. "format-patch", "-o", output_dir, "-n", "1"
  806. )
  807. self.assertEqual(result, None)
  808. # Check that file was created in output directory with correct content
  809. patch_file = os.path.join(output_dir, "0001-Test-commit.patch")
  810. self.assertTrue(os.path.exists(patch_file))
  811. with open(patch_file, "rb") as f:
  812. content = f.read()
  813. self.assertIn(b"Subject: [PATCH 1/1] Test commit", content)
  814. self.assertIn(b"+Test content", content)
  815. def test_format_patch_commit_range(self):
  816. from dulwich.objects import Blob, Tree
  817. # Create commits with actual file changes
  818. commits = []
  819. trees = []
  820. # Initial empty commit
  821. tree0 = Tree()
  822. self.repo.object_store.add_object(tree0)
  823. trees.append(tree0)
  824. c0 = self.repo.do_commit(
  825. message=b"Initial commit",
  826. tree=tree0.id,
  827. )
  828. commits.append(c0)
  829. # Add three files in separate commits
  830. for i in range(1, 4):
  831. blob = Blob.from_string(f"Content {i}\n".encode())
  832. self.repo.object_store.add_object(blob)
  833. tree = Tree()
  834. # Copy previous files
  835. for j in range(1, i):
  836. prev_blob_id = trees[j][f"file{j}.txt".encode()][1]
  837. tree.add(f"file{j}.txt".encode(), 0o100644, prev_blob_id)
  838. # Add new file
  839. tree.add(f"file{i}.txt".encode(), 0o100644, blob.id)
  840. self.repo.object_store.add_object(tree)
  841. trees.append(tree)
  842. c = self.repo.do_commit(
  843. message=f"Add file{i}.txt".encode(),
  844. tree=tree.id,
  845. )
  846. commits.append(c)
  847. # Test format-patch with commit range (should get commits 2 and 3)
  848. result, stdout, stderr = self._run_cli(
  849. "format-patch", f"{commits[1].decode()}..{commits[3].decode()}"
  850. )
  851. self.assertEqual(result, None)
  852. # Should create patches for commits 2 and 3
  853. self.assertIn("0001-Add-file2.txt.patch", stdout)
  854. self.assertIn("0002-Add-file3.txt.patch", stdout)
  855. # Verify patch contents
  856. with open(os.path.join(self.repo_path, "0001-Add-file2.txt.patch"), "rb") as f:
  857. content = f.read()
  858. self.assertIn(b"Subject: [PATCH 1/2] Add file2.txt", content)
  859. self.assertIn(b"+Content 2", content)
  860. self.assertNotIn(b"file3.txt", content) # Should not include file3
  861. with open(os.path.join(self.repo_path, "0002-Add-file3.txt.patch"), "rb") as f:
  862. content = f.read()
  863. self.assertIn(b"Subject: [PATCH 2/2] Add file3.txt", content)
  864. self.assertIn(b"+Content 3", content)
  865. self.assertNotIn(b"file2.txt", content) # Should not modify file2
  866. # Clean up
  867. os.remove(os.path.join(self.repo_path, "0001-Add-file2.txt.patch"))
  868. os.remove(os.path.join(self.repo_path, "0002-Add-file3.txt.patch"))
  869. def test_format_patch_stdout(self):
  870. from dulwich.objects import Blob, Tree
  871. # Create a commit with modified file
  872. tree1 = Tree()
  873. blob1 = Blob.from_string(b"Original content\n")
  874. self.repo.object_store.add_object(blob1)
  875. tree1.add(b"file.txt", 0o100644, blob1.id)
  876. self.repo.object_store.add_object(tree1)
  877. self.repo.do_commit(
  878. message=b"Initial commit",
  879. tree=tree1.id,
  880. )
  881. tree2 = Tree()
  882. blob2 = Blob.from_string(b"Modified content\n")
  883. self.repo.object_store.add_object(blob2)
  884. tree2.add(b"file.txt", 0o100644, blob2.id)
  885. self.repo.object_store.add_object(tree2)
  886. self.repo.do_commit(
  887. message=b"Modify file.txt",
  888. tree=tree2.id,
  889. )
  890. # Mock stdout as a BytesIO for binary output
  891. stdout_stream = io.BytesIO()
  892. stdout_stream.buffer = stdout_stream
  893. # Run command with --stdout
  894. old_stdout = sys.stdout
  895. old_stderr = sys.stderr
  896. old_cwd = os.getcwd()
  897. try:
  898. sys.stdout = stdout_stream
  899. sys.stderr = io.StringIO()
  900. os.chdir(self.repo_path)
  901. cli.main(["format-patch", "--stdout", "-n", "1"])
  902. finally:
  903. sys.stdout = old_stdout
  904. sys.stderr = old_stderr
  905. os.chdir(old_cwd)
  906. # Check output
  907. stdout_stream.seek(0)
  908. output = stdout_stream.read()
  909. self.assertIn(b"Subject: [PATCH 1/1] Modify file.txt", output)
  910. self.assertIn(b"diff --git a/file.txt b/file.txt", output)
  911. self.assertIn(b"-Original content", output)
  912. self.assertIn(b"+Modified content", output)
  913. self.assertIn(b"-- \nDulwich", output)
  914. def test_format_patch_empty_repo(self):
  915. # Test with empty repository
  916. result, stdout, stderr = self._run_cli("format-patch", "-n", "5")
  917. self.assertEqual(result, None)
  918. # Should produce no output for empty repo
  919. self.assertEqual(stdout.strip(), "")
  920. class FetchPackCommandTest(DulwichCliTestCase):
  921. """Tests for fetch-pack command."""
  922. @patch("dulwich.cli.get_transport_and_path")
  923. def test_fetch_pack_basic(self, mock_transport):
  924. # Mock the transport
  925. mock_client = MagicMock()
  926. mock_transport.return_value = (mock_client, "/path/to/repo")
  927. mock_client.fetch.return_value = None
  928. result, stdout, stderr = self._run_cli(
  929. "fetch-pack", "git://example.com/repo.git"
  930. )
  931. mock_client.fetch.assert_called_once()
  932. class LsRemoteCommandTest(DulwichCliTestCase):
  933. """Tests for ls-remote command."""
  934. def test_ls_remote_basic(self):
  935. # Create a commit
  936. test_file = os.path.join(self.repo_path, "test.txt")
  937. with open(test_file, "w") as f:
  938. f.write("test")
  939. self._run_cli("add", "test.txt")
  940. self._run_cli("commit", "--message=Initial")
  941. # Test basic ls-remote
  942. result, stdout, stderr = self._run_cli("ls-remote", self.repo_path)
  943. lines = stdout.strip().split("\n")
  944. self.assertTrue(any("HEAD" in line for line in lines))
  945. self.assertTrue(any("refs/heads/master" in line for line in lines))
  946. def test_ls_remote_symref(self):
  947. # Create a commit
  948. test_file = os.path.join(self.repo_path, "test.txt")
  949. with open(test_file, "w") as f:
  950. f.write("test")
  951. self._run_cli("add", "test.txt")
  952. self._run_cli("commit", "--message=Initial")
  953. # Test ls-remote with --symref option
  954. result, stdout, stderr = self._run_cli("ls-remote", "--symref", self.repo_path)
  955. lines = stdout.strip().split("\n")
  956. # Should show symref for HEAD in exact format: "ref: refs/heads/master\tHEAD"
  957. expected_line = "ref: refs/heads/master\tHEAD"
  958. self.assertIn(
  959. expected_line,
  960. lines,
  961. f"Expected line '{expected_line}' not found in output: {lines}",
  962. )
  963. class PullCommandTest(DulwichCliTestCase):
  964. """Tests for pull command."""
  965. @patch("dulwich.porcelain.pull")
  966. def test_pull_basic(self, mock_pull):
  967. result, stdout, stderr = self._run_cli("pull", "origin")
  968. mock_pull.assert_called_once()
  969. @patch("dulwich.porcelain.pull")
  970. def test_pull_with_refspec(self, mock_pull):
  971. result, stdout, stderr = self._run_cli("pull", "origin", "master")
  972. mock_pull.assert_called_once()
  973. class PushCommandTest(DulwichCliTestCase):
  974. """Tests for push command."""
  975. @patch("dulwich.porcelain.push")
  976. def test_push_basic(self, mock_push):
  977. result, stdout, stderr = self._run_cli("push", "origin")
  978. mock_push.assert_called_once()
  979. @patch("dulwich.porcelain.push")
  980. def test_push_force(self, mock_push):
  981. result, stdout, stderr = self._run_cli("push", "-f", "origin")
  982. mock_push.assert_called_with(".", "origin", None, force=True)
  983. class ArchiveCommandTest(DulwichCliTestCase):
  984. """Tests for archive command."""
  985. def test_archive_basic(self):
  986. # Create a commit
  987. test_file = os.path.join(self.repo_path, "test.txt")
  988. with open(test_file, "w") as f:
  989. f.write("test content")
  990. self._run_cli("add", "test.txt")
  991. self._run_cli("commit", "--message=Initial")
  992. # Archive produces binary output, so use BytesIO
  993. result, stdout, stderr = self._run_cli(
  994. "archive", "HEAD", stdout_stream=io.BytesIO()
  995. )
  996. # Should complete without error and produce some binary output
  997. self.assertIsInstance(stdout, bytes)
  998. self.assertGreater(len(stdout), 0)
  999. class ForEachRefCommandTest(DulwichCliTestCase):
  1000. """Tests for for-each-ref command."""
  1001. def test_for_each_ref(self):
  1002. # Create a commit
  1003. test_file = os.path.join(self.repo_path, "test.txt")
  1004. with open(test_file, "w") as f:
  1005. f.write("test")
  1006. self._run_cli("add", "test.txt")
  1007. self._run_cli("commit", "--message=Initial")
  1008. result, stdout, stderr = self._run_cli("for-each-ref")
  1009. self.assertIn("refs/heads/master", stdout)
  1010. class PackRefsCommandTest(DulwichCliTestCase):
  1011. """Tests for pack-refs command."""
  1012. def test_pack_refs(self):
  1013. # Create some refs
  1014. test_file = os.path.join(self.repo_path, "test.txt")
  1015. with open(test_file, "w") as f:
  1016. f.write("test")
  1017. self._run_cli("add", "test.txt")
  1018. self._run_cli("commit", "--message=Initial")
  1019. self._run_cli("branch", "test-branch")
  1020. result, stdout, stderr = self._run_cli("pack-refs", "--all")
  1021. # Check that packed-refs file exists
  1022. self.assertTrue(
  1023. os.path.exists(os.path.join(self.repo_path, ".git", "packed-refs"))
  1024. )
  1025. class SubmoduleCommandTest(DulwichCliTestCase):
  1026. """Tests for submodule commands."""
  1027. def test_submodule_list(self):
  1028. # Create an initial commit so repo has a HEAD
  1029. test_file = os.path.join(self.repo_path, "test.txt")
  1030. with open(test_file, "w") as f:
  1031. f.write("test")
  1032. self._run_cli("add", "test.txt")
  1033. self._run_cli("commit", "--message=Initial")
  1034. result, stdout, stderr = self._run_cli("submodule")
  1035. # Should not crash on repo without submodules
  1036. def test_submodule_init(self):
  1037. # Create .gitmodules file for init to work
  1038. gitmodules = os.path.join(self.repo_path, ".gitmodules")
  1039. with open(gitmodules, "w") as f:
  1040. f.write("") # Empty .gitmodules file
  1041. result, stdout, stderr = self._run_cli("submodule", "init")
  1042. # Should not crash
  1043. class StashCommandTest(DulwichCliTestCase):
  1044. """Tests for stash commands."""
  1045. def test_stash_list_empty(self):
  1046. result, stdout, stderr = self._run_cli("stash", "list")
  1047. # Should not crash on empty stash
  1048. def test_stash_push_pop(self):
  1049. # Create a file and modify it
  1050. test_file = os.path.join(self.repo_path, "test.txt")
  1051. with open(test_file, "w") as f:
  1052. f.write("initial")
  1053. self._run_cli("add", "test.txt")
  1054. self._run_cli("commit", "--message=Initial")
  1055. # Modify file
  1056. with open(test_file, "w") as f:
  1057. f.write("modified")
  1058. # Stash changes
  1059. result, stdout, stderr = self._run_cli("stash", "push")
  1060. self.assertIn("Saved working directory", stdout)
  1061. # Note: Dulwich stash doesn't currently update the working tree
  1062. # so the file remains modified after stash push
  1063. # Note: stash pop is not fully implemented in Dulwich yet
  1064. # so we only test stash push here
  1065. class MergeCommandTest(DulwichCliTestCase):
  1066. """Tests for merge command."""
  1067. def test_merge_basic(self):
  1068. # Create initial commit
  1069. test_file = os.path.join(self.repo_path, "test.txt")
  1070. with open(test_file, "w") as f:
  1071. f.write("initial")
  1072. self._run_cli("add", "test.txt")
  1073. self._run_cli("commit", "--message=Initial")
  1074. # Create and checkout new branch
  1075. self._run_cli("branch", "feature")
  1076. self._run_cli("checkout", "feature")
  1077. # Make changes in feature branch
  1078. with open(test_file, "w") as f:
  1079. f.write("feature changes")
  1080. self._run_cli("add", "test.txt")
  1081. self._run_cli("commit", "--message=Feature commit")
  1082. # Go back to main
  1083. self._run_cli("checkout", "master")
  1084. # Merge feature branch
  1085. result, stdout, stderr = self._run_cli("merge", "feature")
  1086. class HelpCommandTest(DulwichCliTestCase):
  1087. """Tests for help command."""
  1088. def test_help_basic(self):
  1089. result, stdout, stderr = self._run_cli("help")
  1090. self.assertIn("dulwich command line tool", stdout)
  1091. def test_help_all(self):
  1092. result, stdout, stderr = self._run_cli("help", "-a")
  1093. self.assertIn("Available commands:", stdout)
  1094. self.assertIn("add", stdout)
  1095. self.assertIn("commit", stdout)
  1096. class RemoteCommandTest(DulwichCliTestCase):
  1097. """Tests for remote commands."""
  1098. def test_remote_add(self):
  1099. result, stdout, stderr = self._run_cli(
  1100. "remote", "add", "origin", "https://github.com/example/repo.git"
  1101. )
  1102. # Check remote was added to config
  1103. config = self.repo.get_config()
  1104. self.assertEqual(
  1105. config.get((b"remote", b"origin"), b"url"),
  1106. b"https://github.com/example/repo.git",
  1107. )
  1108. class CheckIgnoreCommandTest(DulwichCliTestCase):
  1109. """Tests for check-ignore command."""
  1110. def test_check_ignore(self):
  1111. # Create .gitignore
  1112. gitignore = os.path.join(self.repo_path, ".gitignore")
  1113. with open(gitignore, "w") as f:
  1114. f.write("*.log\n")
  1115. result, stdout, stderr = self._run_cli("check-ignore", "test.log", "test.txt")
  1116. self.assertIn("test.log", stdout)
  1117. self.assertNotIn("test.txt", stdout)
  1118. class LsFilesCommandTest(DulwichCliTestCase):
  1119. """Tests for ls-files command."""
  1120. def test_ls_files(self):
  1121. # Add some files
  1122. for name in ["a.txt", "b.txt", "c.txt"]:
  1123. path = os.path.join(self.repo_path, name)
  1124. with open(path, "w") as f:
  1125. f.write(f"content of {name}")
  1126. self._run_cli("add", "a.txt", "b.txt", "c.txt")
  1127. result, stdout, stderr = self._run_cli("ls-files")
  1128. self.assertIn("a.txt", stdout)
  1129. self.assertIn("b.txt", stdout)
  1130. self.assertIn("c.txt", stdout)
  1131. class LsTreeCommandTest(DulwichCliTestCase):
  1132. """Tests for ls-tree command."""
  1133. def test_ls_tree(self):
  1134. # Create a directory structure
  1135. os.mkdir(os.path.join(self.repo_path, "subdir"))
  1136. with open(os.path.join(self.repo_path, "file.txt"), "w") as f:
  1137. f.write("file content")
  1138. with open(os.path.join(self.repo_path, "subdir", "nested.txt"), "w") as f:
  1139. f.write("nested content")
  1140. self._run_cli("add", ".")
  1141. self._run_cli("commit", "--message=Initial")
  1142. result, stdout, stderr = self._run_cli("ls-tree", "HEAD")
  1143. self.assertIn("file.txt", stdout)
  1144. self.assertIn("subdir", stdout)
  1145. def test_ls_tree_recursive(self):
  1146. # Create nested structure
  1147. os.mkdir(os.path.join(self.repo_path, "subdir"))
  1148. with open(os.path.join(self.repo_path, "subdir", "nested.txt"), "w") as f:
  1149. f.write("nested")
  1150. self._run_cli("add", ".")
  1151. self._run_cli("commit", "--message=Initial")
  1152. result, stdout, stderr = self._run_cli("ls-tree", "-r", "HEAD")
  1153. self.assertIn("subdir/nested.txt", stdout)
  1154. class DescribeCommandTest(DulwichCliTestCase):
  1155. """Tests for describe command."""
  1156. def test_describe(self):
  1157. # Create tagged commit
  1158. test_file = os.path.join(self.repo_path, "test.txt")
  1159. with open(test_file, "w") as f:
  1160. f.write("test")
  1161. self._run_cli("add", "test.txt")
  1162. self._run_cli("commit", "--message=Initial")
  1163. self._run_cli("tag", "v1.0")
  1164. result, stdout, stderr = self._run_cli("describe")
  1165. self.assertIn("v1.0", stdout)
  1166. class FsckCommandTest(DulwichCliTestCase):
  1167. """Tests for fsck command."""
  1168. def test_fsck(self):
  1169. # Create a commit
  1170. test_file = os.path.join(self.repo_path, "test.txt")
  1171. with open(test_file, "w") as f:
  1172. f.write("test")
  1173. self._run_cli("add", "test.txt")
  1174. self._run_cli("commit", "--message=Initial")
  1175. result, stdout, stderr = self._run_cli("fsck")
  1176. # Should complete without errors
  1177. class RepackCommandTest(DulwichCliTestCase):
  1178. """Tests for repack command."""
  1179. def test_repack(self):
  1180. # Create some objects
  1181. for i in range(5):
  1182. test_file = os.path.join(self.repo_path, f"test{i}.txt")
  1183. with open(test_file, "w") as f:
  1184. f.write(f"content {i}")
  1185. self._run_cli("add", f"test{i}.txt")
  1186. self._run_cli("commit", f"--message=Commit {i}")
  1187. result, stdout, stderr = self._run_cli("repack")
  1188. # Should create pack files
  1189. pack_dir = os.path.join(self.repo_path, ".git", "objects", "pack")
  1190. self.assertTrue(any(f.endswith(".pack") for f in os.listdir(pack_dir)))
  1191. class ResetCommandTest(DulwichCliTestCase):
  1192. """Tests for reset command."""
  1193. def test_reset_soft(self):
  1194. # Create commits
  1195. test_file = os.path.join(self.repo_path, "test.txt")
  1196. with open(test_file, "w") as f:
  1197. f.write("first")
  1198. self._run_cli("add", "test.txt")
  1199. self._run_cli("commit", "--message=First")
  1200. first_commit = self.repo.head()
  1201. with open(test_file, "w") as f:
  1202. f.write("second")
  1203. self._run_cli("add", "test.txt")
  1204. self._run_cli("commit", "--message=Second")
  1205. # Reset soft
  1206. result, stdout, stderr = self._run_cli("reset", "--soft", first_commit.decode())
  1207. # HEAD should be at first commit
  1208. self.assertEqual(self.repo.head(), first_commit)
  1209. class WriteTreeCommandTest(DulwichCliTestCase):
  1210. """Tests for write-tree command."""
  1211. def test_write_tree(self):
  1212. # Create and add files
  1213. test_file = os.path.join(self.repo_path, "test.txt")
  1214. with open(test_file, "w") as f:
  1215. f.write("test")
  1216. self._run_cli("add", "test.txt")
  1217. result, stdout, stderr = self._run_cli("write-tree")
  1218. # Should output tree SHA
  1219. self.assertEqual(len(stdout.strip()), 40)
  1220. class UpdateServerInfoCommandTest(DulwichCliTestCase):
  1221. """Tests for update-server-info command."""
  1222. def test_update_server_info(self):
  1223. result, stdout, stderr = self._run_cli("update-server-info")
  1224. # Should create info/refs file
  1225. info_refs = os.path.join(self.repo_path, ".git", "info", "refs")
  1226. self.assertTrue(os.path.exists(info_refs))
  1227. class SymbolicRefCommandTest(DulwichCliTestCase):
  1228. """Tests for symbolic-ref command."""
  1229. def test_symbolic_ref(self):
  1230. # Create a branch
  1231. test_file = os.path.join(self.repo_path, "test.txt")
  1232. with open(test_file, "w") as f:
  1233. f.write("test")
  1234. self._run_cli("add", "test.txt")
  1235. self._run_cli("commit", "--message=Initial")
  1236. self._run_cli("branch", "test-branch")
  1237. result, stdout, stderr = self._run_cli(
  1238. "symbolic-ref", "HEAD", "refs/heads/test-branch"
  1239. )
  1240. # HEAD should now point to test-branch
  1241. self.assertEqual(
  1242. self.repo.refs.read_ref(b"HEAD"), b"ref: refs/heads/test-branch"
  1243. )
  1244. class BundleCommandTest(DulwichCliTestCase):
  1245. """Tests for bundle commands."""
  1246. def setUp(self):
  1247. super().setUp()
  1248. # Create a basic repository with some commits for bundle testing
  1249. # Create initial commit
  1250. test_file = os.path.join(self.repo_path, "file1.txt")
  1251. with open(test_file, "w") as f:
  1252. f.write("Content of file1\n")
  1253. self._run_cli("add", "file1.txt")
  1254. self._run_cli("commit", "--message=Initial commit")
  1255. # Create second commit
  1256. test_file2 = os.path.join(self.repo_path, "file2.txt")
  1257. with open(test_file2, "w") as f:
  1258. f.write("Content of file2\n")
  1259. self._run_cli("add", "file2.txt")
  1260. self._run_cli("commit", "--message=Add file2")
  1261. # Create a branch and tag for testing
  1262. self._run_cli("branch", "feature")
  1263. self._run_cli("tag", "v1.0")
  1264. def test_bundle_create_basic(self):
  1265. """Test basic bundle creation."""
  1266. bundle_file = os.path.join(self.test_dir, "test.bundle")
  1267. result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
  1268. self.assertEqual(result, 0)
  1269. self.assertTrue(os.path.exists(bundle_file))
  1270. self.assertGreater(os.path.getsize(bundle_file), 0)
  1271. def test_bundle_create_all_refs(self):
  1272. """Test bundle creation with --all flag."""
  1273. bundle_file = os.path.join(self.test_dir, "all.bundle")
  1274. result, stdout, stderr = self._run_cli("bundle", "create", "--all", bundle_file)
  1275. self.assertEqual(result, 0)
  1276. self.assertTrue(os.path.exists(bundle_file))
  1277. def test_bundle_create_specific_refs(self):
  1278. """Test bundle creation with specific refs."""
  1279. bundle_file = os.path.join(self.test_dir, "refs.bundle")
  1280. # Only use HEAD since feature branch may not exist
  1281. result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
  1282. self.assertEqual(result, 0)
  1283. self.assertTrue(os.path.exists(bundle_file))
  1284. def test_bundle_create_with_range(self):
  1285. """Test bundle creation with commit range."""
  1286. # Get the first commit SHA by looking at the log
  1287. result, stdout, stderr = self._run_cli("log", "--reverse")
  1288. lines = stdout.strip().split("\n")
  1289. # Find first commit line that contains a SHA
  1290. first_commit = None
  1291. for line in lines:
  1292. if line.startswith("commit "):
  1293. first_commit = line.split()[1][:8] # Get short SHA
  1294. break
  1295. if first_commit:
  1296. bundle_file = os.path.join(self.test_dir, "range.bundle")
  1297. result, stdout, stderr = self._run_cli(
  1298. "bundle", "create", bundle_file, f"{first_commit}..HEAD"
  1299. )
  1300. self.assertEqual(result, 0)
  1301. self.assertTrue(os.path.exists(bundle_file))
  1302. else:
  1303. self.skipTest("Could not determine first commit SHA")
  1304. def test_bundle_create_to_stdout(self):
  1305. """Test bundle creation to stdout."""
  1306. result, stdout, stderr = self._run_cli("bundle", "create", "-", "HEAD")
  1307. self.assertEqual(result, 0)
  1308. self.assertGreater(len(stdout), 0)
  1309. # Bundle output is binary, so check it's not empty
  1310. self.assertIsInstance(stdout, (str, bytes))
  1311. def test_bundle_create_no_refs(self):
  1312. """Test bundle creation with no refs specified."""
  1313. bundle_file = os.path.join(self.test_dir, "noref.bundle")
  1314. result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
  1315. self.assertEqual(result, 1)
  1316. self.assertIn("No refs specified", stdout)
  1317. def test_bundle_create_empty_bundle_refused(self):
  1318. """Test that empty bundles are refused."""
  1319. bundle_file = os.path.join(self.test_dir, "empty.bundle")
  1320. # Try to create bundle with non-existent ref - this should fail with KeyError
  1321. with self.assertRaises(KeyError):
  1322. result, stdout, stderr = self._run_cli(
  1323. "bundle", "create", bundle_file, "nonexistent-ref"
  1324. )
  1325. def test_bundle_verify_valid(self):
  1326. """Test bundle verification of valid bundle."""
  1327. bundle_file = os.path.join(self.test_dir, "valid.bundle")
  1328. # First create a bundle
  1329. result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
  1330. self.assertEqual(result, 0)
  1331. # Now verify it
  1332. result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
  1333. self.assertEqual(result, 0)
  1334. self.assertIn("valid and can be applied", stdout)
  1335. def test_bundle_verify_quiet(self):
  1336. """Test bundle verification with quiet flag."""
  1337. bundle_file = os.path.join(self.test_dir, "quiet.bundle")
  1338. # Create bundle
  1339. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1340. # Verify quietly
  1341. result, stdout, stderr = self._run_cli(
  1342. "bundle", "verify", "--quiet", bundle_file
  1343. )
  1344. self.assertEqual(result, 0)
  1345. self.assertEqual(stdout.strip(), "")
  1346. def test_bundle_verify_from_stdin(self):
  1347. """Test bundle verification from stdin."""
  1348. bundle_file = os.path.join(self.test_dir, "stdin.bundle")
  1349. # Create bundle
  1350. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1351. # Read bundle content
  1352. with open(bundle_file, "rb") as f:
  1353. bundle_content = f.read()
  1354. # Mock stdin with bundle content
  1355. old_stdin = sys.stdin
  1356. try:
  1357. sys.stdin = io.BytesIO(bundle_content)
  1358. sys.stdin.buffer = sys.stdin
  1359. result, stdout, stderr = self._run_cli("bundle", "verify", "-")
  1360. self.assertEqual(result, 0)
  1361. finally:
  1362. sys.stdin = old_stdin
  1363. def test_bundle_list_heads(self):
  1364. """Test listing bundle heads."""
  1365. bundle_file = os.path.join(self.test_dir, "heads.bundle")
  1366. # Create bundle with HEAD only
  1367. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1368. # List heads
  1369. result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
  1370. self.assertEqual(result, 0)
  1371. # Should contain at least the HEAD reference
  1372. self.assertTrue(len(stdout.strip()) > 0)
  1373. def test_bundle_list_heads_specific_refs(self):
  1374. """Test listing specific bundle heads."""
  1375. bundle_file = os.path.join(self.test_dir, "specific.bundle")
  1376. # Create bundle with HEAD
  1377. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1378. # List heads without filtering
  1379. result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
  1380. self.assertEqual(result, 0)
  1381. # Should contain some reference
  1382. self.assertTrue(len(stdout.strip()) > 0)
  1383. def test_bundle_list_heads_from_stdin(self):
  1384. """Test listing bundle heads from stdin."""
  1385. bundle_file = os.path.join(self.test_dir, "stdin-heads.bundle")
  1386. # Create bundle
  1387. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1388. # Read bundle content
  1389. with open(bundle_file, "rb") as f:
  1390. bundle_content = f.read()
  1391. # Mock stdin
  1392. old_stdin = sys.stdin
  1393. try:
  1394. sys.stdin = io.BytesIO(bundle_content)
  1395. sys.stdin.buffer = sys.stdin
  1396. result, stdout, stderr = self._run_cli("bundle", "list-heads", "-")
  1397. self.assertEqual(result, 0)
  1398. finally:
  1399. sys.stdin = old_stdin
  1400. def test_bundle_unbundle(self):
  1401. """Test bundle unbundling."""
  1402. bundle_file = os.path.join(self.test_dir, "unbundle.bundle")
  1403. # Create bundle
  1404. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1405. # Unbundle
  1406. result, stdout, stderr = self._run_cli("bundle", "unbundle", bundle_file)
  1407. self.assertEqual(result, 0)
  1408. def test_bundle_unbundle_specific_refs(self):
  1409. """Test unbundling specific refs."""
  1410. bundle_file = os.path.join(self.test_dir, "unbundle-specific.bundle")
  1411. # Create bundle with HEAD
  1412. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1413. # Unbundle only HEAD
  1414. result, stdout, stderr = self._run_cli(
  1415. "bundle", "unbundle", bundle_file, "HEAD"
  1416. )
  1417. self.assertEqual(result, 0)
  1418. def test_bundle_unbundle_from_stdin(self):
  1419. """Test unbundling from stdin."""
  1420. bundle_file = os.path.join(self.test_dir, "stdin-unbundle.bundle")
  1421. # Create bundle
  1422. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1423. # Read bundle content to simulate stdin
  1424. with open(bundle_file, "rb") as f:
  1425. bundle_content = f.read()
  1426. # Mock stdin with bundle content
  1427. old_stdin = sys.stdin
  1428. try:
  1429. # Create a BytesIO object with buffer attribute
  1430. mock_stdin = io.BytesIO(bundle_content)
  1431. mock_stdin.buffer = mock_stdin
  1432. sys.stdin = mock_stdin
  1433. result, stdout, stderr = self._run_cli("bundle", "unbundle", "-")
  1434. self.assertEqual(result, 0)
  1435. finally:
  1436. sys.stdin = old_stdin
  1437. def test_bundle_unbundle_with_progress(self):
  1438. """Test unbundling with progress output."""
  1439. bundle_file = os.path.join(self.test_dir, "progress.bundle")
  1440. # Create bundle
  1441. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1442. # Unbundle with progress
  1443. result, stdout, stderr = self._run_cli(
  1444. "bundle", "unbundle", "--progress", bundle_file
  1445. )
  1446. self.assertEqual(result, 0)
  1447. def test_bundle_create_with_progress(self):
  1448. """Test bundle creation with progress output."""
  1449. bundle_file = os.path.join(self.test_dir, "create-progress.bundle")
  1450. result, stdout, stderr = self._run_cli(
  1451. "bundle", "create", "--progress", bundle_file, "HEAD"
  1452. )
  1453. self.assertEqual(result, 0)
  1454. self.assertTrue(os.path.exists(bundle_file))
  1455. def test_bundle_create_with_quiet(self):
  1456. """Test bundle creation with quiet flag."""
  1457. bundle_file = os.path.join(self.test_dir, "quiet-create.bundle")
  1458. result, stdout, stderr = self._run_cli(
  1459. "bundle", "create", "--quiet", bundle_file, "HEAD"
  1460. )
  1461. self.assertEqual(result, 0)
  1462. self.assertTrue(os.path.exists(bundle_file))
  1463. def test_bundle_create_version_2(self):
  1464. """Test bundle creation with specific version."""
  1465. bundle_file = os.path.join(self.test_dir, "v2.bundle")
  1466. result, stdout, stderr = self._run_cli(
  1467. "bundle", "create", "--version", "2", bundle_file, "HEAD"
  1468. )
  1469. self.assertEqual(result, 0)
  1470. self.assertTrue(os.path.exists(bundle_file))
  1471. def test_bundle_create_version_3(self):
  1472. """Test bundle creation with version 3."""
  1473. bundle_file = os.path.join(self.test_dir, "v3.bundle")
  1474. result, stdout, stderr = self._run_cli(
  1475. "bundle", "create", "--version", "3", bundle_file, "HEAD"
  1476. )
  1477. self.assertEqual(result, 0)
  1478. self.assertTrue(os.path.exists(bundle_file))
  1479. def test_bundle_invalid_subcommand(self):
  1480. """Test invalid bundle subcommand."""
  1481. result, stdout, stderr = self._run_cli("bundle", "invalid-command")
  1482. self.assertEqual(result, 1)
  1483. self.assertIn("Unknown bundle subcommand", stdout)
  1484. def test_bundle_no_subcommand(self):
  1485. """Test bundle command with no subcommand."""
  1486. result, stdout, stderr = self._run_cli("bundle")
  1487. self.assertEqual(result, 1)
  1488. self.assertIn("Usage: bundle", stdout)
  1489. def test_bundle_create_with_stdin_refs(self):
  1490. """Test bundle creation reading refs from stdin."""
  1491. bundle_file = os.path.join(self.test_dir, "stdin-refs.bundle")
  1492. # Mock stdin with refs
  1493. old_stdin = sys.stdin
  1494. try:
  1495. sys.stdin = io.StringIO("master\nfeature\n")
  1496. result, stdout, stderr = self._run_cli(
  1497. "bundle", "create", "--stdin", bundle_file
  1498. )
  1499. self.assertEqual(result, 0)
  1500. self.assertTrue(os.path.exists(bundle_file))
  1501. finally:
  1502. sys.stdin = old_stdin
  1503. def test_bundle_verify_missing_prerequisites(self):
  1504. """Test bundle verification with missing prerequisites."""
  1505. # Create a simple bundle first
  1506. bundle_file = os.path.join(self.test_dir, "prereq.bundle")
  1507. self._run_cli("bundle", "create", bundle_file, "HEAD")
  1508. # Create a new repo to simulate missing objects
  1509. new_repo_path = os.path.join(self.test_dir, "new_repo")
  1510. os.mkdir(new_repo_path)
  1511. new_repo = Repo.init(new_repo_path)
  1512. new_repo.close()
  1513. # Try to verify in new repo
  1514. old_cwd = os.getcwd()
  1515. try:
  1516. os.chdir(new_repo_path)
  1517. result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
  1518. # Just check that verification runs - result depends on bundle content
  1519. self.assertIn(result, [0, 1])
  1520. finally:
  1521. os.chdir(old_cwd)
  1522. def test_bundle_create_with_committish_range(self):
  1523. """Test bundle creation with commit range using parse_committish_range."""
  1524. # Create additional commits for range testing
  1525. test_file3 = os.path.join(self.repo_path, "file3.txt")
  1526. with open(test_file3, "w") as f:
  1527. f.write("Content of file3\n")
  1528. self._run_cli("add", "file3.txt")
  1529. self._run_cli("commit", "--message=Add file3")
  1530. # Get commit SHAs
  1531. result, stdout, stderr = self._run_cli("log")
  1532. lines = stdout.strip().split("\n")
  1533. # Extract SHAs from commit lines
  1534. commits = []
  1535. for line in lines:
  1536. if line.startswith("commit:"):
  1537. sha = line.split()[1]
  1538. commits.append(sha[:8]) # Get short SHA
  1539. # We should have exactly 3 commits: Add file3, Add file2, Initial commit
  1540. self.assertEqual(len(commits), 3)
  1541. bundle_file = os.path.join(self.test_dir, "range-test.bundle")
  1542. # Test with commit range using .. syntax
  1543. # Create a bundle containing commits reachable from commits[0] but not from commits[2]
  1544. result, stdout, stderr = self._run_cli(
  1545. "bundle", "create", bundle_file, f"{commits[2]}..HEAD"
  1546. )
  1547. if result != 0:
  1548. self.fail(
  1549. f"Bundle create failed with exit code {result}. stdout: {stdout!r}, stderr: {stderr!r}"
  1550. )
  1551. self.assertEqual(result, 0)
  1552. self.assertTrue(os.path.exists(bundle_file))
  1553. # Verify the bundle was created
  1554. result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
  1555. self.assertEqual(result, 0)
  1556. self.assertIn("valid and can be applied", stdout)
  1557. class FormatBytesTestCase(TestCase):
  1558. """Tests for format_bytes function."""
  1559. def test_bytes(self):
  1560. """Test formatting bytes."""
  1561. self.assertEqual("0.0 B", format_bytes(0))
  1562. self.assertEqual("1.0 B", format_bytes(1))
  1563. self.assertEqual("512.0 B", format_bytes(512))
  1564. self.assertEqual("1023.0 B", format_bytes(1023))
  1565. def test_kilobytes(self):
  1566. """Test formatting kilobytes."""
  1567. self.assertEqual("1.0 KB", format_bytes(1024))
  1568. self.assertEqual("1.5 KB", format_bytes(1536))
  1569. self.assertEqual("2.0 KB", format_bytes(2048))
  1570. self.assertEqual("1023.0 KB", format_bytes(1024 * 1023))
  1571. def test_megabytes(self):
  1572. """Test formatting megabytes."""
  1573. self.assertEqual("1.0 MB", format_bytes(1024 * 1024))
  1574. self.assertEqual("1.5 MB", format_bytes(1024 * 1024 * 1.5))
  1575. self.assertEqual("10.0 MB", format_bytes(1024 * 1024 * 10))
  1576. self.assertEqual("1023.0 MB", format_bytes(1024 * 1024 * 1023))
  1577. def test_gigabytes(self):
  1578. """Test formatting gigabytes."""
  1579. self.assertEqual("1.0 GB", format_bytes(1024 * 1024 * 1024))
  1580. self.assertEqual("2.5 GB", format_bytes(1024 * 1024 * 1024 * 2.5))
  1581. self.assertEqual("1023.0 GB", format_bytes(1024 * 1024 * 1024 * 1023))
  1582. def test_terabytes(self):
  1583. """Test formatting terabytes."""
  1584. self.assertEqual("1.0 TB", format_bytes(1024 * 1024 * 1024 * 1024))
  1585. self.assertEqual("5.0 TB", format_bytes(1024 * 1024 * 1024 * 1024 * 5))
  1586. self.assertEqual("1000.0 TB", format_bytes(1024 * 1024 * 1024 * 1024 * 1000))
  1587. class ParseRelativeTimeTestCase(TestCase):
  1588. """Tests for parse_relative_time function."""
  1589. def test_now(self):
  1590. """Test parsing 'now'."""
  1591. self.assertEqual(0, parse_relative_time("now"))
  1592. def test_seconds(self):
  1593. """Test parsing seconds."""
  1594. self.assertEqual(1, parse_relative_time("1 second ago"))
  1595. self.assertEqual(5, parse_relative_time("5 seconds ago"))
  1596. self.assertEqual(30, parse_relative_time("30 seconds ago"))
  1597. def test_minutes(self):
  1598. """Test parsing minutes."""
  1599. self.assertEqual(60, parse_relative_time("1 minute ago"))
  1600. self.assertEqual(300, parse_relative_time("5 minutes ago"))
  1601. self.assertEqual(1800, parse_relative_time("30 minutes ago"))
  1602. def test_hours(self):
  1603. """Test parsing hours."""
  1604. self.assertEqual(3600, parse_relative_time("1 hour ago"))
  1605. self.assertEqual(7200, parse_relative_time("2 hours ago"))
  1606. self.assertEqual(86400, parse_relative_time("24 hours ago"))
  1607. def test_days(self):
  1608. """Test parsing days."""
  1609. self.assertEqual(86400, parse_relative_time("1 day ago"))
  1610. self.assertEqual(604800, parse_relative_time("7 days ago"))
  1611. self.assertEqual(2592000, parse_relative_time("30 days ago"))
  1612. def test_weeks(self):
  1613. """Test parsing weeks."""
  1614. self.assertEqual(604800, parse_relative_time("1 week ago"))
  1615. self.assertEqual(1209600, parse_relative_time("2 weeks ago"))
  1616. self.assertEqual(
  1617. 36288000, parse_relative_time("60 weeks ago")
  1618. ) # 60 * 7 * 24 * 60 * 60
  1619. def test_invalid_format(self):
  1620. """Test invalid time formats."""
  1621. with self.assertRaises(ValueError) as cm:
  1622. parse_relative_time("invalid")
  1623. self.assertIn("Invalid relative time format", str(cm.exception))
  1624. with self.assertRaises(ValueError) as cm:
  1625. parse_relative_time("2 weeks")
  1626. self.assertIn("Invalid relative time format", str(cm.exception))
  1627. with self.assertRaises(ValueError) as cm:
  1628. parse_relative_time("ago")
  1629. self.assertIn("Invalid relative time format", str(cm.exception))
  1630. with self.assertRaises(ValueError) as cm:
  1631. parse_relative_time("two weeks ago")
  1632. self.assertIn("Invalid number in relative time", str(cm.exception))
  1633. def test_invalid_unit(self):
  1634. """Test invalid time units."""
  1635. with self.assertRaises(ValueError) as cm:
  1636. parse_relative_time("5 months ago")
  1637. self.assertIn("Unknown time unit: months", str(cm.exception))
  1638. with self.assertRaises(ValueError) as cm:
  1639. parse_relative_time("2 years ago")
  1640. self.assertIn("Unknown time unit: years", str(cm.exception))
  1641. def test_singular_plural(self):
  1642. """Test that both singular and plural forms work."""
  1643. self.assertEqual(
  1644. parse_relative_time("1 second ago"), parse_relative_time("1 seconds ago")
  1645. )
  1646. self.assertEqual(
  1647. parse_relative_time("1 minute ago"), parse_relative_time("1 minutes ago")
  1648. )
  1649. self.assertEqual(
  1650. parse_relative_time("1 hour ago"), parse_relative_time("1 hours ago")
  1651. )
  1652. self.assertEqual(
  1653. parse_relative_time("1 day ago"), parse_relative_time("1 days ago")
  1654. )
  1655. self.assertEqual(
  1656. parse_relative_time("1 week ago"), parse_relative_time("1 weeks ago")
  1657. )
  1658. class GetPagerTest(TestCase):
  1659. """Tests for get_pager function."""
  1660. def setUp(self):
  1661. super().setUp()
  1662. # Save original environment
  1663. self.original_env = os.environ.copy()
  1664. # Clear pager-related environment variables
  1665. for var in ["DULWICH_PAGER", "GIT_PAGER", "PAGER"]:
  1666. os.environ.pop(var, None)
  1667. # Reset the global pager disable flag
  1668. cli.get_pager._disabled = False
  1669. def tearDown(self):
  1670. super().tearDown()
  1671. # Restore original environment
  1672. os.environ.clear()
  1673. os.environ.update(self.original_env)
  1674. # Reset the global pager disable flag
  1675. cli.get_pager._disabled = False
  1676. def test_pager_disabled_globally(self):
  1677. """Test that globally disabled pager returns stdout wrapper."""
  1678. cli.disable_pager()
  1679. pager = cli.get_pager()
  1680. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1681. self.assertEqual(pager.stream, sys.stdout)
  1682. def test_pager_not_tty(self):
  1683. """Test that pager is disabled when stdout is not a TTY."""
  1684. with patch("sys.stdout.isatty", return_value=False):
  1685. pager = cli.get_pager()
  1686. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1687. def test_pager_env_dulwich_pager(self):
  1688. """Test DULWICH_PAGER environment variable."""
  1689. os.environ["DULWICH_PAGER"] = "custom_pager"
  1690. with patch("sys.stdout.isatty", return_value=True):
  1691. pager = cli.get_pager()
  1692. self.assertIsInstance(pager, cli.Pager)
  1693. self.assertEqual(pager.pager_cmd, "custom_pager")
  1694. def test_pager_env_dulwich_pager_false(self):
  1695. """Test DULWICH_PAGER=false disables pager."""
  1696. os.environ["DULWICH_PAGER"] = "false"
  1697. with patch("sys.stdout.isatty", return_value=True):
  1698. pager = cli.get_pager()
  1699. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1700. def test_pager_env_git_pager(self):
  1701. """Test GIT_PAGER environment variable."""
  1702. os.environ["GIT_PAGER"] = "git_custom_pager"
  1703. with patch("sys.stdout.isatty", return_value=True):
  1704. pager = cli.get_pager()
  1705. self.assertIsInstance(pager, cli.Pager)
  1706. self.assertEqual(pager.pager_cmd, "git_custom_pager")
  1707. def test_pager_env_pager(self):
  1708. """Test PAGER environment variable."""
  1709. os.environ["PAGER"] = "my_pager"
  1710. with patch("sys.stdout.isatty", return_value=True):
  1711. pager = cli.get_pager()
  1712. self.assertIsInstance(pager, cli.Pager)
  1713. self.assertEqual(pager.pager_cmd, "my_pager")
  1714. def test_pager_env_priority(self):
  1715. """Test environment variable priority order."""
  1716. os.environ["PAGER"] = "pager_low"
  1717. os.environ["GIT_PAGER"] = "pager_medium"
  1718. os.environ["DULWICH_PAGER"] = "pager_high"
  1719. with patch("sys.stdout.isatty", return_value=True):
  1720. pager = cli.get_pager()
  1721. self.assertEqual(pager.pager_cmd, "pager_high")
  1722. def test_pager_config_core_pager(self):
  1723. """Test core.pager configuration."""
  1724. config = MagicMock()
  1725. config.get.return_value = b"config_pager"
  1726. with patch("sys.stdout.isatty", return_value=True):
  1727. pager = cli.get_pager(config=config)
  1728. self.assertIsInstance(pager, cli.Pager)
  1729. self.assertEqual(pager.pager_cmd, "config_pager")
  1730. config.get.assert_called_with(("core",), b"pager")
  1731. def test_pager_config_core_pager_false(self):
  1732. """Test core.pager=false disables pager."""
  1733. config = MagicMock()
  1734. config.get.return_value = b"false"
  1735. with patch("sys.stdout.isatty", return_value=True):
  1736. pager = cli.get_pager(config=config)
  1737. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1738. def test_pager_config_core_pager_empty(self):
  1739. """Test core.pager="" disables pager."""
  1740. config = MagicMock()
  1741. config.get.return_value = b""
  1742. with patch("sys.stdout.isatty", return_value=True):
  1743. pager = cli.get_pager(config=config)
  1744. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1745. def test_pager_config_per_command(self):
  1746. """Test per-command pager configuration."""
  1747. config = MagicMock()
  1748. config.get.side_effect = lambda section, key: {
  1749. (("pager",), b"log"): b"log_pager",
  1750. }.get((section, key), KeyError())
  1751. with patch("sys.stdout.isatty", return_value=True):
  1752. pager = cli.get_pager(config=config, cmd_name="log")
  1753. self.assertIsInstance(pager, cli.Pager)
  1754. self.assertEqual(pager.pager_cmd, "log_pager")
  1755. def test_pager_config_per_command_false(self):
  1756. """Test per-command pager=false disables pager."""
  1757. config = MagicMock()
  1758. config.get.return_value = b"false"
  1759. with patch("sys.stdout.isatty", return_value=True):
  1760. pager = cli.get_pager(config=config, cmd_name="log")
  1761. self.assertIsInstance(pager, cli._StreamContextAdapter)
  1762. def test_pager_config_per_command_true(self):
  1763. """Test per-command pager=true uses default pager."""
  1764. config = MagicMock()
  1765. def get_side_effect(section, key):
  1766. if section == ("pager",) and key == b"log":
  1767. return b"true"
  1768. raise KeyError
  1769. config.get.side_effect = get_side_effect
  1770. with patch("sys.stdout.isatty", return_value=True):
  1771. with patch("shutil.which", side_effect=lambda cmd: cmd == "less"):
  1772. pager = cli.get_pager(config=config, cmd_name="log")
  1773. self.assertIsInstance(pager, cli.Pager)
  1774. self.assertEqual(pager.pager_cmd, "less -FRX")
  1775. def test_pager_priority_order(self):
  1776. """Test complete priority order."""
  1777. # Set up all possible configurations
  1778. os.environ["PAGER"] = "env_pager"
  1779. os.environ["GIT_PAGER"] = "env_git_pager"
  1780. config = MagicMock()
  1781. def get_side_effect(section, key):
  1782. if section == ("pager",) and key == b"log":
  1783. return b"cmd_pager"
  1784. elif section == ("core",) and key == b"pager":
  1785. return b"core_pager"
  1786. raise KeyError
  1787. config.get.side_effect = get_side_effect
  1788. with patch("sys.stdout.isatty", return_value=True):
  1789. # Per-command config should win
  1790. pager = cli.get_pager(config=config, cmd_name="log")
  1791. self.assertEqual(pager.pager_cmd, "cmd_pager")
  1792. def test_pager_fallback_less(self):
  1793. """Test fallback to less with proper flags."""
  1794. with patch("sys.stdout.isatty", return_value=True):
  1795. with patch("shutil.which", side_effect=lambda cmd: cmd == "less"):
  1796. pager = cli.get_pager()
  1797. self.assertIsInstance(pager, cli.Pager)
  1798. self.assertEqual(pager.pager_cmd, "less -FRX")
  1799. def test_pager_fallback_more(self):
  1800. """Test fallback to more when less is not available."""
  1801. with patch("sys.stdout.isatty", return_value=True):
  1802. with patch("shutil.which", side_effect=lambda cmd: cmd == "more"):
  1803. pager = cli.get_pager()
  1804. self.assertIsInstance(pager, cli.Pager)
  1805. self.assertEqual(pager.pager_cmd, "more")
  1806. def test_pager_fallback_cat(self):
  1807. """Test ultimate fallback to cat."""
  1808. with patch("sys.stdout.isatty", return_value=True):
  1809. with patch("shutil.which", return_value=None):
  1810. pager = cli.get_pager()
  1811. self.assertIsInstance(pager, cli.Pager)
  1812. self.assertEqual(pager.pager_cmd, "cat")
  1813. def test_pager_context_manager(self):
  1814. """Test that pager works as a context manager."""
  1815. with patch("sys.stdout.isatty", return_value=True):
  1816. with cli.get_pager() as pager:
  1817. self.assertTrue(hasattr(pager, "write"))
  1818. self.assertTrue(hasattr(pager, "flush"))
  1819. if __name__ == "__main__":
  1820. unittest.main()