test_worktree.py 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich.errors import CommitError
  28. from dulwich.index import get_unstaged_changes as _get_unstaged_changes
  29. from dulwich.object_store import tree_lookup_path
  30. from dulwich.repo import Repo
  31. from dulwich.worktree import (
  32. WorkTree,
  33. add_worktree,
  34. list_worktrees,
  35. lock_worktree,
  36. move_worktree,
  37. prune_worktrees,
  38. remove_worktree,
  39. repair_worktree,
  40. temporary_worktree,
  41. unlock_worktree,
  42. )
  43. from . import TestCase
  44. def get_unstaged_changes(repo):
  45. """Helper to get unstaged changes for a repo."""
  46. index = repo.open_index()
  47. normalizer = repo.get_blob_normalizer()
  48. filter_callback = normalizer.checkin_normalize if normalizer else None
  49. return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
  50. class WorkTreeTestCase(TestCase):
  51. """Base test case for WorkTree tests."""
  52. def setUp(self):
  53. super().setUp()
  54. self.tempdir = tempfile.mkdtemp()
  55. self.test_dir = os.path.join(self.tempdir, "main")
  56. self.repo = Repo.init(self.test_dir, mkdir=True)
  57. # Create initial commit with a file
  58. with open(os.path.join(self.test_dir, "a"), "wb") as f:
  59. f.write(b"contents of file a")
  60. self.repo.get_worktree().stage(["a"])
  61. self.root_commit = self.repo.get_worktree().commit(
  62. message=b"Initial commit",
  63. committer=b"Test Committer <test@nodomain.com>",
  64. author=b"Test Author <test@nodomain.com>",
  65. commit_timestamp=12345,
  66. commit_timezone=0,
  67. author_timestamp=12345,
  68. author_timezone=0,
  69. )
  70. self.worktree = self.repo.get_worktree()
  71. def tearDown(self):
  72. self.repo.close()
  73. super().tearDown()
  74. def write_file(self, filename, content):
  75. """Helper to write a file in the repo."""
  76. with open(os.path.join(self.test_dir, filename), "wb") as f:
  77. f.write(content)
  78. class WorkTreeInitTests(TestCase):
  79. """Tests for WorkTree initialization."""
  80. def test_init_with_repo_path(self):
  81. """Test WorkTree initialization with same path as repo."""
  82. with tempfile.TemporaryDirectory() as tmpdir:
  83. repo = Repo.init(tmpdir)
  84. worktree = WorkTree(repo, tmpdir)
  85. self.assertEqual(worktree.path, tmpdir)
  86. self.assertEqual(worktree._repo, repo)
  87. self.assertTrue(os.path.isabs(worktree.path))
  88. def test_init_with_different_path(self):
  89. """Test WorkTree initialization with different path from repo."""
  90. with tempfile.TemporaryDirectory() as tmpdir:
  91. repo_path = os.path.join(tmpdir, "repo")
  92. worktree_path = os.path.join(tmpdir, "worktree")
  93. os.makedirs(repo_path)
  94. os.makedirs(worktree_path)
  95. repo = Repo.init(repo_path)
  96. worktree = WorkTree(repo, worktree_path)
  97. self.assertNotEqual(worktree.path, repo.path)
  98. self.assertEqual(worktree.path, worktree_path)
  99. self.assertEqual(worktree._repo, repo)
  100. self.assertTrue(os.path.isabs(worktree.path))
  101. def test_init_with_bytes_path(self):
  102. """Test WorkTree initialization with bytes path."""
  103. with tempfile.TemporaryDirectory() as tmpdir:
  104. repo = Repo.init(tmpdir)
  105. worktree = WorkTree(repo, tmpdir.encode("utf-8"))
  106. self.assertEqual(worktree.path, tmpdir)
  107. self.assertIsInstance(worktree.path, str)
  108. class WorkTreeStagingTests(WorkTreeTestCase):
  109. """Tests for WorkTree staging operations."""
  110. def test_stage_absolute(self):
  111. """Test that staging with absolute paths raises ValueError."""
  112. r = self.repo
  113. os.remove(os.path.join(r.path, "a"))
  114. self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])
  115. def test_stage_deleted(self):
  116. """Test staging a deleted file."""
  117. r = self.repo
  118. os.remove(os.path.join(r.path, "a"))
  119. self.worktree.stage(["a"])
  120. self.worktree.stage(["a"]) # double-stage a deleted path
  121. self.assertEqual([], list(r.open_index()))
  122. def test_stage_directory(self):
  123. """Test staging a directory."""
  124. r = self.repo
  125. os.mkdir(os.path.join(r.path, "c"))
  126. self.worktree.stage(["c"])
  127. self.assertEqual([b"a"], list(r.open_index()))
  128. def test_stage_submodule(self):
  129. """Test staging a submodule."""
  130. r = self.repo
  131. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  132. s.get_worktree().commit(
  133. message=b"message",
  134. )
  135. self.worktree.stage(["sub"])
  136. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  137. class WorkTreeUnstagingTests(WorkTreeTestCase):
  138. """Tests for WorkTree unstaging operations."""
  139. def test_unstage_modify_file_with_dir(self):
  140. """Test unstaging a modified file in a directory."""
  141. os.mkdir(os.path.join(self.repo.path, "new_dir"))
  142. full_path = os.path.join(self.repo.path, "new_dir", "foo")
  143. with open(full_path, "w") as f:
  144. f.write("hello")
  145. self.worktree.stage(["new_dir/foo"])
  146. self.worktree.commit(
  147. message=b"unittest",
  148. committer=b"Jane <jane@example.com>",
  149. author=b"John <john@example.com>",
  150. )
  151. with open(full_path, "a") as f:
  152. f.write("something new")
  153. self.worktree.unstage(["new_dir/foo"])
  154. unstaged = get_unstaged_changes(self.repo)
  155. self.assertEqual([b"new_dir/foo"], unstaged)
  156. def test_unstage_while_no_commit(self):
  157. """Test unstaging when there are no commits."""
  158. file = "foo"
  159. full_path = os.path.join(self.repo.path, file)
  160. with open(full_path, "w") as f:
  161. f.write("hello")
  162. self.worktree.stage([file])
  163. self.worktree.unstage([file])
  164. # Check that file is no longer in index
  165. index = self.repo.open_index()
  166. self.assertNotIn(b"foo", index)
  167. def test_unstage_add_file(self):
  168. """Test unstaging a newly added file."""
  169. file = "foo"
  170. full_path = os.path.join(self.repo.path, file)
  171. self.worktree.commit(
  172. message=b"unittest",
  173. committer=b"Jane <jane@example.com>",
  174. author=b"John <john@example.com>",
  175. )
  176. with open(full_path, "w") as f:
  177. f.write("hello")
  178. self.worktree.stage([file])
  179. self.worktree.unstage([file])
  180. # Check that file is no longer in index
  181. index = self.repo.open_index()
  182. self.assertNotIn(b"foo", index)
  183. def test_unstage_modify_file(self):
  184. """Test unstaging a modified file."""
  185. file = "foo"
  186. full_path = os.path.join(self.repo.path, file)
  187. with open(full_path, "w") as f:
  188. f.write("hello")
  189. self.worktree.stage([file])
  190. self.worktree.commit(
  191. message=b"unittest",
  192. committer=b"Jane <jane@example.com>",
  193. author=b"John <john@example.com>",
  194. )
  195. with open(full_path, "a") as f:
  196. f.write("broken")
  197. self.worktree.stage([file])
  198. self.worktree.unstage([file])
  199. unstaged = get_unstaged_changes(self.repo)
  200. self.assertEqual([os.fsencode("foo")], unstaged)
  201. def test_unstage_remove_file(self):
  202. """Test unstaging a removed file."""
  203. file = "foo"
  204. full_path = os.path.join(self.repo.path, file)
  205. with open(full_path, "w") as f:
  206. f.write("hello")
  207. self.worktree.stage([file])
  208. self.worktree.commit(
  209. message=b"unittest",
  210. committer=b"Jane <jane@example.com>",
  211. author=b"John <john@example.com>",
  212. )
  213. os.remove(full_path)
  214. self.worktree.unstage([file])
  215. unstaged = get_unstaged_changes(self.repo)
  216. self.assertEqual([os.fsencode("foo")], unstaged)
  217. class WorkTreeCommitTests(WorkTreeTestCase):
  218. """Tests for WorkTree commit operations."""
  219. def test_commit_modified(self):
  220. """Test committing a modified file."""
  221. r = self.repo
  222. with open(os.path.join(r.path, "a"), "wb") as f:
  223. f.write(b"new contents")
  224. self.worktree.stage(["a"])
  225. commit_sha = self.worktree.commit(
  226. b"modified a",
  227. committer=b"Test Committer <test@nodomain.com>",
  228. author=b"Test Author <test@nodomain.com>",
  229. commit_timestamp=12395,
  230. commit_timezone=0,
  231. author_timestamp=12395,
  232. author_timezone=0,
  233. )
  234. self.assertEqual([self.root_commit], r[commit_sha].parents)
  235. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  236. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  237. self.assertEqual(b"new contents", r[a_id].data)
  238. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  239. def test_commit_symlink(self):
  240. """Test committing a symlink."""
  241. r = self.repo
  242. os.symlink("a", os.path.join(r.path, "b"))
  243. self.worktree.stage(["a", "b"])
  244. commit_sha = self.worktree.commit(
  245. b"Symlink b",
  246. committer=b"Test Committer <test@nodomain.com>",
  247. author=b"Test Author <test@nodomain.com>",
  248. commit_timestamp=12395,
  249. commit_timezone=0,
  250. author_timestamp=12395,
  251. author_timezone=0,
  252. )
  253. self.assertEqual([self.root_commit], r[commit_sha].parents)
  254. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  255. self.assertEqual(stat.S_IFLNK, b_mode)
  256. self.assertEqual(b"a", r[b_id].data)
  257. class WorkTreeResetTests(WorkTreeTestCase):
  258. """Tests for WorkTree reset operations."""
  259. def test_reset_index(self):
  260. """Test resetting the index."""
  261. # Make some changes and stage them
  262. with open(os.path.join(self.repo.path, "a"), "wb") as f:
  263. f.write(b"modified contents")
  264. self.worktree.stage(["a"])
  265. # Reset index should restore to HEAD
  266. self.worktree.reset_index()
  267. # Check that the working tree file was restored
  268. with open(os.path.join(self.repo.path, "a"), "rb") as f:
  269. contents = f.read()
  270. self.assertEqual(b"contents of file a", contents)
  271. class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
  272. """Tests for WorkTree sparse checkout operations."""
  273. def test_get_sparse_checkout_patterns_empty(self):
  274. """Test getting sparse checkout patterns when file doesn't exist."""
  275. patterns = self.worktree.get_sparse_checkout_patterns()
  276. self.assertEqual([], patterns)
  277. def test_set_sparse_checkout_patterns(self):
  278. """Test setting sparse checkout patterns."""
  279. patterns = ["*.py", "docs/"]
  280. self.worktree.set_sparse_checkout_patterns(patterns)
  281. # Read back the patterns
  282. retrieved_patterns = self.worktree.get_sparse_checkout_patterns()
  283. self.assertEqual(patterns, retrieved_patterns)
  284. def test_configure_for_cone_mode(self):
  285. """Test configuring repository for cone mode."""
  286. self.worktree.configure_for_cone_mode()
  287. config = self.repo.get_config()
  288. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckout"))
  289. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckoutCone"))
  290. def test_infer_cone_mode_false(self):
  291. """Test inferring cone mode when not configured."""
  292. self.assertFalse(self.worktree.infer_cone_mode())
  293. def test_infer_cone_mode_true(self):
  294. """Test inferring cone mode when configured."""
  295. self.worktree.configure_for_cone_mode()
  296. self.assertTrue(self.worktree.infer_cone_mode())
  297. def test_set_cone_mode_patterns(self):
  298. """Test setting cone mode patterns."""
  299. dirs = ["src", "tests"]
  300. self.worktree.set_cone_mode_patterns(dirs)
  301. patterns = self.worktree.get_sparse_checkout_patterns()
  302. expected = ["/*", "!/*/", "/src/", "/tests/"]
  303. self.assertEqual(expected, patterns)
  304. def test_set_cone_mode_patterns_empty(self):
  305. """Test setting cone mode patterns with empty list."""
  306. self.worktree.set_cone_mode_patterns([])
  307. patterns = self.worktree.get_sparse_checkout_patterns()
  308. expected = ["/*", "!/*/"]
  309. self.assertEqual(expected, patterns)
  310. def test_set_cone_mode_patterns_duplicates(self):
  311. """Test that duplicate patterns are not added."""
  312. dirs = ["src", "src"] # duplicate
  313. self.worktree.set_cone_mode_patterns(dirs)
  314. patterns = self.worktree.get_sparse_checkout_patterns()
  315. expected = ["/*", "!/*/", "/src/"]
  316. self.assertEqual(expected, patterns)
  317. def test_sparse_checkout_file_path(self):
  318. """Test getting the sparse checkout file path."""
  319. expected_path = os.path.join(self.repo.controldir(), "info", "sparse-checkout")
  320. actual_path = self.worktree._sparse_checkout_file_path()
  321. self.assertEqual(expected_path, actual_path)
  322. class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
  323. """Tests for backward compatibility of deprecated Repo methods."""
  324. def test_deprecated_stage_delegates_to_worktree(self):
  325. """Test that deprecated Repo.stage delegates to WorkTree."""
  326. with open(os.path.join(self.repo.path, "new_file"), "w") as f:
  327. f.write("test content")
  328. # This should show a deprecation warning but still work
  329. import warnings
  330. with warnings.catch_warnings(record=True) as w:
  331. warnings.simplefilter("always")
  332. self.repo.stage(
  333. ["new_file"]
  334. ) # Call deprecated method on Repo, not WorkTree
  335. self.assertTrue(len(w) > 0)
  336. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  337. def test_deprecated_unstage_delegates_to_worktree(self):
  338. """Test that deprecated Repo.unstage delegates to WorkTree."""
  339. # This should show a deprecation warning but still work
  340. import warnings
  341. with warnings.catch_warnings(record=True) as w:
  342. warnings.simplefilter("always")
  343. self.repo.unstage(["a"]) # Call deprecated method on Repo, not WorkTree
  344. self.assertTrue(len(w) > 0)
  345. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  346. def test_deprecated_sparse_checkout_methods(self):
  347. """Test that deprecated sparse checkout methods delegate to WorkTree."""
  348. import warnings
  349. # Test get_sparse_checkout_patterns
  350. with warnings.catch_warnings(record=True) as w:
  351. warnings.simplefilter("always")
  352. patterns = (
  353. self.repo.get_sparse_checkout_patterns()
  354. ) # Call deprecated method on Repo
  355. self.assertEqual([], patterns)
  356. self.assertTrue(len(w) > 0)
  357. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  358. # Test set_sparse_checkout_patterns
  359. with warnings.catch_warnings(record=True) as w:
  360. warnings.simplefilter("always")
  361. self.repo.set_sparse_checkout_patterns(
  362. ["*.py"]
  363. ) # Call deprecated method on Repo
  364. self.assertTrue(len(w) > 0)
  365. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  366. def test_pre_commit_hook_fail(self):
  367. """Test that failing pre-commit hook raises CommitError."""
  368. if os.name != "posix":
  369. self.skipTest("shell hook tests requires POSIX shell")
  370. # Create a failing pre-commit hook
  371. hooks_dir = os.path.join(self.repo.controldir(), "hooks")
  372. os.makedirs(hooks_dir, exist_ok=True)
  373. hook_path = os.path.join(hooks_dir, "pre-commit")
  374. with open(hook_path, "w") as f:
  375. f.write("#!/bin/sh\nexit 1\n")
  376. os.chmod(hook_path, 0o755)
  377. # Try to commit
  378. worktree = self.repo.get_worktree()
  379. with self.assertRaises(CommitError):
  380. worktree.commit(b"No message")
  381. def write_file(self, filename, content):
  382. """Helper to write a file in the repo."""
  383. with open(os.path.join(self.test_dir, filename), "wb") as f:
  384. f.write(content)
  385. class WorkTreeOperationsTests(WorkTreeTestCase):
  386. """Tests for worktree operations like add, list, remove."""
  387. def test_list_worktrees_single(self) -> None:
  388. """Test listing worktrees when only main worktree exists."""
  389. worktrees = list_worktrees(self.repo)
  390. self.assertEqual(len(worktrees), 1)
  391. self.assertEqual(worktrees[0].path, self.repo.path)
  392. self.assertEqual(worktrees[0].bare, False)
  393. self.assertIsNotNone(worktrees[0].head)
  394. self.assertIsNotNone(worktrees[0].branch)
  395. def test_add_worktree_new_branch(self) -> None:
  396. """Test adding a worktree with a new branch."""
  397. # Create a commit first
  398. worktree = self.repo.get_worktree()
  399. self.write_file("test.txt", b"test content")
  400. worktree.stage(["test.txt"])
  401. commit_id = worktree.commit(message=b"Initial commit")
  402. # Add a new worktree
  403. wt_path = os.path.join(self.tempdir, "new-worktree")
  404. add_worktree(self.repo, wt_path, branch=b"feature-branch")
  405. # Verify worktree was created
  406. self.assertTrue(os.path.exists(wt_path))
  407. self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))
  408. # Verify it appears in the list
  409. worktrees = list_worktrees(self.repo)
  410. self.assertEqual(len(worktrees), 2)
  411. # Find the new worktree in the list
  412. new_wt = None
  413. for wt in worktrees:
  414. if wt.path == wt_path:
  415. new_wt = wt
  416. break
  417. self.assertIsNotNone(new_wt)
  418. self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
  419. self.assertEqual(new_wt.head, commit_id)
  420. self.assertFalse(new_wt.detached)
  421. def test_add_worktree_detached(self) -> None:
  422. """Test adding a worktree with detached HEAD."""
  423. # Create a commit
  424. worktree = self.repo.get_worktree()
  425. self.write_file("test.txt", b"test content")
  426. worktree.stage(["test.txt"])
  427. commit_id = worktree.commit(message=b"Initial commit")
  428. # Add a detached worktree
  429. wt_path = os.path.join(self.tempdir, "detached-worktree")
  430. add_worktree(self.repo, wt_path, commit=commit_id, detach=True)
  431. # Verify it's detached
  432. worktrees = list_worktrees(self.repo)
  433. self.assertEqual(len(worktrees), 2)
  434. for wt in worktrees:
  435. if wt.path == wt_path:
  436. self.assertTrue(wt.detached)
  437. self.assertIsNone(wt.branch)
  438. self.assertEqual(wt.head, commit_id)
  439. def test_add_worktree_existing_path(self) -> None:
  440. """Test that adding a worktree to existing path fails."""
  441. wt_path = os.path.join(self.tempdir, "existing")
  442. os.mkdir(wt_path)
  443. with self.assertRaises(ValueError) as cm:
  444. add_worktree(self.repo, wt_path)
  445. self.assertIn("Path already exists", str(cm.exception))
  446. def test_add_worktree_branch_already_checked_out(self) -> None:
  447. """Test that checking out same branch in multiple worktrees fails."""
  448. # Create initial commit
  449. worktree = self.repo.get_worktree()
  450. self.write_file("test.txt", b"test content")
  451. worktree.stage(["test.txt"])
  452. worktree.commit(message=b"Initial commit")
  453. # First worktree should succeed with a new branch
  454. wt_path1 = os.path.join(self.tempdir, "wt1")
  455. add_worktree(self.repo, wt_path1, branch=b"feature")
  456. # Second worktree with same branch should fail
  457. wt_path2 = os.path.join(self.tempdir, "wt2")
  458. with self.assertRaises(ValueError) as cm:
  459. add_worktree(self.repo, wt_path2, branch=b"feature")
  460. self.assertIn("already checked out", str(cm.exception))
  461. # But should work with force=True
  462. add_worktree(self.repo, wt_path2, branch=b"feature", force=True)
  463. def test_remove_worktree(self) -> None:
  464. """Test removing a worktree."""
  465. # Create a worktree
  466. wt_path = os.path.join(self.tempdir, "to-remove")
  467. add_worktree(self.repo, wt_path)
  468. # Verify it exists
  469. self.assertTrue(os.path.exists(wt_path))
  470. self.assertEqual(len(list_worktrees(self.repo)), 2)
  471. # Remove it
  472. remove_worktree(self.repo, wt_path)
  473. # Verify it's gone
  474. self.assertFalse(os.path.exists(wt_path))
  475. self.assertEqual(len(list_worktrees(self.repo)), 1)
  476. def test_remove_main_worktree_fails(self) -> None:
  477. """Test that removing the main worktree fails."""
  478. with self.assertRaises(ValueError) as cm:
  479. remove_worktree(self.repo, self.repo.path)
  480. self.assertIn("Cannot remove the main working tree", str(cm.exception))
  481. def test_remove_nonexistent_worktree(self) -> None:
  482. """Test that removing non-existent worktree fails."""
  483. with self.assertRaises(ValueError) as cm:
  484. remove_worktree(self.repo, "/nonexistent/path")
  485. self.assertIn("Worktree not found", str(cm.exception))
  486. def test_lock_unlock_worktree(self) -> None:
  487. """Test locking and unlocking a worktree."""
  488. # Create a worktree
  489. wt_path = os.path.join(self.tempdir, "lockable")
  490. add_worktree(self.repo, wt_path)
  491. # Lock it
  492. lock_worktree(self.repo, wt_path, reason="Testing lock")
  493. # Verify it's locked
  494. worktrees = list_worktrees(self.repo)
  495. for wt in worktrees:
  496. if wt.path == wt_path:
  497. self.assertTrue(wt.locked)
  498. # Try to remove locked worktree (should fail)
  499. with self.assertRaises(ValueError) as cm:
  500. remove_worktree(self.repo, wt_path)
  501. self.assertIn("locked", str(cm.exception))
  502. # Unlock it
  503. unlock_worktree(self.repo, wt_path)
  504. # Verify it's unlocked
  505. worktrees = list_worktrees(self.repo)
  506. for wt in worktrees:
  507. if wt.path == wt_path:
  508. self.assertFalse(wt.locked)
  509. # Now removal should work
  510. remove_worktree(self.repo, wt_path)
  511. def test_prune_worktrees(self) -> None:
  512. """Test pruning worktrees."""
  513. # Create a worktree
  514. wt_path = os.path.join(self.tempdir, "to-prune")
  515. add_worktree(self.repo, wt_path)
  516. # Manually remove the worktree directory
  517. shutil.rmtree(wt_path)
  518. # Verify it still shows up as prunable
  519. worktrees = list_worktrees(self.repo)
  520. prunable_count = sum(1 for wt in worktrees if wt.prunable)
  521. self.assertEqual(prunable_count, 1)
  522. # Prune it
  523. pruned = prune_worktrees(self.repo)
  524. self.assertEqual(len(pruned), 1)
  525. # Verify it's gone from the list
  526. worktrees = list_worktrees(self.repo)
  527. self.assertEqual(len(worktrees), 1)
  528. def test_prune_dry_run(self) -> None:
  529. """Test prune with dry_run doesn't remove anything."""
  530. # Create and manually remove a worktree
  531. wt_path = os.path.join(self.tempdir, "dry-run-test")
  532. add_worktree(self.repo, wt_path)
  533. shutil.rmtree(wt_path)
  534. # Dry run should report but not remove
  535. pruned = prune_worktrees(self.repo, dry_run=True)
  536. self.assertEqual(len(pruned), 1)
  537. # Worktree should still be in list
  538. worktrees = list_worktrees(self.repo)
  539. self.assertEqual(len(worktrees), 2)
  540. def test_prune_locked_worktree_not_pruned(self) -> None:
  541. """Test that locked worktrees are not pruned."""
  542. # Create and lock a worktree
  543. wt_path = os.path.join(self.tempdir, "locked-prune")
  544. add_worktree(self.repo, wt_path)
  545. lock_worktree(self.repo, wt_path)
  546. # Remove the directory
  547. shutil.rmtree(wt_path)
  548. # Prune should not remove locked worktree
  549. pruned = prune_worktrees(self.repo)
  550. self.assertEqual(len(pruned), 0)
  551. # Worktree should still be in list
  552. worktrees = list_worktrees(self.repo)
  553. self.assertEqual(len(worktrees), 2)
  554. def test_move_worktree(self) -> None:
  555. """Test moving a worktree."""
  556. # Create a worktree
  557. wt_path = os.path.join(self.tempdir, "to-move")
  558. add_worktree(self.repo, wt_path)
  559. # Create a file in the worktree
  560. test_file = os.path.join(wt_path, "test.txt")
  561. with open(test_file, "w") as f:
  562. f.write("test content")
  563. # Move it
  564. new_path = os.path.join(self.tempdir, "moved")
  565. move_worktree(self.repo, wt_path, new_path)
  566. # Verify old path doesn't exist
  567. self.assertFalse(os.path.exists(wt_path))
  568. # Verify new path exists with contents
  569. self.assertTrue(os.path.exists(new_path))
  570. self.assertTrue(os.path.exists(os.path.join(new_path, "test.txt")))
  571. # Verify it's in the list at new location
  572. worktrees = list_worktrees(self.repo)
  573. paths = [wt.path for wt in worktrees]
  574. self.assertIn(new_path, paths)
  575. self.assertNotIn(wt_path, paths)
  576. def test_move_main_worktree_fails(self) -> None:
  577. """Test that moving the main worktree fails."""
  578. new_path = os.path.join(self.tempdir, "new-main")
  579. with self.assertRaises(ValueError) as cm:
  580. move_worktree(self.repo, self.repo.path, new_path)
  581. self.assertIn("Cannot move the main working tree", str(cm.exception))
  582. def test_move_to_existing_path_fails(self) -> None:
  583. """Test that moving to an existing path fails."""
  584. # Create a worktree
  585. wt_path = os.path.join(self.tempdir, "worktree")
  586. add_worktree(self.repo, wt_path)
  587. # Create target directory
  588. new_path = os.path.join(self.tempdir, "existing")
  589. os.makedirs(new_path)
  590. with self.assertRaises(ValueError) as cm:
  591. move_worktree(self.repo, wt_path, new_path)
  592. self.assertIn("Path already exists", str(cm.exception))
  593. def test_repair_worktree_after_manual_move(self) -> None:
  594. """Test repairing a worktree after manually moving it."""
  595. # Create a worktree
  596. wt_path = os.path.join(self.tempdir, "original")
  597. add_worktree(self.repo, wt_path)
  598. # Manually move the worktree directory (simulating external move)
  599. new_path = os.path.join(self.tempdir, "moved")
  600. shutil.move(wt_path, new_path)
  601. # At this point, the connection is broken
  602. # Repair from the moved worktree
  603. repaired = repair_worktree(self.repo, paths=[new_path])
  604. # Should have repaired the worktree
  605. self.assertEqual(len(repaired), 1)
  606. self.assertEqual(repaired[0], new_path)
  607. # Verify the worktree is now properly connected
  608. worktrees = list_worktrees(self.repo)
  609. paths = [wt.path for wt in worktrees]
  610. self.assertIn(new_path, paths)
  611. def test_repair_worktree_from_main_repo(self) -> None:
  612. """Test repairing worktree connections from main repository."""
  613. # Create a worktree
  614. wt_path = os.path.join(self.tempdir, "worktree")
  615. add_worktree(self.repo, wt_path)
  616. # Read the .git file to get the control directory
  617. gitdir_file = os.path.join(wt_path, ".git")
  618. with open(gitdir_file, "rb") as f:
  619. content = f.read().strip()
  620. control_dir = content[8:].decode() # Remove "gitdir: " prefix
  621. # Manually corrupt the .git file to point to wrong location
  622. with open(gitdir_file, "wb") as f:
  623. f.write(b"gitdir: /wrong/path\n")
  624. # Repair from main repository
  625. repaired = repair_worktree(self.repo)
  626. # Should have repaired the connection
  627. self.assertEqual(len(repaired), 1)
  628. self.assertEqual(repaired[0], wt_path)
  629. # Verify .git file now points to correct location
  630. with open(gitdir_file, "rb") as f:
  631. content = f.read().strip()
  632. new_control_dir = content[8:].decode()
  633. self.assertEqual(
  634. os.path.abspath(new_control_dir), os.path.abspath(control_dir)
  635. )
  636. def test_repair_worktree_no_repairs_needed(self) -> None:
  637. """Test repair when no repairs are needed."""
  638. # Create a worktree
  639. wt_path = os.path.join(self.tempdir, "worktree")
  640. add_worktree(self.repo, wt_path)
  641. # Repair - should return empty list since nothing is broken
  642. repaired = repair_worktree(self.repo)
  643. self.assertEqual(len(repaired), 0)
  644. def test_repair_invalid_worktree_path(self) -> None:
  645. """Test that repairing an invalid path raises an error."""
  646. with self.assertRaises(ValueError) as cm:
  647. repair_worktree(self.repo, paths=["/nonexistent/path"])
  648. self.assertIn("Not a valid worktree", str(cm.exception))
  649. def test_repair_multiple_worktrees(self) -> None:
  650. """Test repairing multiple worktrees at once."""
  651. # Create two worktrees
  652. wt_path1 = os.path.join(self.tempdir, "wt1")
  653. wt_path2 = os.path.join(self.tempdir, "wt2")
  654. add_worktree(self.repo, wt_path1, branch=b"branch1")
  655. add_worktree(self.repo, wt_path2, branch=b"branch2")
  656. # Manually move both worktrees
  657. new_path1 = os.path.join(self.tempdir, "moved1")
  658. new_path2 = os.path.join(self.tempdir, "moved2")
  659. shutil.move(wt_path1, new_path1)
  660. shutil.move(wt_path2, new_path2)
  661. # Repair both at once
  662. repaired = repair_worktree(self.repo, paths=[new_path1, new_path2])
  663. # Both should be repaired
  664. self.assertEqual(len(repaired), 2)
  665. self.assertIn(new_path1, repaired)
  666. self.assertIn(new_path2, repaired)
  667. def test_repair_worktree_with_relative_paths(self) -> None:
  668. """Test that repair handles worktrees with relative paths in gitdir."""
  669. # Create a worktree
  670. wt_path = os.path.join(self.tempdir, "worktree")
  671. add_worktree(self.repo, wt_path)
  672. # Manually move the worktree
  673. new_path = os.path.join(self.tempdir, "new-location")
  674. shutil.move(wt_path, new_path)
  675. # Repair from the new location
  676. repaired = repair_worktree(self.repo, paths=[new_path])
  677. # Should have repaired successfully
  678. self.assertEqual(len(repaired), 1)
  679. self.assertEqual(repaired[0], new_path)
  680. # Verify the gitdir pointer was updated
  681. from dulwich.repo import GITDIR, WORKTREES
  682. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  683. for entry in os.listdir(worktrees_dir):
  684. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  685. if os.path.exists(gitdir_path):
  686. with open(gitdir_path, "rb") as f:
  687. content = f.read().strip()
  688. gitdir_location = os.fsdecode(content)
  689. # Should point to the new .git file location
  690. self.assertTrue(gitdir_location.endswith(".git"))
  691. def test_repair_worktree_container_method(self) -> None:
  692. """Test the WorkTreeContainer.repair() method."""
  693. # Create a worktree
  694. wt_path = os.path.join(self.tempdir, "worktree")
  695. add_worktree(self.repo, wt_path)
  696. # Manually move it
  697. new_path = os.path.join(self.tempdir, "moved")
  698. shutil.move(wt_path, new_path)
  699. # Use the container method to repair
  700. repaired = self.repo.worktrees.repair(paths=[new_path])
  701. # Should have repaired
  702. self.assertEqual(len(repaired), 1)
  703. self.assertEqual(repaired[0], new_path)
  704. def test_repair_with_missing_gitdir_pointer(self) -> None:
  705. """Test repair when gitdir pointer file is missing."""
  706. # Create a worktree
  707. wt_path = os.path.join(self.tempdir, "worktree")
  708. add_worktree(self.repo, wt_path)
  709. # Find and remove the gitdir pointer file
  710. from dulwich.repo import GITDIR, WORKTREES
  711. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  712. for entry in os.listdir(worktrees_dir):
  713. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  714. if os.path.exists(gitdir_path):
  715. os.remove(gitdir_path)
  716. # Repair should not crash, but won't repair anything
  717. repaired = repair_worktree(self.repo, paths=[wt_path])
  718. self.assertEqual(len(repaired), 0)
  719. def test_repair_worktree_with_corrupted_git_file(self) -> None:
  720. """Test repair with a corrupted .git file."""
  721. # Create a worktree
  722. wt_path = os.path.join(self.tempdir, "worktree")
  723. add_worktree(self.repo, wt_path)
  724. # Corrupt the .git file
  725. gitdir_file = os.path.join(wt_path, ".git")
  726. with open(gitdir_file, "wb") as f:
  727. f.write(b"invalid content\n")
  728. # Attempting to repair should raise an error
  729. with self.assertRaises(ValueError) as cm:
  730. repair_worktree(self.repo, paths=[wt_path])
  731. self.assertIn("Invalid .git file", str(cm.exception))
  732. class TemporaryWorktreeTests(TestCase):
  733. """Tests for temporary_worktree context manager."""
  734. def setUp(self) -> None:
  735. super().setUp()
  736. self.tempdir = tempfile.mkdtemp()
  737. self.addCleanup(shutil.rmtree, self.tempdir)
  738. self.repo_path = os.path.join(self.tempdir, "repo")
  739. self.repo = Repo.init(self.repo_path, mkdir=True)
  740. # Create an initial commit so HEAD exists
  741. readme_path = os.path.join(self.repo_path, "README.md")
  742. with open(readme_path, "w") as f:
  743. f.write("# Test Repository\n")
  744. wt = self.repo.get_worktree()
  745. wt.stage(["README.md"])
  746. wt.commit(message=b"Initial commit")
  747. def test_temporary_worktree_creates_and_cleans_up(self) -> None:
  748. """Test that temporary worktree is created and cleaned up."""
  749. worktree_path = None
  750. # Use the context manager
  751. with temporary_worktree(self.repo) as worktree:
  752. worktree_path = worktree.path
  753. # Check that worktree exists
  754. self.assertTrue(os.path.exists(worktree_path))
  755. # Check that it's in the list of worktrees
  756. worktrees = list_worktrees(self.repo)
  757. paths = [wt.path for wt in worktrees]
  758. self.assertIn(worktree_path, paths)
  759. # Check that .git file exists in worktree
  760. gitdir_file = os.path.join(worktree_path, ".git")
  761. self.assertTrue(os.path.exists(gitdir_file))
  762. # After context manager exits, check cleanup
  763. self.assertFalse(os.path.exists(worktree_path))
  764. # Check that it's no longer in the list of worktrees
  765. worktrees = list_worktrees(self.repo)
  766. paths = [wt.path for wt in worktrees]
  767. self.assertNotIn(worktree_path, paths)
  768. def test_temporary_worktree_with_custom_prefix(self) -> None:
  769. """Test temporary worktree with custom prefix."""
  770. custom_prefix = "my-custom-prefix-"
  771. with temporary_worktree(self.repo, prefix=custom_prefix) as worktree:
  772. # Check that the directory name starts with our prefix
  773. dirname = os.path.basename(worktree.path)
  774. self.assertTrue(dirname.startswith(custom_prefix))
  775. def test_temporary_worktree_cleanup_on_exception(self) -> None:
  776. """Test that cleanup happens even when exception is raised."""
  777. worktree_path = None
  778. class TestException(Exception):
  779. pass
  780. try:
  781. with temporary_worktree(self.repo) as worktree:
  782. worktree_path = worktree.path
  783. self.assertTrue(os.path.exists(worktree_path))
  784. raise TestException("Test exception")
  785. except TestException:
  786. pass
  787. # Cleanup should still happen
  788. self.assertFalse(os.path.exists(worktree_path))
  789. # Check that it's no longer in the list of worktrees
  790. worktrees = list_worktrees(self.repo)
  791. paths = [wt.path for wt in worktrees]
  792. self.assertNotIn(worktree_path, paths)
  793. def test_temporary_worktree_operations(self) -> None:
  794. """Test that operations can be performed in temporary worktree."""
  795. # Create a test file in main repo
  796. test_file = os.path.join(self.repo_path, "test.txt")
  797. with open(test_file, "w") as f:
  798. f.write("Hello, world!")
  799. wt = self.repo.get_worktree()
  800. wt.stage(["test.txt"])
  801. wt.commit(message=b"Initial commit")
  802. with temporary_worktree(self.repo) as worktree:
  803. # Check that the file exists in the worktree
  804. wt_test_file = os.path.join(worktree.path, "test.txt")
  805. self.assertTrue(os.path.exists(wt_test_file))
  806. # Read and verify content
  807. with open(wt_test_file) as f:
  808. content = f.read()
  809. self.assertEqual(content, "Hello, world!")
  810. # Make changes in the worktree
  811. with open(wt_test_file, "w") as f:
  812. f.write("Modified content")
  813. # Changes should be visible as unstaged
  814. unstaged = get_unstaged_changes(worktree)
  815. self.assertIn(os.fsencode("test.txt"), unstaged)