test_worktree.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich.errors import CommitError
  28. from dulwich.index import get_unstaged_changes as _get_unstaged_changes
  29. from dulwich.object_store import tree_lookup_path
  30. from dulwich.repo import Repo
  31. from dulwich.worktree import (
  32. WorkTree,
  33. add_worktree,
  34. list_worktrees,
  35. lock_worktree,
  36. move_worktree,
  37. prune_worktrees,
  38. remove_worktree,
  39. repair_worktree,
  40. temporary_worktree,
  41. unlock_worktree,
  42. )
  43. from . import TestCase
  44. def get_unstaged_changes(repo):
  45. """Helper to get unstaged changes for a repo."""
  46. index = repo.open_index()
  47. normalizer = repo.get_blob_normalizer()
  48. filter_callback = normalizer.checkin_normalize if normalizer else None
  49. return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
  50. class WorkTreeTestCase(TestCase):
  51. """Base test case for WorkTree tests."""
  52. def setUp(self):
  53. super().setUp()
  54. self.tempdir = tempfile.mkdtemp()
  55. self.test_dir = os.path.join(self.tempdir, "main")
  56. self.repo = Repo.init(self.test_dir, mkdir=True)
  57. # Create initial commit with a file
  58. with open(os.path.join(self.test_dir, "a"), "wb") as f:
  59. f.write(b"contents of file a")
  60. self.repo.get_worktree().stage(["a"])
  61. self.root_commit = self.repo.get_worktree().commit(
  62. message=b"Initial commit",
  63. committer=b"Test Committer <test@nodomain.com>",
  64. author=b"Test Author <test@nodomain.com>",
  65. commit_timestamp=12345,
  66. commit_timezone=0,
  67. author_timestamp=12345,
  68. author_timezone=0,
  69. )
  70. self.worktree = self.repo.get_worktree()
  71. def tearDown(self):
  72. self.repo.close()
  73. super().tearDown()
  74. def write_file(self, filename, content):
  75. """Helper to write a file in the repo."""
  76. with open(os.path.join(self.test_dir, filename), "wb") as f:
  77. f.write(content)
  78. class WorkTreeInitTests(TestCase):
  79. """Tests for WorkTree initialization."""
  80. def test_init_with_repo_path(self):
  81. """Test WorkTree initialization with same path as repo."""
  82. with tempfile.TemporaryDirectory() as tmpdir:
  83. repo = Repo.init(tmpdir)
  84. worktree = WorkTree(repo, tmpdir)
  85. self.assertEqual(worktree.path, tmpdir)
  86. self.assertEqual(worktree._repo, repo)
  87. self.assertTrue(os.path.isabs(worktree.path))
  88. def test_init_with_different_path(self):
  89. """Test WorkTree initialization with different path from repo."""
  90. with tempfile.TemporaryDirectory() as tmpdir:
  91. repo_path = os.path.join(tmpdir, "repo")
  92. worktree_path = os.path.join(tmpdir, "worktree")
  93. os.makedirs(repo_path)
  94. os.makedirs(worktree_path)
  95. repo = Repo.init(repo_path)
  96. worktree = WorkTree(repo, worktree_path)
  97. self.assertNotEqual(worktree.path, repo.path)
  98. self.assertEqual(worktree.path, worktree_path)
  99. self.assertEqual(worktree._repo, repo)
  100. self.assertTrue(os.path.isabs(worktree.path))
  101. def test_init_with_bytes_path(self):
  102. """Test WorkTree initialization with bytes path."""
  103. with tempfile.TemporaryDirectory() as tmpdir:
  104. repo = Repo.init(tmpdir)
  105. worktree = WorkTree(repo, tmpdir.encode("utf-8"))
  106. self.assertEqual(worktree.path, tmpdir)
  107. self.assertIsInstance(worktree.path, str)
  108. class WorkTreeStagingTests(WorkTreeTestCase):
  109. """Tests for WorkTree staging operations."""
  110. def test_stage_absolute(self):
  111. """Test that staging with absolute paths raises ValueError."""
  112. r = self.repo
  113. os.remove(os.path.join(r.path, "a"))
  114. self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])
  115. def test_stage_deleted(self):
  116. """Test staging a deleted file."""
  117. r = self.repo
  118. os.remove(os.path.join(r.path, "a"))
  119. self.worktree.stage(["a"])
  120. self.worktree.stage(["a"]) # double-stage a deleted path
  121. self.assertEqual([], list(r.open_index()))
  122. def test_stage_directory(self):
  123. """Test staging a directory."""
  124. r = self.repo
  125. os.mkdir(os.path.join(r.path, "c"))
  126. self.worktree.stage(["c"])
  127. self.assertEqual([b"a"], list(r.open_index()))
  128. def test_stage_submodule(self):
  129. """Test staging a submodule."""
  130. r = self.repo
  131. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  132. s.get_worktree().commit(
  133. message=b"message",
  134. )
  135. self.worktree.stage(["sub"])
  136. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  137. class WorkTreeUnstagingTests(WorkTreeTestCase):
  138. """Tests for WorkTree unstaging operations."""
  139. def test_unstage_modify_file_with_dir(self):
  140. """Test unstaging a modified file in a directory."""
  141. os.mkdir(os.path.join(self.repo.path, "new_dir"))
  142. full_path = os.path.join(self.repo.path, "new_dir", "foo")
  143. with open(full_path, "w") as f:
  144. f.write("hello")
  145. self.worktree.stage(["new_dir/foo"])
  146. self.worktree.commit(
  147. message=b"unittest",
  148. committer=b"Jane <jane@example.com>",
  149. author=b"John <john@example.com>",
  150. )
  151. with open(full_path, "a") as f:
  152. f.write("something new")
  153. self.worktree.unstage(["new_dir/foo"])
  154. unstaged = get_unstaged_changes(self.repo)
  155. self.assertEqual([b"new_dir/foo"], unstaged)
  156. def test_unstage_while_no_commit(self):
  157. """Test unstaging when there are no commits."""
  158. file = "foo"
  159. full_path = os.path.join(self.repo.path, file)
  160. with open(full_path, "w") as f:
  161. f.write("hello")
  162. self.worktree.stage([file])
  163. self.worktree.unstage([file])
  164. # Check that file is no longer in index
  165. index = self.repo.open_index()
  166. self.assertNotIn(b"foo", index)
  167. def test_unstage_add_file(self):
  168. """Test unstaging a newly added file."""
  169. file = "foo"
  170. full_path = os.path.join(self.repo.path, file)
  171. self.worktree.commit(
  172. message=b"unittest",
  173. committer=b"Jane <jane@example.com>",
  174. author=b"John <john@example.com>",
  175. )
  176. with open(full_path, "w") as f:
  177. f.write("hello")
  178. self.worktree.stage([file])
  179. self.worktree.unstage([file])
  180. # Check that file is no longer in index
  181. index = self.repo.open_index()
  182. self.assertNotIn(b"foo", index)
  183. def test_unstage_modify_file(self):
  184. """Test unstaging a modified file."""
  185. file = "foo"
  186. full_path = os.path.join(self.repo.path, file)
  187. with open(full_path, "w") as f:
  188. f.write("hello")
  189. self.worktree.stage([file])
  190. self.worktree.commit(
  191. message=b"unittest",
  192. committer=b"Jane <jane@example.com>",
  193. author=b"John <john@example.com>",
  194. )
  195. with open(full_path, "a") as f:
  196. f.write("broken")
  197. self.worktree.stage([file])
  198. self.worktree.unstage([file])
  199. unstaged = get_unstaged_changes(self.repo)
  200. self.assertEqual([os.fsencode("foo")], unstaged)
  201. def test_unstage_remove_file(self):
  202. """Test unstaging a removed file."""
  203. file = "foo"
  204. full_path = os.path.join(self.repo.path, file)
  205. with open(full_path, "w") as f:
  206. f.write("hello")
  207. self.worktree.stage([file])
  208. self.worktree.commit(
  209. message=b"unittest",
  210. committer=b"Jane <jane@example.com>",
  211. author=b"John <john@example.com>",
  212. )
  213. os.remove(full_path)
  214. self.worktree.unstage([file])
  215. unstaged = get_unstaged_changes(self.repo)
  216. self.assertEqual([os.fsencode("foo")], unstaged)
  217. class WorkTreeCommitTests(WorkTreeTestCase):
  218. """Tests for WorkTree commit operations."""
  219. def test_commit_modified(self):
  220. """Test committing a modified file."""
  221. r = self.repo
  222. with open(os.path.join(r.path, "a"), "wb") as f:
  223. f.write(b"new contents")
  224. self.worktree.stage(["a"])
  225. commit_sha = self.worktree.commit(
  226. b"modified a",
  227. committer=b"Test Committer <test@nodomain.com>",
  228. author=b"Test Author <test@nodomain.com>",
  229. commit_timestamp=12395,
  230. commit_timezone=0,
  231. author_timestamp=12395,
  232. author_timezone=0,
  233. )
  234. self.assertEqual([self.root_commit], r[commit_sha].parents)
  235. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  236. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  237. self.assertEqual(b"new contents", r[a_id].data)
  238. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  239. def test_commit_symlink(self):
  240. """Test committing a symlink."""
  241. r = self.repo
  242. os.symlink("a", os.path.join(r.path, "b"))
  243. self.worktree.stage(["a", "b"])
  244. commit_sha = self.worktree.commit(
  245. b"Symlink b",
  246. committer=b"Test Committer <test@nodomain.com>",
  247. author=b"Test Author <test@nodomain.com>",
  248. commit_timestamp=12395,
  249. commit_timezone=0,
  250. author_timestamp=12395,
  251. author_timezone=0,
  252. )
  253. self.assertEqual([self.root_commit], r[commit_sha].parents)
  254. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  255. self.assertEqual(stat.S_IFLNK, b_mode)
  256. self.assertEqual(b"a", r[b_id].data)
  257. class WorkTreeResetTests(WorkTreeTestCase):
  258. """Tests for WorkTree reset operations."""
  259. def test_reset_index(self):
  260. """Test resetting the index."""
  261. # Make some changes and stage them
  262. with open(os.path.join(self.repo.path, "a"), "wb") as f:
  263. f.write(b"modified contents")
  264. self.worktree.stage(["a"])
  265. # Reset index should restore to HEAD
  266. self.worktree.reset_index()
  267. # Check that the working tree file was restored
  268. with open(os.path.join(self.repo.path, "a"), "rb") as f:
  269. contents = f.read()
  270. self.assertEqual(b"contents of file a", contents)
  271. class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
  272. """Tests for WorkTree sparse checkout operations."""
  273. def test_get_sparse_checkout_patterns_empty(self):
  274. """Test getting sparse checkout patterns when file doesn't exist."""
  275. patterns = self.worktree.get_sparse_checkout_patterns()
  276. self.assertEqual([], patterns)
  277. def test_set_sparse_checkout_patterns(self):
  278. """Test setting sparse checkout patterns."""
  279. patterns = ["*.py", "docs/"]
  280. self.worktree.set_sparse_checkout_patterns(patterns)
  281. # Read back the patterns
  282. retrieved_patterns = self.worktree.get_sparse_checkout_patterns()
  283. self.assertEqual(patterns, retrieved_patterns)
  284. def test_configure_for_cone_mode(self):
  285. """Test configuring repository for cone mode."""
  286. self.worktree.configure_for_cone_mode()
  287. config = self.repo.get_config()
  288. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckout"))
  289. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckoutCone"))
  290. def test_infer_cone_mode_false(self):
  291. """Test inferring cone mode when not configured."""
  292. self.assertFalse(self.worktree.infer_cone_mode())
  293. def test_infer_cone_mode_true(self):
  294. """Test inferring cone mode when configured."""
  295. self.worktree.configure_for_cone_mode()
  296. self.assertTrue(self.worktree.infer_cone_mode())
  297. def test_set_cone_mode_patterns(self):
  298. """Test setting cone mode patterns."""
  299. dirs = ["src", "tests"]
  300. self.worktree.set_cone_mode_patterns(dirs)
  301. patterns = self.worktree.get_sparse_checkout_patterns()
  302. expected = ["/*", "!/*/", "/src/", "/tests/"]
  303. self.assertEqual(expected, patterns)
  304. def test_set_cone_mode_patterns_empty(self):
  305. """Test setting cone mode patterns with empty list."""
  306. self.worktree.set_cone_mode_patterns([])
  307. patterns = self.worktree.get_sparse_checkout_patterns()
  308. expected = ["/*", "!/*/"]
  309. self.assertEqual(expected, patterns)
  310. def test_set_cone_mode_patterns_duplicates(self):
  311. """Test that duplicate patterns are not added."""
  312. dirs = ["src", "src"] # duplicate
  313. self.worktree.set_cone_mode_patterns(dirs)
  314. patterns = self.worktree.get_sparse_checkout_patterns()
  315. expected = ["/*", "!/*/", "/src/"]
  316. self.assertEqual(expected, patterns)
  317. def test_sparse_checkout_file_path(self):
  318. """Test getting the sparse checkout file path."""
  319. expected_path = os.path.join(self.repo.controldir(), "info", "sparse-checkout")
  320. actual_path = self.worktree._sparse_checkout_file_path()
  321. self.assertEqual(expected_path, actual_path)
  322. class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
  323. """Tests for backward compatibility of deprecated Repo methods."""
  324. def test_pre_commit_hook_fail(self):
  325. """Test that failing pre-commit hook raises CommitError."""
  326. if os.name != "posix":
  327. self.skipTest("shell hook tests requires POSIX shell")
  328. # Create a failing pre-commit hook
  329. hooks_dir = os.path.join(self.repo.controldir(), "hooks")
  330. os.makedirs(hooks_dir, exist_ok=True)
  331. hook_path = os.path.join(hooks_dir, "pre-commit")
  332. with open(hook_path, "w") as f:
  333. f.write("#!/bin/sh\nexit 1\n")
  334. os.chmod(hook_path, 0o755)
  335. # Try to commit
  336. worktree = self.repo.get_worktree()
  337. with self.assertRaises(CommitError):
  338. worktree.commit(b"No message")
  339. def write_file(self, filename, content):
  340. """Helper to write a file in the repo."""
  341. with open(os.path.join(self.test_dir, filename), "wb") as f:
  342. f.write(content)
  343. class WorkTreeOperationsTests(WorkTreeTestCase):
  344. """Tests for worktree operations like add, list, remove."""
  345. def test_list_worktrees_single(self) -> None:
  346. """Test listing worktrees when only main worktree exists."""
  347. worktrees = list_worktrees(self.repo)
  348. self.assertEqual(len(worktrees), 1)
  349. self.assertEqual(worktrees[0].path, self.repo.path)
  350. self.assertEqual(worktrees[0].bare, False)
  351. self.assertIsNotNone(worktrees[0].head)
  352. self.assertIsNotNone(worktrees[0].branch)
  353. def test_add_worktree_new_branch(self) -> None:
  354. """Test adding a worktree with a new branch."""
  355. # Create a commit first
  356. worktree = self.repo.get_worktree()
  357. self.write_file("test.txt", b"test content")
  358. worktree.stage(["test.txt"])
  359. commit_id = worktree.commit(message=b"Initial commit")
  360. # Add a new worktree
  361. wt_path = os.path.join(self.tempdir, "new-worktree")
  362. add_worktree(self.repo, wt_path, branch=b"feature-branch")
  363. # Verify worktree was created
  364. self.assertTrue(os.path.exists(wt_path))
  365. self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))
  366. # Verify it appears in the list
  367. worktrees = list_worktrees(self.repo)
  368. self.assertEqual(len(worktrees), 2)
  369. # Find the new worktree in the list
  370. new_wt = None
  371. for wt in worktrees:
  372. if wt.path == wt_path:
  373. new_wt = wt
  374. break
  375. self.assertIsNotNone(new_wt)
  376. self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
  377. self.assertEqual(new_wt.head, commit_id)
  378. self.assertFalse(new_wt.detached)
  379. def test_add_worktree_detached(self) -> None:
  380. """Test adding a worktree with detached HEAD."""
  381. # Create a commit
  382. worktree = self.repo.get_worktree()
  383. self.write_file("test.txt", b"test content")
  384. worktree.stage(["test.txt"])
  385. commit_id = worktree.commit(message=b"Initial commit")
  386. # Add a detached worktree
  387. wt_path = os.path.join(self.tempdir, "detached-worktree")
  388. add_worktree(self.repo, wt_path, commit=commit_id, detach=True)
  389. # Verify it's detached
  390. worktrees = list_worktrees(self.repo)
  391. self.assertEqual(len(worktrees), 2)
  392. for wt in worktrees:
  393. if wt.path == wt_path:
  394. self.assertTrue(wt.detached)
  395. self.assertIsNone(wt.branch)
  396. self.assertEqual(wt.head, commit_id)
  397. def test_add_worktree_existing_path(self) -> None:
  398. """Test that adding a worktree to existing path fails."""
  399. wt_path = os.path.join(self.tempdir, "existing")
  400. os.mkdir(wt_path)
  401. with self.assertRaises(ValueError) as cm:
  402. add_worktree(self.repo, wt_path)
  403. self.assertIn("Path already exists", str(cm.exception))
  404. def test_add_worktree_branch_already_checked_out(self) -> None:
  405. """Test that checking out same branch in multiple worktrees fails."""
  406. # Create initial commit
  407. worktree = self.repo.get_worktree()
  408. self.write_file("test.txt", b"test content")
  409. worktree.stage(["test.txt"])
  410. worktree.commit(message=b"Initial commit")
  411. # First worktree should succeed with a new branch
  412. wt_path1 = os.path.join(self.tempdir, "wt1")
  413. add_worktree(self.repo, wt_path1, branch=b"feature")
  414. # Second worktree with same branch should fail
  415. wt_path2 = os.path.join(self.tempdir, "wt2")
  416. with self.assertRaises(ValueError) as cm:
  417. add_worktree(self.repo, wt_path2, branch=b"feature")
  418. self.assertIn("already checked out", str(cm.exception))
  419. # But should work with force=True
  420. add_worktree(self.repo, wt_path2, branch=b"feature", force=True)
  421. def test_remove_worktree(self) -> None:
  422. """Test removing a worktree."""
  423. # Create a worktree
  424. wt_path = os.path.join(self.tempdir, "to-remove")
  425. add_worktree(self.repo, wt_path)
  426. # Verify it exists
  427. self.assertTrue(os.path.exists(wt_path))
  428. self.assertEqual(len(list_worktrees(self.repo)), 2)
  429. # Remove it
  430. remove_worktree(self.repo, wt_path)
  431. # Verify it's gone
  432. self.assertFalse(os.path.exists(wt_path))
  433. self.assertEqual(len(list_worktrees(self.repo)), 1)
  434. def test_remove_main_worktree_fails(self) -> None:
  435. """Test that removing the main worktree fails."""
  436. with self.assertRaises(ValueError) as cm:
  437. remove_worktree(self.repo, self.repo.path)
  438. self.assertIn("Cannot remove the main working tree", str(cm.exception))
  439. def test_remove_nonexistent_worktree(self) -> None:
  440. """Test that removing non-existent worktree fails."""
  441. with self.assertRaises(ValueError) as cm:
  442. remove_worktree(self.repo, "/nonexistent/path")
  443. self.assertIn("Worktree not found", str(cm.exception))
  444. def test_lock_unlock_worktree(self) -> None:
  445. """Test locking and unlocking a worktree."""
  446. # Create a worktree
  447. wt_path = os.path.join(self.tempdir, "lockable")
  448. add_worktree(self.repo, wt_path)
  449. # Lock it
  450. lock_worktree(self.repo, wt_path, reason="Testing lock")
  451. # Verify it's locked
  452. worktrees = list_worktrees(self.repo)
  453. for wt in worktrees:
  454. if wt.path == wt_path:
  455. self.assertTrue(wt.locked)
  456. # Try to remove locked worktree (should fail)
  457. with self.assertRaises(ValueError) as cm:
  458. remove_worktree(self.repo, wt_path)
  459. self.assertIn("locked", str(cm.exception))
  460. # Unlock it
  461. unlock_worktree(self.repo, wt_path)
  462. # Verify it's unlocked
  463. worktrees = list_worktrees(self.repo)
  464. for wt in worktrees:
  465. if wt.path == wt_path:
  466. self.assertFalse(wt.locked)
  467. # Now removal should work
  468. remove_worktree(self.repo, wt_path)
  469. def test_prune_worktrees(self) -> None:
  470. """Test pruning worktrees."""
  471. # Create a worktree
  472. wt_path = os.path.join(self.tempdir, "to-prune")
  473. add_worktree(self.repo, wt_path)
  474. # Manually remove the worktree directory
  475. shutil.rmtree(wt_path)
  476. # Verify it still shows up as prunable
  477. worktrees = list_worktrees(self.repo)
  478. prunable_count = sum(1 for wt in worktrees if wt.prunable)
  479. self.assertEqual(prunable_count, 1)
  480. # Prune it
  481. pruned = prune_worktrees(self.repo)
  482. self.assertEqual(len(pruned), 1)
  483. # Verify it's gone from the list
  484. worktrees = list_worktrees(self.repo)
  485. self.assertEqual(len(worktrees), 1)
  486. def test_prune_dry_run(self) -> None:
  487. """Test prune with dry_run doesn't remove anything."""
  488. # Create and manually remove a worktree
  489. wt_path = os.path.join(self.tempdir, "dry-run-test")
  490. add_worktree(self.repo, wt_path)
  491. shutil.rmtree(wt_path)
  492. # Dry run should report but not remove
  493. pruned = prune_worktrees(self.repo, dry_run=True)
  494. self.assertEqual(len(pruned), 1)
  495. # Worktree should still be in list
  496. worktrees = list_worktrees(self.repo)
  497. self.assertEqual(len(worktrees), 2)
  498. def test_prune_locked_worktree_not_pruned(self) -> None:
  499. """Test that locked worktrees are not pruned."""
  500. # Create and lock a worktree
  501. wt_path = os.path.join(self.tempdir, "locked-prune")
  502. add_worktree(self.repo, wt_path)
  503. lock_worktree(self.repo, wt_path)
  504. # Remove the directory
  505. shutil.rmtree(wt_path)
  506. # Prune should not remove locked worktree
  507. pruned = prune_worktrees(self.repo)
  508. self.assertEqual(len(pruned), 0)
  509. # Worktree should still be in list
  510. worktrees = list_worktrees(self.repo)
  511. self.assertEqual(len(worktrees), 2)
  512. def test_move_worktree(self) -> None:
  513. """Test moving a worktree."""
  514. # Create a worktree
  515. wt_path = os.path.join(self.tempdir, "to-move")
  516. add_worktree(self.repo, wt_path)
  517. # Create a file in the worktree
  518. test_file = os.path.join(wt_path, "test.txt")
  519. with open(test_file, "w") as f:
  520. f.write("test content")
  521. # Move it
  522. new_path = os.path.join(self.tempdir, "moved")
  523. move_worktree(self.repo, wt_path, new_path)
  524. # Verify old path doesn't exist
  525. self.assertFalse(os.path.exists(wt_path))
  526. # Verify new path exists with contents
  527. self.assertTrue(os.path.exists(new_path))
  528. self.assertTrue(os.path.exists(os.path.join(new_path, "test.txt")))
  529. # Verify it's in the list at new location
  530. worktrees = list_worktrees(self.repo)
  531. paths = [wt.path for wt in worktrees]
  532. self.assertIn(new_path, paths)
  533. self.assertNotIn(wt_path, paths)
  534. def test_move_main_worktree_fails(self) -> None:
  535. """Test that moving the main worktree fails."""
  536. new_path = os.path.join(self.tempdir, "new-main")
  537. with self.assertRaises(ValueError) as cm:
  538. move_worktree(self.repo, self.repo.path, new_path)
  539. self.assertIn("Cannot move the main working tree", str(cm.exception))
  540. def test_move_to_existing_path_fails(self) -> None:
  541. """Test that moving to an existing path fails."""
  542. # Create a worktree
  543. wt_path = os.path.join(self.tempdir, "worktree")
  544. add_worktree(self.repo, wt_path)
  545. # Create target directory
  546. new_path = os.path.join(self.tempdir, "existing")
  547. os.makedirs(new_path)
  548. with self.assertRaises(ValueError) as cm:
  549. move_worktree(self.repo, wt_path, new_path)
  550. self.assertIn("Path already exists", str(cm.exception))
  551. def test_repair_worktree_after_manual_move(self) -> None:
  552. """Test repairing a worktree after manually moving it."""
  553. # Create a worktree
  554. wt_path = os.path.join(self.tempdir, "original")
  555. add_worktree(self.repo, wt_path)
  556. # Manually move the worktree directory (simulating external move)
  557. new_path = os.path.join(self.tempdir, "moved")
  558. shutil.move(wt_path, new_path)
  559. # At this point, the connection is broken
  560. # Repair from the moved worktree
  561. repaired = repair_worktree(self.repo, paths=[new_path])
  562. # Should have repaired the worktree
  563. self.assertEqual(len(repaired), 1)
  564. self.assertEqual(repaired[0], new_path)
  565. # Verify the worktree is now properly connected
  566. worktrees = list_worktrees(self.repo)
  567. paths = [wt.path for wt in worktrees]
  568. self.assertIn(new_path, paths)
  569. def test_repair_worktree_from_main_repo(self) -> None:
  570. """Test repairing worktree connections from main repository."""
  571. # Create a worktree
  572. wt_path = os.path.join(self.tempdir, "worktree")
  573. add_worktree(self.repo, wt_path)
  574. # Read the .git file to get the control directory
  575. gitdir_file = os.path.join(wt_path, ".git")
  576. with open(gitdir_file, "rb") as f:
  577. content = f.read().strip()
  578. control_dir = content[8:].decode() # Remove "gitdir: " prefix
  579. # Manually corrupt the .git file to point to wrong location
  580. with open(gitdir_file, "wb") as f:
  581. f.write(b"gitdir: /wrong/path\n")
  582. # Repair from main repository
  583. repaired = repair_worktree(self.repo)
  584. # Should have repaired the connection
  585. self.assertEqual(len(repaired), 1)
  586. self.assertEqual(repaired[0], wt_path)
  587. # Verify .git file now points to correct location
  588. with open(gitdir_file, "rb") as f:
  589. content = f.read().strip()
  590. new_control_dir = content[8:].decode()
  591. self.assertEqual(
  592. os.path.abspath(new_control_dir), os.path.abspath(control_dir)
  593. )
  594. def test_repair_worktree_no_repairs_needed(self) -> None:
  595. """Test repair when no repairs are needed."""
  596. # Create a worktree
  597. wt_path = os.path.join(self.tempdir, "worktree")
  598. add_worktree(self.repo, wt_path)
  599. # Repair - should return empty list since nothing is broken
  600. repaired = repair_worktree(self.repo)
  601. self.assertEqual(len(repaired), 0)
  602. def test_repair_invalid_worktree_path(self) -> None:
  603. """Test that repairing an invalid path raises an error."""
  604. with self.assertRaises(ValueError) as cm:
  605. repair_worktree(self.repo, paths=["/nonexistent/path"])
  606. self.assertIn("Not a valid worktree", str(cm.exception))
  607. def test_repair_multiple_worktrees(self) -> None:
  608. """Test repairing multiple worktrees at once."""
  609. # Create two worktrees
  610. wt_path1 = os.path.join(self.tempdir, "wt1")
  611. wt_path2 = os.path.join(self.tempdir, "wt2")
  612. add_worktree(self.repo, wt_path1, branch=b"branch1")
  613. add_worktree(self.repo, wt_path2, branch=b"branch2")
  614. # Manually move both worktrees
  615. new_path1 = os.path.join(self.tempdir, "moved1")
  616. new_path2 = os.path.join(self.tempdir, "moved2")
  617. shutil.move(wt_path1, new_path1)
  618. shutil.move(wt_path2, new_path2)
  619. # Repair both at once
  620. repaired = repair_worktree(self.repo, paths=[new_path1, new_path2])
  621. # Both should be repaired
  622. self.assertEqual(len(repaired), 2)
  623. self.assertIn(new_path1, repaired)
  624. self.assertIn(new_path2, repaired)
  625. def test_repair_worktree_with_relative_paths(self) -> None:
  626. """Test that repair handles worktrees with relative paths in gitdir."""
  627. # Create a worktree
  628. wt_path = os.path.join(self.tempdir, "worktree")
  629. add_worktree(self.repo, wt_path)
  630. # Manually move the worktree
  631. new_path = os.path.join(self.tempdir, "new-location")
  632. shutil.move(wt_path, new_path)
  633. # Repair from the new location
  634. repaired = repair_worktree(self.repo, paths=[new_path])
  635. # Should have repaired successfully
  636. self.assertEqual(len(repaired), 1)
  637. self.assertEqual(repaired[0], new_path)
  638. # Verify the gitdir pointer was updated
  639. from dulwich.repo import GITDIR, WORKTREES
  640. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  641. for entry in os.listdir(worktrees_dir):
  642. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  643. if os.path.exists(gitdir_path):
  644. with open(gitdir_path, "rb") as f:
  645. content = f.read().strip()
  646. gitdir_location = os.fsdecode(content)
  647. # Should point to the new .git file location
  648. self.assertTrue(gitdir_location.endswith(".git"))
  649. def test_repair_worktree_container_method(self) -> None:
  650. """Test the WorkTreeContainer.repair() method."""
  651. # Create a worktree
  652. wt_path = os.path.join(self.tempdir, "worktree")
  653. add_worktree(self.repo, wt_path)
  654. # Manually move it
  655. new_path = os.path.join(self.tempdir, "moved")
  656. shutil.move(wt_path, new_path)
  657. # Use the container method to repair
  658. repaired = self.repo.worktrees.repair(paths=[new_path])
  659. # Should have repaired
  660. self.assertEqual(len(repaired), 1)
  661. self.assertEqual(repaired[0], new_path)
  662. def test_repair_with_missing_gitdir_pointer(self) -> None:
  663. """Test repair when gitdir pointer file is missing."""
  664. # Create a worktree
  665. wt_path = os.path.join(self.tempdir, "worktree")
  666. add_worktree(self.repo, wt_path)
  667. # Find and remove the gitdir pointer file
  668. from dulwich.repo import GITDIR, WORKTREES
  669. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  670. for entry in os.listdir(worktrees_dir):
  671. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  672. if os.path.exists(gitdir_path):
  673. os.remove(gitdir_path)
  674. # Repair should not crash, but won't repair anything
  675. repaired = repair_worktree(self.repo, paths=[wt_path])
  676. self.assertEqual(len(repaired), 0)
  677. def test_repair_worktree_with_corrupted_git_file(self) -> None:
  678. """Test repair with a corrupted .git file."""
  679. # Create a worktree
  680. wt_path = os.path.join(self.tempdir, "worktree")
  681. add_worktree(self.repo, wt_path)
  682. # Corrupt the .git file
  683. gitdir_file = os.path.join(wt_path, ".git")
  684. with open(gitdir_file, "wb") as f:
  685. f.write(b"invalid content\n")
  686. # Attempting to repair should raise an error
  687. with self.assertRaises(ValueError) as cm:
  688. repair_worktree(self.repo, paths=[wt_path])
  689. self.assertIn("Invalid .git file", str(cm.exception))
  690. class TemporaryWorktreeTests(TestCase):
  691. """Tests for temporary_worktree context manager."""
  692. def setUp(self) -> None:
  693. super().setUp()
  694. self.tempdir = tempfile.mkdtemp()
  695. self.addCleanup(shutil.rmtree, self.tempdir)
  696. self.repo_path = os.path.join(self.tempdir, "repo")
  697. self.repo = Repo.init(self.repo_path, mkdir=True)
  698. # Create an initial commit so HEAD exists
  699. readme_path = os.path.join(self.repo_path, "README.md")
  700. with open(readme_path, "w") as f:
  701. f.write("# Test Repository\n")
  702. wt = self.repo.get_worktree()
  703. wt.stage(["README.md"])
  704. wt.commit(message=b"Initial commit")
  705. def test_temporary_worktree_creates_and_cleans_up(self) -> None:
  706. """Test that temporary worktree is created and cleaned up."""
  707. worktree_path = None
  708. # Use the context manager
  709. with temporary_worktree(self.repo) as worktree:
  710. worktree_path = worktree.path
  711. # Check that worktree exists
  712. self.assertTrue(os.path.exists(worktree_path))
  713. # Check that it's in the list of worktrees
  714. worktrees = list_worktrees(self.repo)
  715. paths = [wt.path for wt in worktrees]
  716. self.assertIn(worktree_path, paths)
  717. # Check that .git file exists in worktree
  718. gitdir_file = os.path.join(worktree_path, ".git")
  719. self.assertTrue(os.path.exists(gitdir_file))
  720. # After context manager exits, check cleanup
  721. self.assertFalse(os.path.exists(worktree_path))
  722. # Check that it's no longer in the list of worktrees
  723. worktrees = list_worktrees(self.repo)
  724. paths = [wt.path for wt in worktrees]
  725. self.assertNotIn(worktree_path, paths)
  726. def test_temporary_worktree_with_custom_prefix(self) -> None:
  727. """Test temporary worktree with custom prefix."""
  728. custom_prefix = "my-custom-prefix-"
  729. with temporary_worktree(self.repo, prefix=custom_prefix) as worktree:
  730. # Check that the directory name starts with our prefix
  731. dirname = os.path.basename(worktree.path)
  732. self.assertTrue(dirname.startswith(custom_prefix))
  733. def test_temporary_worktree_cleanup_on_exception(self) -> None:
  734. """Test that cleanup happens even when exception is raised."""
  735. worktree_path = None
  736. class TestException(Exception):
  737. pass
  738. try:
  739. with temporary_worktree(self.repo) as worktree:
  740. worktree_path = worktree.path
  741. self.assertTrue(os.path.exists(worktree_path))
  742. raise TestException("Test exception")
  743. except TestException:
  744. pass
  745. # Cleanup should still happen
  746. self.assertFalse(os.path.exists(worktree_path))
  747. # Check that it's no longer in the list of worktrees
  748. worktrees = list_worktrees(self.repo)
  749. paths = [wt.path for wt in worktrees]
  750. self.assertNotIn(worktree_path, paths)
  751. def test_temporary_worktree_operations(self) -> None:
  752. """Test that operations can be performed in temporary worktree."""
  753. # Create a test file in main repo
  754. test_file = os.path.join(self.repo_path, "test.txt")
  755. with open(test_file, "w") as f:
  756. f.write("Hello, world!")
  757. wt = self.repo.get_worktree()
  758. wt.stage(["test.txt"])
  759. wt.commit(message=b"Initial commit")
  760. with temporary_worktree(self.repo) as worktree:
  761. # Check that the file exists in the worktree
  762. wt_test_file = os.path.join(worktree.path, "test.txt")
  763. self.assertTrue(os.path.exists(wt_test_file))
  764. # Read and verify content
  765. with open(wt_test_file) as f:
  766. content = f.read()
  767. self.assertEqual(content, "Hello, world!")
  768. # Make changes in the worktree
  769. with open(wt_test_file, "w") as f:
  770. f.write("Modified content")
  771. # Changes should be visible as unstaged
  772. unstaged = get_unstaged_changes(worktree)
  773. self.assertIn(os.fsencode("test.txt"), unstaged)