test_worktree.py 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich import porcelain
  28. from dulwich.errors import CommitError
  29. from dulwich.object_store import tree_lookup_path
  30. from dulwich.repo import Repo
  31. from dulwich.worktree import (
  32. WorkTree,
  33. add_worktree,
  34. list_worktrees,
  35. lock_worktree,
  36. move_worktree,
  37. prune_worktrees,
  38. remove_worktree,
  39. repair_worktree,
  40. temporary_worktree,
  41. unlock_worktree,
  42. )
  43. from . import TestCase
  44. class WorkTreeTestCase(TestCase):
  45. """Base test case for WorkTree tests."""
  46. def setUp(self):
  47. super().setUp()
  48. self.tempdir = tempfile.mkdtemp()
  49. self.test_dir = os.path.join(self.tempdir, "main")
  50. self.repo = Repo.init(self.test_dir, mkdir=True)
  51. # Create initial commit with a file
  52. with open(os.path.join(self.test_dir, "a"), "wb") as f:
  53. f.write(b"contents of file a")
  54. self.repo.get_worktree().stage(["a"])
  55. self.root_commit = self.repo.get_worktree().commit(
  56. message=b"Initial commit",
  57. committer=b"Test Committer <test@nodomain.com>",
  58. author=b"Test Author <test@nodomain.com>",
  59. commit_timestamp=12345,
  60. commit_timezone=0,
  61. author_timestamp=12345,
  62. author_timezone=0,
  63. )
  64. self.worktree = self.repo.get_worktree()
  65. def tearDown(self):
  66. self.repo.close()
  67. super().tearDown()
  68. def write_file(self, filename, content):
  69. """Helper to write a file in the repo."""
  70. with open(os.path.join(self.test_dir, filename), "wb") as f:
  71. f.write(content)
  72. class WorkTreeInitTests(TestCase):
  73. """Tests for WorkTree initialization."""
  74. def test_init_with_repo_path(self):
  75. """Test WorkTree initialization with same path as repo."""
  76. with tempfile.TemporaryDirectory() as tmpdir:
  77. repo = Repo.init(tmpdir)
  78. worktree = WorkTree(repo, tmpdir)
  79. self.assertEqual(worktree.path, tmpdir)
  80. self.assertEqual(worktree._repo, repo)
  81. self.assertTrue(os.path.isabs(worktree.path))
  82. def test_init_with_different_path(self):
  83. """Test WorkTree initialization with different path from repo."""
  84. with tempfile.TemporaryDirectory() as tmpdir:
  85. repo_path = os.path.join(tmpdir, "repo")
  86. worktree_path = os.path.join(tmpdir, "worktree")
  87. os.makedirs(repo_path)
  88. os.makedirs(worktree_path)
  89. repo = Repo.init(repo_path)
  90. worktree = WorkTree(repo, worktree_path)
  91. self.assertNotEqual(worktree.path, repo.path)
  92. self.assertEqual(worktree.path, worktree_path)
  93. self.assertEqual(worktree._repo, repo)
  94. self.assertTrue(os.path.isabs(worktree.path))
  95. def test_init_with_bytes_path(self):
  96. """Test WorkTree initialization with bytes path."""
  97. with tempfile.TemporaryDirectory() as tmpdir:
  98. repo = Repo.init(tmpdir)
  99. worktree = WorkTree(repo, tmpdir.encode("utf-8"))
  100. self.assertEqual(worktree.path, tmpdir)
  101. self.assertIsInstance(worktree.path, str)
  102. class WorkTreeStagingTests(WorkTreeTestCase):
  103. """Tests for WorkTree staging operations."""
  104. def test_stage_absolute(self):
  105. """Test that staging with absolute paths raises ValueError."""
  106. r = self.repo
  107. os.remove(os.path.join(r.path, "a"))
  108. self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])
  109. def test_stage_deleted(self):
  110. """Test staging a deleted file."""
  111. r = self.repo
  112. os.remove(os.path.join(r.path, "a"))
  113. self.worktree.stage(["a"])
  114. self.worktree.stage(["a"]) # double-stage a deleted path
  115. self.assertEqual([], list(r.open_index()))
  116. def test_stage_directory(self):
  117. """Test staging a directory."""
  118. r = self.repo
  119. os.mkdir(os.path.join(r.path, "c"))
  120. self.worktree.stage(["c"])
  121. self.assertEqual([b"a"], list(r.open_index()))
  122. def test_stage_submodule(self):
  123. """Test staging a submodule."""
  124. r = self.repo
  125. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  126. s.get_worktree().commit(
  127. message=b"message",
  128. )
  129. self.worktree.stage(["sub"])
  130. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  131. class WorkTreeUnstagingTests(WorkTreeTestCase):
  132. """Tests for WorkTree unstaging operations."""
  133. def test_unstage_modify_file_with_dir(self):
  134. """Test unstaging a modified file in a directory."""
  135. os.mkdir(os.path.join(self.repo.path, "new_dir"))
  136. full_path = os.path.join(self.repo.path, "new_dir", "foo")
  137. with open(full_path, "w") as f:
  138. f.write("hello")
  139. porcelain.add(self.repo, paths=[full_path])
  140. porcelain.commit(
  141. self.repo,
  142. message=b"unittest",
  143. committer=b"Jane <jane@example.com>",
  144. author=b"John <john@example.com>",
  145. )
  146. with open(full_path, "a") as f:
  147. f.write("something new")
  148. self.worktree.unstage(["new_dir/foo"])
  149. status = list(porcelain.status(self.repo))
  150. self.assertEqual(
  151. [{"add": [], "delete": [], "modify": []}, [b"new_dir/foo"], []], status
  152. )
  153. def test_unstage_while_no_commit(self):
  154. """Test unstaging when there are no commits."""
  155. file = "foo"
  156. full_path = os.path.join(self.repo.path, file)
  157. with open(full_path, "w") as f:
  158. f.write("hello")
  159. porcelain.add(self.repo, paths=[full_path])
  160. self.worktree.unstage([file])
  161. status = list(porcelain.status(self.repo))
  162. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  163. def test_unstage_add_file(self):
  164. """Test unstaging a newly added file."""
  165. file = "foo"
  166. full_path = os.path.join(self.repo.path, file)
  167. porcelain.commit(
  168. self.repo,
  169. message=b"unittest",
  170. committer=b"Jane <jane@example.com>",
  171. author=b"John <john@example.com>",
  172. )
  173. with open(full_path, "w") as f:
  174. f.write("hello")
  175. porcelain.add(self.repo, paths=[full_path])
  176. self.worktree.unstage([file])
  177. status = list(porcelain.status(self.repo))
  178. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  179. def test_unstage_modify_file(self):
  180. """Test unstaging a modified file."""
  181. file = "foo"
  182. full_path = os.path.join(self.repo.path, file)
  183. with open(full_path, "w") as f:
  184. f.write("hello")
  185. porcelain.add(self.repo, paths=[full_path])
  186. porcelain.commit(
  187. self.repo,
  188. message=b"unittest",
  189. committer=b"Jane <jane@example.com>",
  190. author=b"John <john@example.com>",
  191. )
  192. with open(full_path, "a") as f:
  193. f.write("broken")
  194. porcelain.add(self.repo, paths=[full_path])
  195. self.worktree.unstage([file])
  196. status = list(porcelain.status(self.repo))
  197. self.assertEqual(
  198. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  199. )
  200. def test_unstage_remove_file(self):
  201. """Test unstaging a removed file."""
  202. file = "foo"
  203. full_path = os.path.join(self.repo.path, file)
  204. with open(full_path, "w") as f:
  205. f.write("hello")
  206. porcelain.add(self.repo, paths=[full_path])
  207. porcelain.commit(
  208. self.repo,
  209. message=b"unittest",
  210. committer=b"Jane <jane@example.com>",
  211. author=b"John <john@example.com>",
  212. )
  213. os.remove(full_path)
  214. self.worktree.unstage([file])
  215. status = list(porcelain.status(self.repo))
  216. self.assertEqual(
  217. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  218. )
  219. class WorkTreeCommitTests(WorkTreeTestCase):
  220. """Tests for WorkTree commit operations."""
  221. def test_commit_modified(self):
  222. """Test committing a modified file."""
  223. r = self.repo
  224. with open(os.path.join(r.path, "a"), "wb") as f:
  225. f.write(b"new contents")
  226. self.worktree.stage(["a"])
  227. commit_sha = self.worktree.commit(
  228. b"modified a",
  229. committer=b"Test Committer <test@nodomain.com>",
  230. author=b"Test Author <test@nodomain.com>",
  231. commit_timestamp=12395,
  232. commit_timezone=0,
  233. author_timestamp=12395,
  234. author_timezone=0,
  235. )
  236. self.assertEqual([self.root_commit], r[commit_sha].parents)
  237. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  238. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  239. self.assertEqual(b"new contents", r[a_id].data)
  240. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  241. def test_commit_symlink(self):
  242. """Test committing a symlink."""
  243. r = self.repo
  244. os.symlink("a", os.path.join(r.path, "b"))
  245. self.worktree.stage(["a", "b"])
  246. commit_sha = self.worktree.commit(
  247. b"Symlink b",
  248. committer=b"Test Committer <test@nodomain.com>",
  249. author=b"Test Author <test@nodomain.com>",
  250. commit_timestamp=12395,
  251. commit_timezone=0,
  252. author_timestamp=12395,
  253. author_timezone=0,
  254. )
  255. self.assertEqual([self.root_commit], r[commit_sha].parents)
  256. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  257. self.assertEqual(stat.S_IFLNK, b_mode)
  258. self.assertEqual(b"a", r[b_id].data)
  259. class WorkTreeResetTests(WorkTreeTestCase):
  260. """Tests for WorkTree reset operations."""
  261. def test_reset_index(self):
  262. """Test resetting the index."""
  263. # Make some changes and stage them
  264. with open(os.path.join(self.repo.path, "a"), "wb") as f:
  265. f.write(b"modified contents")
  266. self.worktree.stage(["a"])
  267. # Reset index should restore to HEAD
  268. self.worktree.reset_index()
  269. # Check that the working tree file was restored
  270. with open(os.path.join(self.repo.path, "a"), "rb") as f:
  271. contents = f.read()
  272. self.assertEqual(b"contents of file a", contents)
  273. class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
  274. """Tests for WorkTree sparse checkout operations."""
  275. def test_get_sparse_checkout_patterns_empty(self):
  276. """Test getting sparse checkout patterns when file doesn't exist."""
  277. patterns = self.worktree.get_sparse_checkout_patterns()
  278. self.assertEqual([], patterns)
  279. def test_set_sparse_checkout_patterns(self):
  280. """Test setting sparse checkout patterns."""
  281. patterns = ["*.py", "docs/"]
  282. self.worktree.set_sparse_checkout_patterns(patterns)
  283. # Read back the patterns
  284. retrieved_patterns = self.worktree.get_sparse_checkout_patterns()
  285. self.assertEqual(patterns, retrieved_patterns)
  286. def test_configure_for_cone_mode(self):
  287. """Test configuring repository for cone mode."""
  288. self.worktree.configure_for_cone_mode()
  289. config = self.repo.get_config()
  290. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckout"))
  291. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckoutCone"))
  292. def test_infer_cone_mode_false(self):
  293. """Test inferring cone mode when not configured."""
  294. self.assertFalse(self.worktree.infer_cone_mode())
  295. def test_infer_cone_mode_true(self):
  296. """Test inferring cone mode when configured."""
  297. self.worktree.configure_for_cone_mode()
  298. self.assertTrue(self.worktree.infer_cone_mode())
  299. def test_set_cone_mode_patterns(self):
  300. """Test setting cone mode patterns."""
  301. dirs = ["src", "tests"]
  302. self.worktree.set_cone_mode_patterns(dirs)
  303. patterns = self.worktree.get_sparse_checkout_patterns()
  304. expected = ["/*", "!/*/", "/src/", "/tests/"]
  305. self.assertEqual(expected, patterns)
  306. def test_set_cone_mode_patterns_empty(self):
  307. """Test setting cone mode patterns with empty list."""
  308. self.worktree.set_cone_mode_patterns([])
  309. patterns = self.worktree.get_sparse_checkout_patterns()
  310. expected = ["/*", "!/*/"]
  311. self.assertEqual(expected, patterns)
  312. def test_set_cone_mode_patterns_duplicates(self):
  313. """Test that duplicate patterns are not added."""
  314. dirs = ["src", "src"] # duplicate
  315. self.worktree.set_cone_mode_patterns(dirs)
  316. patterns = self.worktree.get_sparse_checkout_patterns()
  317. expected = ["/*", "!/*/", "/src/"]
  318. self.assertEqual(expected, patterns)
  319. def test_sparse_checkout_file_path(self):
  320. """Test getting the sparse checkout file path."""
  321. expected_path = os.path.join(self.repo.controldir(), "info", "sparse-checkout")
  322. actual_path = self.worktree._sparse_checkout_file_path()
  323. self.assertEqual(expected_path, actual_path)
  324. class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
  325. """Tests for backward compatibility of deprecated Repo methods."""
  326. def test_deprecated_stage_delegates_to_worktree(self):
  327. """Test that deprecated Repo.stage delegates to WorkTree."""
  328. with open(os.path.join(self.repo.path, "new_file"), "w") as f:
  329. f.write("test content")
  330. # This should show a deprecation warning but still work
  331. import warnings
  332. with warnings.catch_warnings(record=True) as w:
  333. warnings.simplefilter("always")
  334. self.repo.stage(
  335. ["new_file"]
  336. ) # Call deprecated method on Repo, not WorkTree
  337. self.assertTrue(len(w) > 0)
  338. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  339. def test_deprecated_unstage_delegates_to_worktree(self):
  340. """Test that deprecated Repo.unstage delegates to WorkTree."""
  341. # This should show a deprecation warning but still work
  342. import warnings
  343. with warnings.catch_warnings(record=True) as w:
  344. warnings.simplefilter("always")
  345. self.repo.unstage(["a"]) # Call deprecated method on Repo, not WorkTree
  346. self.assertTrue(len(w) > 0)
  347. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  348. def test_deprecated_sparse_checkout_methods(self):
  349. """Test that deprecated sparse checkout methods delegate to WorkTree."""
  350. import warnings
  351. # Test get_sparse_checkout_patterns
  352. with warnings.catch_warnings(record=True) as w:
  353. warnings.simplefilter("always")
  354. patterns = (
  355. self.repo.get_sparse_checkout_patterns()
  356. ) # Call deprecated method on Repo
  357. self.assertEqual([], patterns)
  358. self.assertTrue(len(w) > 0)
  359. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  360. # Test set_sparse_checkout_patterns
  361. with warnings.catch_warnings(record=True) as w:
  362. warnings.simplefilter("always")
  363. self.repo.set_sparse_checkout_patterns(
  364. ["*.py"]
  365. ) # Call deprecated method on Repo
  366. self.assertTrue(len(w) > 0)
  367. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  368. def test_pre_commit_hook_fail(self):
  369. """Test that failing pre-commit hook raises CommitError."""
  370. if os.name != "posix":
  371. self.skipTest("shell hook tests requires POSIX shell")
  372. # Create a failing pre-commit hook
  373. hooks_dir = os.path.join(self.repo.controldir(), "hooks")
  374. os.makedirs(hooks_dir, exist_ok=True)
  375. hook_path = os.path.join(hooks_dir, "pre-commit")
  376. with open(hook_path, "w") as f:
  377. f.write("#!/bin/sh\nexit 1\n")
  378. os.chmod(hook_path, 0o755)
  379. # Try to commit
  380. worktree = self.repo.get_worktree()
  381. with self.assertRaises(CommitError):
  382. worktree.commit(b"No message")
  383. def write_file(self, filename, content):
  384. """Helper to write a file in the repo."""
  385. with open(os.path.join(self.test_dir, filename), "wb") as f:
  386. f.write(content)
  387. class WorkTreeOperationsTests(WorkTreeTestCase):
  388. """Tests for worktree operations like add, list, remove."""
  389. def test_list_worktrees_single(self) -> None:
  390. """Test listing worktrees when only main worktree exists."""
  391. worktrees = list_worktrees(self.repo)
  392. self.assertEqual(len(worktrees), 1)
  393. self.assertEqual(worktrees[0].path, self.repo.path)
  394. self.assertEqual(worktrees[0].bare, False)
  395. self.assertIsNotNone(worktrees[0].head)
  396. self.assertIsNotNone(worktrees[0].branch)
  397. def test_add_worktree_new_branch(self) -> None:
  398. """Test adding a worktree with a new branch."""
  399. # Create a commit first
  400. worktree = self.repo.get_worktree()
  401. self.write_file("test.txt", b"test content")
  402. worktree.stage(["test.txt"])
  403. commit_id = worktree.commit(message=b"Initial commit")
  404. # Add a new worktree
  405. wt_path = os.path.join(self.tempdir, "new-worktree")
  406. add_worktree(self.repo, wt_path, branch=b"feature-branch")
  407. # Verify worktree was created
  408. self.assertTrue(os.path.exists(wt_path))
  409. self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))
  410. # Verify it appears in the list
  411. worktrees = list_worktrees(self.repo)
  412. self.assertEqual(len(worktrees), 2)
  413. # Find the new worktree in the list
  414. new_wt = None
  415. for wt in worktrees:
  416. if wt.path == wt_path:
  417. new_wt = wt
  418. break
  419. self.assertIsNotNone(new_wt)
  420. self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
  421. self.assertEqual(new_wt.head, commit_id)
  422. self.assertFalse(new_wt.detached)
  423. def test_add_worktree_detached(self) -> None:
  424. """Test adding a worktree with detached HEAD."""
  425. # Create a commit
  426. worktree = self.repo.get_worktree()
  427. self.write_file("test.txt", b"test content")
  428. worktree.stage(["test.txt"])
  429. commit_id = worktree.commit(message=b"Initial commit")
  430. # Add a detached worktree
  431. wt_path = os.path.join(self.tempdir, "detached-worktree")
  432. add_worktree(self.repo, wt_path, commit=commit_id, detach=True)
  433. # Verify it's detached
  434. worktrees = list_worktrees(self.repo)
  435. self.assertEqual(len(worktrees), 2)
  436. for wt in worktrees:
  437. if wt.path == wt_path:
  438. self.assertTrue(wt.detached)
  439. self.assertIsNone(wt.branch)
  440. self.assertEqual(wt.head, commit_id)
  441. def test_add_worktree_existing_path(self) -> None:
  442. """Test that adding a worktree to existing path fails."""
  443. wt_path = os.path.join(self.tempdir, "existing")
  444. os.mkdir(wt_path)
  445. with self.assertRaises(ValueError) as cm:
  446. add_worktree(self.repo, wt_path)
  447. self.assertIn("Path already exists", str(cm.exception))
  448. def test_add_worktree_branch_already_checked_out(self) -> None:
  449. """Test that checking out same branch in multiple worktrees fails."""
  450. # Create initial commit
  451. worktree = self.repo.get_worktree()
  452. self.write_file("test.txt", b"test content")
  453. worktree.stage(["test.txt"])
  454. worktree.commit(message=b"Initial commit")
  455. # First worktree should succeed with a new branch
  456. wt_path1 = os.path.join(self.tempdir, "wt1")
  457. add_worktree(self.repo, wt_path1, branch=b"feature")
  458. # Second worktree with same branch should fail
  459. wt_path2 = os.path.join(self.tempdir, "wt2")
  460. with self.assertRaises(ValueError) as cm:
  461. add_worktree(self.repo, wt_path2, branch=b"feature")
  462. self.assertIn("already checked out", str(cm.exception))
  463. # But should work with force=True
  464. add_worktree(self.repo, wt_path2, branch=b"feature", force=True)
  465. def test_remove_worktree(self) -> None:
  466. """Test removing a worktree."""
  467. # Create a worktree
  468. wt_path = os.path.join(self.tempdir, "to-remove")
  469. add_worktree(self.repo, wt_path)
  470. # Verify it exists
  471. self.assertTrue(os.path.exists(wt_path))
  472. self.assertEqual(len(list_worktrees(self.repo)), 2)
  473. # Remove it
  474. remove_worktree(self.repo, wt_path)
  475. # Verify it's gone
  476. self.assertFalse(os.path.exists(wt_path))
  477. self.assertEqual(len(list_worktrees(self.repo)), 1)
  478. def test_remove_main_worktree_fails(self) -> None:
  479. """Test that removing the main worktree fails."""
  480. with self.assertRaises(ValueError) as cm:
  481. remove_worktree(self.repo, self.repo.path)
  482. self.assertIn("Cannot remove the main working tree", str(cm.exception))
  483. def test_remove_nonexistent_worktree(self) -> None:
  484. """Test that removing non-existent worktree fails."""
  485. with self.assertRaises(ValueError) as cm:
  486. remove_worktree(self.repo, "/nonexistent/path")
  487. self.assertIn("Worktree not found", str(cm.exception))
  488. def test_lock_unlock_worktree(self) -> None:
  489. """Test locking and unlocking a worktree."""
  490. # Create a worktree
  491. wt_path = os.path.join(self.tempdir, "lockable")
  492. add_worktree(self.repo, wt_path)
  493. # Lock it
  494. lock_worktree(self.repo, wt_path, reason="Testing lock")
  495. # Verify it's locked
  496. worktrees = list_worktrees(self.repo)
  497. for wt in worktrees:
  498. if wt.path == wt_path:
  499. self.assertTrue(wt.locked)
  500. # Try to remove locked worktree (should fail)
  501. with self.assertRaises(ValueError) as cm:
  502. remove_worktree(self.repo, wt_path)
  503. self.assertIn("locked", str(cm.exception))
  504. # Unlock it
  505. unlock_worktree(self.repo, wt_path)
  506. # Verify it's unlocked
  507. worktrees = list_worktrees(self.repo)
  508. for wt in worktrees:
  509. if wt.path == wt_path:
  510. self.assertFalse(wt.locked)
  511. # Now removal should work
  512. remove_worktree(self.repo, wt_path)
  513. def test_prune_worktrees(self) -> None:
  514. """Test pruning worktrees."""
  515. # Create a worktree
  516. wt_path = os.path.join(self.tempdir, "to-prune")
  517. add_worktree(self.repo, wt_path)
  518. # Manually remove the worktree directory
  519. shutil.rmtree(wt_path)
  520. # Verify it still shows up as prunable
  521. worktrees = list_worktrees(self.repo)
  522. prunable_count = sum(1 for wt in worktrees if wt.prunable)
  523. self.assertEqual(prunable_count, 1)
  524. # Prune it
  525. pruned = prune_worktrees(self.repo)
  526. self.assertEqual(len(pruned), 1)
  527. # Verify it's gone from the list
  528. worktrees = list_worktrees(self.repo)
  529. self.assertEqual(len(worktrees), 1)
  530. def test_prune_dry_run(self) -> None:
  531. """Test prune with dry_run doesn't remove anything."""
  532. # Create and manually remove a worktree
  533. wt_path = os.path.join(self.tempdir, "dry-run-test")
  534. add_worktree(self.repo, wt_path)
  535. shutil.rmtree(wt_path)
  536. # Dry run should report but not remove
  537. pruned = prune_worktrees(self.repo, dry_run=True)
  538. self.assertEqual(len(pruned), 1)
  539. # Worktree should still be in list
  540. worktrees = list_worktrees(self.repo)
  541. self.assertEqual(len(worktrees), 2)
  542. def test_prune_locked_worktree_not_pruned(self) -> None:
  543. """Test that locked worktrees are not pruned."""
  544. # Create and lock a worktree
  545. wt_path = os.path.join(self.tempdir, "locked-prune")
  546. add_worktree(self.repo, wt_path)
  547. lock_worktree(self.repo, wt_path)
  548. # Remove the directory
  549. shutil.rmtree(wt_path)
  550. # Prune should not remove locked worktree
  551. pruned = prune_worktrees(self.repo)
  552. self.assertEqual(len(pruned), 0)
  553. # Worktree should still be in list
  554. worktrees = list_worktrees(self.repo)
  555. self.assertEqual(len(worktrees), 2)
  556. def test_move_worktree(self) -> None:
  557. """Test moving a worktree."""
  558. # Create a worktree
  559. wt_path = os.path.join(self.tempdir, "to-move")
  560. add_worktree(self.repo, wt_path)
  561. # Create a file in the worktree
  562. test_file = os.path.join(wt_path, "test.txt")
  563. with open(test_file, "w") as f:
  564. f.write("test content")
  565. # Move it
  566. new_path = os.path.join(self.tempdir, "moved")
  567. move_worktree(self.repo, wt_path, new_path)
  568. # Verify old path doesn't exist
  569. self.assertFalse(os.path.exists(wt_path))
  570. # Verify new path exists with contents
  571. self.assertTrue(os.path.exists(new_path))
  572. self.assertTrue(os.path.exists(os.path.join(new_path, "test.txt")))
  573. # Verify it's in the list at new location
  574. worktrees = list_worktrees(self.repo)
  575. paths = [wt.path for wt in worktrees]
  576. self.assertIn(new_path, paths)
  577. self.assertNotIn(wt_path, paths)
  578. def test_move_main_worktree_fails(self) -> None:
  579. """Test that moving the main worktree fails."""
  580. new_path = os.path.join(self.tempdir, "new-main")
  581. with self.assertRaises(ValueError) as cm:
  582. move_worktree(self.repo, self.repo.path, new_path)
  583. self.assertIn("Cannot move the main working tree", str(cm.exception))
  584. def test_move_to_existing_path_fails(self) -> None:
  585. """Test that moving to an existing path fails."""
  586. # Create a worktree
  587. wt_path = os.path.join(self.tempdir, "worktree")
  588. add_worktree(self.repo, wt_path)
  589. # Create target directory
  590. new_path = os.path.join(self.tempdir, "existing")
  591. os.makedirs(new_path)
  592. with self.assertRaises(ValueError) as cm:
  593. move_worktree(self.repo, wt_path, new_path)
  594. self.assertIn("Path already exists", str(cm.exception))
  595. def test_repair_worktree_after_manual_move(self) -> None:
  596. """Test repairing a worktree after manually moving it."""
  597. # Create a worktree
  598. wt_path = os.path.join(self.tempdir, "original")
  599. add_worktree(self.repo, wt_path)
  600. # Manually move the worktree directory (simulating external move)
  601. new_path = os.path.join(self.tempdir, "moved")
  602. shutil.move(wt_path, new_path)
  603. # At this point, the connection is broken
  604. # Repair from the moved worktree
  605. repaired = repair_worktree(self.repo, paths=[new_path])
  606. # Should have repaired the worktree
  607. self.assertEqual(len(repaired), 1)
  608. self.assertEqual(repaired[0], new_path)
  609. # Verify the worktree is now properly connected
  610. worktrees = list_worktrees(self.repo)
  611. paths = [wt.path for wt in worktrees]
  612. self.assertIn(new_path, paths)
  613. def test_repair_worktree_from_main_repo(self) -> None:
  614. """Test repairing worktree connections from main repository."""
  615. # Create a worktree
  616. wt_path = os.path.join(self.tempdir, "worktree")
  617. add_worktree(self.repo, wt_path)
  618. # Read the .git file to get the control directory
  619. gitdir_file = os.path.join(wt_path, ".git")
  620. with open(gitdir_file, "rb") as f:
  621. content = f.read().strip()
  622. control_dir = content[8:].decode() # Remove "gitdir: " prefix
  623. # Manually corrupt the .git file to point to wrong location
  624. with open(gitdir_file, "wb") as f:
  625. f.write(b"gitdir: /wrong/path\n")
  626. # Repair from main repository
  627. repaired = repair_worktree(self.repo)
  628. # Should have repaired the connection
  629. self.assertEqual(len(repaired), 1)
  630. self.assertEqual(repaired[0], wt_path)
  631. # Verify .git file now points to correct location
  632. with open(gitdir_file, "rb") as f:
  633. content = f.read().strip()
  634. new_control_dir = content[8:].decode()
  635. self.assertEqual(
  636. os.path.abspath(new_control_dir), os.path.abspath(control_dir)
  637. )
  638. def test_repair_worktree_no_repairs_needed(self) -> None:
  639. """Test repair when no repairs are needed."""
  640. # Create a worktree
  641. wt_path = os.path.join(self.tempdir, "worktree")
  642. add_worktree(self.repo, wt_path)
  643. # Repair - should return empty list since nothing is broken
  644. repaired = repair_worktree(self.repo)
  645. self.assertEqual(len(repaired), 0)
  646. def test_repair_invalid_worktree_path(self) -> None:
  647. """Test that repairing an invalid path raises an error."""
  648. with self.assertRaises(ValueError) as cm:
  649. repair_worktree(self.repo, paths=["/nonexistent/path"])
  650. self.assertIn("Not a valid worktree", str(cm.exception))
  651. def test_repair_multiple_worktrees(self) -> None:
  652. """Test repairing multiple worktrees at once."""
  653. # Create two worktrees
  654. wt_path1 = os.path.join(self.tempdir, "wt1")
  655. wt_path2 = os.path.join(self.tempdir, "wt2")
  656. add_worktree(self.repo, wt_path1, branch=b"branch1")
  657. add_worktree(self.repo, wt_path2, branch=b"branch2")
  658. # Manually move both worktrees
  659. new_path1 = os.path.join(self.tempdir, "moved1")
  660. new_path2 = os.path.join(self.tempdir, "moved2")
  661. shutil.move(wt_path1, new_path1)
  662. shutil.move(wt_path2, new_path2)
  663. # Repair both at once
  664. repaired = repair_worktree(self.repo, paths=[new_path1, new_path2])
  665. # Both should be repaired
  666. self.assertEqual(len(repaired), 2)
  667. self.assertIn(new_path1, repaired)
  668. self.assertIn(new_path2, repaired)
  669. def test_repair_worktree_with_relative_paths(self) -> None:
  670. """Test that repair handles worktrees with relative paths in gitdir."""
  671. # Create a worktree
  672. wt_path = os.path.join(self.tempdir, "worktree")
  673. add_worktree(self.repo, wt_path)
  674. # Manually move the worktree
  675. new_path = os.path.join(self.tempdir, "new-location")
  676. shutil.move(wt_path, new_path)
  677. # Repair from the new location
  678. repaired = repair_worktree(self.repo, paths=[new_path])
  679. # Should have repaired successfully
  680. self.assertEqual(len(repaired), 1)
  681. self.assertEqual(repaired[0], new_path)
  682. # Verify the gitdir pointer was updated
  683. from dulwich.repo import GITDIR, WORKTREES
  684. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  685. for entry in os.listdir(worktrees_dir):
  686. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  687. if os.path.exists(gitdir_path):
  688. with open(gitdir_path, "rb") as f:
  689. content = f.read().strip()
  690. gitdir_location = os.fsdecode(content)
  691. # Should point to the new .git file location
  692. self.assertTrue(gitdir_location.endswith(".git"))
  693. def test_repair_worktree_container_method(self) -> None:
  694. """Test the WorkTreeContainer.repair() method."""
  695. # Create a worktree
  696. wt_path = os.path.join(self.tempdir, "worktree")
  697. add_worktree(self.repo, wt_path)
  698. # Manually move it
  699. new_path = os.path.join(self.tempdir, "moved")
  700. shutil.move(wt_path, new_path)
  701. # Use the container method to repair
  702. repaired = self.repo.worktrees.repair(paths=[new_path])
  703. # Should have repaired
  704. self.assertEqual(len(repaired), 1)
  705. self.assertEqual(repaired[0], new_path)
  706. def test_repair_with_missing_gitdir_pointer(self) -> None:
  707. """Test repair when gitdir pointer file is missing."""
  708. # Create a worktree
  709. wt_path = os.path.join(self.tempdir, "worktree")
  710. add_worktree(self.repo, wt_path)
  711. # Find and remove the gitdir pointer file
  712. from dulwich.repo import GITDIR, WORKTREES
  713. worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
  714. for entry in os.listdir(worktrees_dir):
  715. gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
  716. if os.path.exists(gitdir_path):
  717. os.remove(gitdir_path)
  718. # Repair should not crash, but won't repair anything
  719. repaired = repair_worktree(self.repo, paths=[wt_path])
  720. self.assertEqual(len(repaired), 0)
  721. def test_repair_worktree_with_corrupted_git_file(self) -> None:
  722. """Test repair with a corrupted .git file."""
  723. # Create a worktree
  724. wt_path = os.path.join(self.tempdir, "worktree")
  725. add_worktree(self.repo, wt_path)
  726. # Corrupt the .git file
  727. gitdir_file = os.path.join(wt_path, ".git")
  728. with open(gitdir_file, "wb") as f:
  729. f.write(b"invalid content\n")
  730. # Attempting to repair should raise an error
  731. with self.assertRaises(ValueError) as cm:
  732. repair_worktree(self.repo, paths=[wt_path])
  733. self.assertIn("Invalid .git file", str(cm.exception))
  734. class TemporaryWorktreeTests(TestCase):
  735. """Tests for temporary_worktree context manager."""
  736. def setUp(self) -> None:
  737. super().setUp()
  738. self.tempdir = tempfile.mkdtemp()
  739. self.addCleanup(shutil.rmtree, self.tempdir)
  740. self.repo_path = os.path.join(self.tempdir, "repo")
  741. self.repo = Repo.init(self.repo_path, mkdir=True)
  742. # Create an initial commit so HEAD exists
  743. readme_path = os.path.join(self.repo_path, "README.md")
  744. with open(readme_path, "w") as f:
  745. f.write("# Test Repository\n")
  746. porcelain.add(self.repo, [readme_path])
  747. porcelain.commit(self.repo, message=b"Initial commit")
  748. def test_temporary_worktree_creates_and_cleans_up(self) -> None:
  749. """Test that temporary worktree is created and cleaned up."""
  750. worktree_path = None
  751. # Use the context manager
  752. with temporary_worktree(self.repo) as worktree:
  753. worktree_path = worktree.path
  754. # Check that worktree exists
  755. self.assertTrue(os.path.exists(worktree_path))
  756. # Check that it's in the list of worktrees
  757. worktrees = list_worktrees(self.repo)
  758. paths = [wt.path for wt in worktrees]
  759. self.assertIn(worktree_path, paths)
  760. # Check that .git file exists in worktree
  761. gitdir_file = os.path.join(worktree_path, ".git")
  762. self.assertTrue(os.path.exists(gitdir_file))
  763. # After context manager exits, check cleanup
  764. self.assertFalse(os.path.exists(worktree_path))
  765. # Check that it's no longer in the list of worktrees
  766. worktrees = list_worktrees(self.repo)
  767. paths = [wt.path for wt in worktrees]
  768. self.assertNotIn(worktree_path, paths)
  769. def test_temporary_worktree_with_custom_prefix(self) -> None:
  770. """Test temporary worktree with custom prefix."""
  771. custom_prefix = "my-custom-prefix-"
  772. with temporary_worktree(self.repo, prefix=custom_prefix) as worktree:
  773. # Check that the directory name starts with our prefix
  774. dirname = os.path.basename(worktree.path)
  775. self.assertTrue(dirname.startswith(custom_prefix))
  776. def test_temporary_worktree_cleanup_on_exception(self) -> None:
  777. """Test that cleanup happens even when exception is raised."""
  778. worktree_path = None
  779. class TestException(Exception):
  780. pass
  781. try:
  782. with temporary_worktree(self.repo) as worktree:
  783. worktree_path = worktree.path
  784. self.assertTrue(os.path.exists(worktree_path))
  785. raise TestException("Test exception")
  786. except TestException:
  787. pass
  788. # Cleanup should still happen
  789. self.assertFalse(os.path.exists(worktree_path))
  790. # Check that it's no longer in the list of worktrees
  791. worktrees = list_worktrees(self.repo)
  792. paths = [wt.path for wt in worktrees]
  793. self.assertNotIn(worktree_path, paths)
  794. def test_temporary_worktree_operations(self) -> None:
  795. """Test that operations can be performed in temporary worktree."""
  796. # Create a test file in main repo
  797. test_file = os.path.join(self.repo_path, "test.txt")
  798. with open(test_file, "w") as f:
  799. f.write("Hello, world!")
  800. porcelain.add(self.repo, [test_file])
  801. porcelain.commit(self.repo, message=b"Initial commit")
  802. with temporary_worktree(self.repo) as worktree:
  803. # Check that the file exists in the worktree
  804. wt_test_file = os.path.join(worktree.path, "test.txt")
  805. self.assertTrue(os.path.exists(wt_test_file))
  806. # Read and verify content
  807. with open(wt_test_file) as f:
  808. content = f.read()
  809. self.assertEqual(content, "Hello, world!")
  810. # Make changes in the worktree
  811. with open(wt_test_file, "w") as f:
  812. f.write("Modified content")
  813. # Changes should be visible in status
  814. status = porcelain.status(worktree)
  815. self.assertIn(b"test.txt", status.unstaged)