test_worktree.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich import porcelain
  28. from dulwich.errors import CommitError
  29. from dulwich.object_store import tree_lookup_path
  30. from dulwich.repo import Repo
  31. from dulwich.worktree import (
  32. WorkTree,
  33. add_worktree,
  34. list_worktrees,
  35. lock_worktree,
  36. move_worktree,
  37. prune_worktrees,
  38. remove_worktree,
  39. temporary_worktree,
  40. unlock_worktree,
  41. )
  42. from . import TestCase
  43. class WorkTreeTestCase(TestCase):
  44. """Base test case for WorkTree tests."""
  45. def setUp(self):
  46. super().setUp()
  47. self.tempdir = tempfile.mkdtemp()
  48. self.test_dir = os.path.join(self.tempdir, "main")
  49. self.repo = Repo.init(self.test_dir, mkdir=True)
  50. # Create initial commit with a file
  51. with open(os.path.join(self.test_dir, "a"), "wb") as f:
  52. f.write(b"contents of file a")
  53. self.repo.get_worktree().stage(["a"])
  54. self.root_commit = self.repo.get_worktree().commit(
  55. message=b"Initial commit",
  56. committer=b"Test Committer <test@nodomain.com>",
  57. author=b"Test Author <test@nodomain.com>",
  58. commit_timestamp=12345,
  59. commit_timezone=0,
  60. author_timestamp=12345,
  61. author_timezone=0,
  62. )
  63. self.worktree = self.repo.get_worktree()
  64. def tearDown(self):
  65. self.repo.close()
  66. super().tearDown()
  67. def write_file(self, filename, content):
  68. """Helper to write a file in the repo."""
  69. with open(os.path.join(self.test_dir, filename), "wb") as f:
  70. f.write(content)
  71. class WorkTreeInitTests(TestCase):
  72. """Tests for WorkTree initialization."""
  73. def test_init_with_repo_path(self):
  74. """Test WorkTree initialization with same path as repo."""
  75. with tempfile.TemporaryDirectory() as tmpdir:
  76. repo = Repo.init(tmpdir)
  77. worktree = WorkTree(repo, tmpdir)
  78. self.assertEqual(worktree.path, tmpdir)
  79. self.assertEqual(worktree._repo, repo)
  80. self.assertTrue(os.path.isabs(worktree.path))
  81. def test_init_with_different_path(self):
  82. """Test WorkTree initialization with different path from repo."""
  83. with tempfile.TemporaryDirectory() as tmpdir:
  84. repo_path = os.path.join(tmpdir, "repo")
  85. worktree_path = os.path.join(tmpdir, "worktree")
  86. os.makedirs(repo_path)
  87. os.makedirs(worktree_path)
  88. repo = Repo.init(repo_path)
  89. worktree = WorkTree(repo, worktree_path)
  90. self.assertNotEqual(worktree.path, repo.path)
  91. self.assertEqual(worktree.path, worktree_path)
  92. self.assertEqual(worktree._repo, repo)
  93. self.assertTrue(os.path.isabs(worktree.path))
  94. def test_init_with_bytes_path(self):
  95. """Test WorkTree initialization with bytes path."""
  96. with tempfile.TemporaryDirectory() as tmpdir:
  97. repo = Repo.init(tmpdir)
  98. worktree = WorkTree(repo, tmpdir.encode("utf-8"))
  99. self.assertEqual(worktree.path, tmpdir)
  100. self.assertIsInstance(worktree.path, str)
  101. class WorkTreeStagingTests(WorkTreeTestCase):
  102. """Tests for WorkTree staging operations."""
  103. def test_stage_absolute(self):
  104. """Test that staging with absolute paths raises ValueError."""
  105. r = self.repo
  106. os.remove(os.path.join(r.path, "a"))
  107. self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])
  108. def test_stage_deleted(self):
  109. """Test staging a deleted file."""
  110. r = self.repo
  111. os.remove(os.path.join(r.path, "a"))
  112. self.worktree.stage(["a"])
  113. self.worktree.stage(["a"]) # double-stage a deleted path
  114. self.assertEqual([], list(r.open_index()))
  115. def test_stage_directory(self):
  116. """Test staging a directory."""
  117. r = self.repo
  118. os.mkdir(os.path.join(r.path, "c"))
  119. self.worktree.stage(["c"])
  120. self.assertEqual([b"a"], list(r.open_index()))
  121. def test_stage_submodule(self):
  122. """Test staging a submodule."""
  123. r = self.repo
  124. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  125. s.get_worktree().commit(
  126. message=b"message",
  127. )
  128. self.worktree.stage(["sub"])
  129. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  130. class WorkTreeUnstagingTests(WorkTreeTestCase):
  131. """Tests for WorkTree unstaging operations."""
  132. def test_unstage_modify_file_with_dir(self):
  133. """Test unstaging a modified file in a directory."""
  134. os.mkdir(os.path.join(self.repo.path, "new_dir"))
  135. full_path = os.path.join(self.repo.path, "new_dir", "foo")
  136. with open(full_path, "w") as f:
  137. f.write("hello")
  138. porcelain.add(self.repo, paths=[full_path])
  139. porcelain.commit(
  140. self.repo,
  141. message=b"unittest",
  142. committer=b"Jane <jane@example.com>",
  143. author=b"John <john@example.com>",
  144. )
  145. with open(full_path, "a") as f:
  146. f.write("something new")
  147. self.worktree.unstage(["new_dir/foo"])
  148. status = list(porcelain.status(self.repo))
  149. self.assertEqual(
  150. [{"add": [], "delete": [], "modify": []}, [b"new_dir/foo"], []], status
  151. )
  152. def test_unstage_while_no_commit(self):
  153. """Test unstaging when there are no commits."""
  154. file = "foo"
  155. full_path = os.path.join(self.repo.path, file)
  156. with open(full_path, "w") as f:
  157. f.write("hello")
  158. porcelain.add(self.repo, paths=[full_path])
  159. self.worktree.unstage([file])
  160. status = list(porcelain.status(self.repo))
  161. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  162. def test_unstage_add_file(self):
  163. """Test unstaging a newly added file."""
  164. file = "foo"
  165. full_path = os.path.join(self.repo.path, file)
  166. porcelain.commit(
  167. self.repo,
  168. message=b"unittest",
  169. committer=b"Jane <jane@example.com>",
  170. author=b"John <john@example.com>",
  171. )
  172. with open(full_path, "w") as f:
  173. f.write("hello")
  174. porcelain.add(self.repo, paths=[full_path])
  175. self.worktree.unstage([file])
  176. status = list(porcelain.status(self.repo))
  177. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  178. def test_unstage_modify_file(self):
  179. """Test unstaging a modified file."""
  180. file = "foo"
  181. full_path = os.path.join(self.repo.path, file)
  182. with open(full_path, "w") as f:
  183. f.write("hello")
  184. porcelain.add(self.repo, paths=[full_path])
  185. porcelain.commit(
  186. self.repo,
  187. message=b"unittest",
  188. committer=b"Jane <jane@example.com>",
  189. author=b"John <john@example.com>",
  190. )
  191. with open(full_path, "a") as f:
  192. f.write("broken")
  193. porcelain.add(self.repo, paths=[full_path])
  194. self.worktree.unstage([file])
  195. status = list(porcelain.status(self.repo))
  196. self.assertEqual(
  197. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  198. )
  199. def test_unstage_remove_file(self):
  200. """Test unstaging a removed file."""
  201. file = "foo"
  202. full_path = os.path.join(self.repo.path, file)
  203. with open(full_path, "w") as f:
  204. f.write("hello")
  205. porcelain.add(self.repo, paths=[full_path])
  206. porcelain.commit(
  207. self.repo,
  208. message=b"unittest",
  209. committer=b"Jane <jane@example.com>",
  210. author=b"John <john@example.com>",
  211. )
  212. os.remove(full_path)
  213. self.worktree.unstage([file])
  214. status = list(porcelain.status(self.repo))
  215. self.assertEqual(
  216. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  217. )
  218. class WorkTreeCommitTests(WorkTreeTestCase):
  219. """Tests for WorkTree commit operations."""
  220. def test_commit_modified(self):
  221. """Test committing a modified file."""
  222. r = self.repo
  223. with open(os.path.join(r.path, "a"), "wb") as f:
  224. f.write(b"new contents")
  225. self.worktree.stage(["a"])
  226. commit_sha = self.worktree.commit(
  227. b"modified a",
  228. committer=b"Test Committer <test@nodomain.com>",
  229. author=b"Test Author <test@nodomain.com>",
  230. commit_timestamp=12395,
  231. commit_timezone=0,
  232. author_timestamp=12395,
  233. author_timezone=0,
  234. )
  235. self.assertEqual([self.root_commit], r[commit_sha].parents)
  236. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  237. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  238. self.assertEqual(b"new contents", r[a_id].data)
  239. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  240. def test_commit_symlink(self):
  241. """Test committing a symlink."""
  242. r = self.repo
  243. os.symlink("a", os.path.join(r.path, "b"))
  244. self.worktree.stage(["a", "b"])
  245. commit_sha = self.worktree.commit(
  246. b"Symlink b",
  247. committer=b"Test Committer <test@nodomain.com>",
  248. author=b"Test Author <test@nodomain.com>",
  249. commit_timestamp=12395,
  250. commit_timezone=0,
  251. author_timestamp=12395,
  252. author_timezone=0,
  253. )
  254. self.assertEqual([self.root_commit], r[commit_sha].parents)
  255. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  256. self.assertEqual(stat.S_IFLNK, b_mode)
  257. self.assertEqual(b"a", r[b_id].data)
  258. class WorkTreeResetTests(WorkTreeTestCase):
  259. """Tests for WorkTree reset operations."""
  260. def test_reset_index(self):
  261. """Test resetting the index."""
  262. # Make some changes and stage them
  263. with open(os.path.join(self.repo.path, "a"), "wb") as f:
  264. f.write(b"modified contents")
  265. self.worktree.stage(["a"])
  266. # Reset index should restore to HEAD
  267. self.worktree.reset_index()
  268. # Check that the working tree file was restored
  269. with open(os.path.join(self.repo.path, "a"), "rb") as f:
  270. contents = f.read()
  271. self.assertEqual(b"contents of file a", contents)
  272. class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
  273. """Tests for WorkTree sparse checkout operations."""
  274. def test_get_sparse_checkout_patterns_empty(self):
  275. """Test getting sparse checkout patterns when file doesn't exist."""
  276. patterns = self.worktree.get_sparse_checkout_patterns()
  277. self.assertEqual([], patterns)
  278. def test_set_sparse_checkout_patterns(self):
  279. """Test setting sparse checkout patterns."""
  280. patterns = ["*.py", "docs/"]
  281. self.worktree.set_sparse_checkout_patterns(patterns)
  282. # Read back the patterns
  283. retrieved_patterns = self.worktree.get_sparse_checkout_patterns()
  284. self.assertEqual(patterns, retrieved_patterns)
  285. def test_configure_for_cone_mode(self):
  286. """Test configuring repository for cone mode."""
  287. self.worktree.configure_for_cone_mode()
  288. config = self.repo.get_config()
  289. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckout"))
  290. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckoutCone"))
  291. def test_infer_cone_mode_false(self):
  292. """Test inferring cone mode when not configured."""
  293. self.assertFalse(self.worktree.infer_cone_mode())
  294. def test_infer_cone_mode_true(self):
  295. """Test inferring cone mode when configured."""
  296. self.worktree.configure_for_cone_mode()
  297. self.assertTrue(self.worktree.infer_cone_mode())
  298. def test_set_cone_mode_patterns(self):
  299. """Test setting cone mode patterns."""
  300. dirs = ["src", "tests"]
  301. self.worktree.set_cone_mode_patterns(dirs)
  302. patterns = self.worktree.get_sparse_checkout_patterns()
  303. expected = ["/*", "!/*/", "/src/", "/tests/"]
  304. self.assertEqual(expected, patterns)
  305. def test_set_cone_mode_patterns_empty(self):
  306. """Test setting cone mode patterns with empty list."""
  307. self.worktree.set_cone_mode_patterns([])
  308. patterns = self.worktree.get_sparse_checkout_patterns()
  309. expected = ["/*", "!/*/"]
  310. self.assertEqual(expected, patterns)
  311. def test_set_cone_mode_patterns_duplicates(self):
  312. """Test that duplicate patterns are not added."""
  313. dirs = ["src", "src"] # duplicate
  314. self.worktree.set_cone_mode_patterns(dirs)
  315. patterns = self.worktree.get_sparse_checkout_patterns()
  316. expected = ["/*", "!/*/", "/src/"]
  317. self.assertEqual(expected, patterns)
  318. def test_sparse_checkout_file_path(self):
  319. """Test getting the sparse checkout file path."""
  320. expected_path = os.path.join(self.repo.controldir(), "info", "sparse-checkout")
  321. actual_path = self.worktree._sparse_checkout_file_path()
  322. self.assertEqual(expected_path, actual_path)
  323. class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
  324. """Tests for backward compatibility of deprecated Repo methods."""
  325. def test_deprecated_stage_delegates_to_worktree(self):
  326. """Test that deprecated Repo.stage delegates to WorkTree."""
  327. with open(os.path.join(self.repo.path, "new_file"), "w") as f:
  328. f.write("test content")
  329. # This should show a deprecation warning but still work
  330. import warnings
  331. with warnings.catch_warnings(record=True) as w:
  332. warnings.simplefilter("always")
  333. self.repo.stage(
  334. ["new_file"]
  335. ) # Call deprecated method on Repo, not WorkTree
  336. self.assertTrue(len(w) > 0)
  337. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  338. def test_deprecated_unstage_delegates_to_worktree(self):
  339. """Test that deprecated Repo.unstage delegates to WorkTree."""
  340. # This should show a deprecation warning but still work
  341. import warnings
  342. with warnings.catch_warnings(record=True) as w:
  343. warnings.simplefilter("always")
  344. self.repo.unstage(["a"]) # Call deprecated method on Repo, not WorkTree
  345. self.assertTrue(len(w) > 0)
  346. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  347. def test_deprecated_sparse_checkout_methods(self):
  348. """Test that deprecated sparse checkout methods delegate to WorkTree."""
  349. import warnings
  350. # Test get_sparse_checkout_patterns
  351. with warnings.catch_warnings(record=True) as w:
  352. warnings.simplefilter("always")
  353. patterns = (
  354. self.repo.get_sparse_checkout_patterns()
  355. ) # Call deprecated method on Repo
  356. self.assertEqual([], patterns)
  357. self.assertTrue(len(w) > 0)
  358. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  359. # Test set_sparse_checkout_patterns
  360. with warnings.catch_warnings(record=True) as w:
  361. warnings.simplefilter("always")
  362. self.repo.set_sparse_checkout_patterns(
  363. ["*.py"]
  364. ) # Call deprecated method on Repo
  365. self.assertTrue(len(w) > 0)
  366. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  367. def test_pre_commit_hook_fail(self):
  368. """Test that failing pre-commit hook raises CommitError."""
  369. if os.name != "posix":
  370. self.skipTest("shell hook tests requires POSIX shell")
  371. # Create a failing pre-commit hook
  372. hooks_dir = os.path.join(self.repo.controldir(), "hooks")
  373. os.makedirs(hooks_dir, exist_ok=True)
  374. hook_path = os.path.join(hooks_dir, "pre-commit")
  375. with open(hook_path, "w") as f:
  376. f.write("#!/bin/sh\nexit 1\n")
  377. os.chmod(hook_path, 0o755)
  378. # Try to commit
  379. worktree = self.repo.get_worktree()
  380. with self.assertRaises(CommitError):
  381. worktree.commit(b"No message")
  382. def write_file(self, filename, content):
  383. """Helper to write a file in the repo."""
  384. with open(os.path.join(self.test_dir, filename), "wb") as f:
  385. f.write(content)
  386. class WorkTreeOperationsTests(WorkTreeTestCase):
  387. """Tests for worktree operations like add, list, remove."""
  388. def test_list_worktrees_single(self) -> None:
  389. """Test listing worktrees when only main worktree exists."""
  390. worktrees = list_worktrees(self.repo)
  391. self.assertEqual(len(worktrees), 1)
  392. self.assertEqual(worktrees[0].path, self.repo.path)
  393. self.assertEqual(worktrees[0].bare, False)
  394. self.assertIsNotNone(worktrees[0].head)
  395. self.assertIsNotNone(worktrees[0].branch)
  396. def test_add_worktree_new_branch(self) -> None:
  397. """Test adding a worktree with a new branch."""
  398. # Create a commit first
  399. worktree = self.repo.get_worktree()
  400. self.write_file("test.txt", b"test content")
  401. worktree.stage(["test.txt"])
  402. commit_id = worktree.commit(message=b"Initial commit")
  403. # Add a new worktree
  404. wt_path = os.path.join(self.tempdir, "new-worktree")
  405. add_worktree(self.repo, wt_path, branch=b"feature-branch")
  406. # Verify worktree was created
  407. self.assertTrue(os.path.exists(wt_path))
  408. self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))
  409. # Verify it appears in the list
  410. worktrees = list_worktrees(self.repo)
  411. self.assertEqual(len(worktrees), 2)
  412. # Find the new worktree in the list
  413. new_wt = None
  414. for wt in worktrees:
  415. if wt.path == wt_path:
  416. new_wt = wt
  417. break
  418. self.assertIsNotNone(new_wt)
  419. self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
  420. self.assertEqual(new_wt.head, commit_id)
  421. self.assertFalse(new_wt.detached)
  422. def test_add_worktree_detached(self) -> None:
  423. """Test adding a worktree with detached HEAD."""
  424. # Create a commit
  425. worktree = self.repo.get_worktree()
  426. self.write_file("test.txt", b"test content")
  427. worktree.stage(["test.txt"])
  428. commit_id = worktree.commit(message=b"Initial commit")
  429. # Add a detached worktree
  430. wt_path = os.path.join(self.tempdir, "detached-worktree")
  431. add_worktree(self.repo, wt_path, commit=commit_id, detach=True)
  432. # Verify it's detached
  433. worktrees = list_worktrees(self.repo)
  434. self.assertEqual(len(worktrees), 2)
  435. for wt in worktrees:
  436. if wt.path == wt_path:
  437. self.assertTrue(wt.detached)
  438. self.assertIsNone(wt.branch)
  439. self.assertEqual(wt.head, commit_id)
  440. def test_add_worktree_existing_path(self) -> None:
  441. """Test that adding a worktree to existing path fails."""
  442. wt_path = os.path.join(self.tempdir, "existing")
  443. os.mkdir(wt_path)
  444. with self.assertRaises(ValueError) as cm:
  445. add_worktree(self.repo, wt_path)
  446. self.assertIn("Path already exists", str(cm.exception))
  447. def test_add_worktree_branch_already_checked_out(self) -> None:
  448. """Test that checking out same branch in multiple worktrees fails."""
  449. # Create initial commit
  450. worktree = self.repo.get_worktree()
  451. self.write_file("test.txt", b"test content")
  452. worktree.stage(["test.txt"])
  453. worktree.commit(message=b"Initial commit")
  454. # First worktree should succeed with a new branch
  455. wt_path1 = os.path.join(self.tempdir, "wt1")
  456. add_worktree(self.repo, wt_path1, branch=b"feature")
  457. # Second worktree with same branch should fail
  458. wt_path2 = os.path.join(self.tempdir, "wt2")
  459. with self.assertRaises(ValueError) as cm:
  460. add_worktree(self.repo, wt_path2, branch=b"feature")
  461. self.assertIn("already checked out", str(cm.exception))
  462. # But should work with force=True
  463. add_worktree(self.repo, wt_path2, branch=b"feature", force=True)
  464. def test_remove_worktree(self) -> None:
  465. """Test removing a worktree."""
  466. # Create a worktree
  467. wt_path = os.path.join(self.tempdir, "to-remove")
  468. add_worktree(self.repo, wt_path)
  469. # Verify it exists
  470. self.assertTrue(os.path.exists(wt_path))
  471. self.assertEqual(len(list_worktrees(self.repo)), 2)
  472. # Remove it
  473. remove_worktree(self.repo, wt_path)
  474. # Verify it's gone
  475. self.assertFalse(os.path.exists(wt_path))
  476. self.assertEqual(len(list_worktrees(self.repo)), 1)
  477. def test_remove_main_worktree_fails(self) -> None:
  478. """Test that removing the main worktree fails."""
  479. with self.assertRaises(ValueError) as cm:
  480. remove_worktree(self.repo, self.repo.path)
  481. self.assertIn("Cannot remove the main working tree", str(cm.exception))
  482. def test_remove_nonexistent_worktree(self) -> None:
  483. """Test that removing non-existent worktree fails."""
  484. with self.assertRaises(ValueError) as cm:
  485. remove_worktree(self.repo, "/nonexistent/path")
  486. self.assertIn("Worktree not found", str(cm.exception))
  487. def test_lock_unlock_worktree(self) -> None:
  488. """Test locking and unlocking a worktree."""
  489. # Create a worktree
  490. wt_path = os.path.join(self.tempdir, "lockable")
  491. add_worktree(self.repo, wt_path)
  492. # Lock it
  493. lock_worktree(self.repo, wt_path, reason="Testing lock")
  494. # Verify it's locked
  495. worktrees = list_worktrees(self.repo)
  496. for wt in worktrees:
  497. if wt.path == wt_path:
  498. self.assertTrue(wt.locked)
  499. # Try to remove locked worktree (should fail)
  500. with self.assertRaises(ValueError) as cm:
  501. remove_worktree(self.repo, wt_path)
  502. self.assertIn("locked", str(cm.exception))
  503. # Unlock it
  504. unlock_worktree(self.repo, wt_path)
  505. # Verify it's unlocked
  506. worktrees = list_worktrees(self.repo)
  507. for wt in worktrees:
  508. if wt.path == wt_path:
  509. self.assertFalse(wt.locked)
  510. # Now removal should work
  511. remove_worktree(self.repo, wt_path)
  512. def test_prune_worktrees(self) -> None:
  513. """Test pruning worktrees."""
  514. # Create a worktree
  515. wt_path = os.path.join(self.tempdir, "to-prune")
  516. add_worktree(self.repo, wt_path)
  517. # Manually remove the worktree directory
  518. shutil.rmtree(wt_path)
  519. # Verify it still shows up as prunable
  520. worktrees = list_worktrees(self.repo)
  521. prunable_count = sum(1 for wt in worktrees if wt.prunable)
  522. self.assertEqual(prunable_count, 1)
  523. # Prune it
  524. pruned = prune_worktrees(self.repo)
  525. self.assertEqual(len(pruned), 1)
  526. # Verify it's gone from the list
  527. worktrees = list_worktrees(self.repo)
  528. self.assertEqual(len(worktrees), 1)
  529. def test_prune_dry_run(self) -> None:
  530. """Test prune with dry_run doesn't remove anything."""
  531. # Create and manually remove a worktree
  532. wt_path = os.path.join(self.tempdir, "dry-run-test")
  533. add_worktree(self.repo, wt_path)
  534. shutil.rmtree(wt_path)
  535. # Dry run should report but not remove
  536. pruned = prune_worktrees(self.repo, dry_run=True)
  537. self.assertEqual(len(pruned), 1)
  538. # Worktree should still be in list
  539. worktrees = list_worktrees(self.repo)
  540. self.assertEqual(len(worktrees), 2)
  541. def test_prune_locked_worktree_not_pruned(self) -> None:
  542. """Test that locked worktrees are not pruned."""
  543. # Create and lock a worktree
  544. wt_path = os.path.join(self.tempdir, "locked-prune")
  545. add_worktree(self.repo, wt_path)
  546. lock_worktree(self.repo, wt_path)
  547. # Remove the directory
  548. shutil.rmtree(wt_path)
  549. # Prune should not remove locked worktree
  550. pruned = prune_worktrees(self.repo)
  551. self.assertEqual(len(pruned), 0)
  552. # Worktree should still be in list
  553. worktrees = list_worktrees(self.repo)
  554. self.assertEqual(len(worktrees), 2)
  555. def test_move_worktree(self) -> None:
  556. """Test moving a worktree."""
  557. # Create a worktree
  558. wt_path = os.path.join(self.tempdir, "to-move")
  559. add_worktree(self.repo, wt_path)
  560. # Create a file in the worktree
  561. test_file = os.path.join(wt_path, "test.txt")
  562. with open(test_file, "w") as f:
  563. f.write("test content")
  564. # Move it
  565. new_path = os.path.join(self.tempdir, "moved")
  566. move_worktree(self.repo, wt_path, new_path)
  567. # Verify old path doesn't exist
  568. self.assertFalse(os.path.exists(wt_path))
  569. # Verify new path exists with contents
  570. self.assertTrue(os.path.exists(new_path))
  571. self.assertTrue(os.path.exists(os.path.join(new_path, "test.txt")))
  572. # Verify it's in the list at new location
  573. worktrees = list_worktrees(self.repo)
  574. paths = [wt.path for wt in worktrees]
  575. self.assertIn(new_path, paths)
  576. self.assertNotIn(wt_path, paths)
  577. def test_move_main_worktree_fails(self) -> None:
  578. """Test that moving the main worktree fails."""
  579. new_path = os.path.join(self.tempdir, "new-main")
  580. with self.assertRaises(ValueError) as cm:
  581. move_worktree(self.repo, self.repo.path, new_path)
  582. self.assertIn("Cannot move the main working tree", str(cm.exception))
  583. def test_move_to_existing_path_fails(self) -> None:
  584. """Test that moving to an existing path fails."""
  585. # Create a worktree
  586. wt_path = os.path.join(self.tempdir, "worktree")
  587. add_worktree(self.repo, wt_path)
  588. # Create target directory
  589. new_path = os.path.join(self.tempdir, "existing")
  590. os.makedirs(new_path)
  591. with self.assertRaises(ValueError) as cm:
  592. move_worktree(self.repo, wt_path, new_path)
  593. self.assertIn("Path already exists", str(cm.exception))
  594. class TemporaryWorktreeTests(TestCase):
  595. """Tests for temporary_worktree context manager."""
  596. def setUp(self) -> None:
  597. super().setUp()
  598. self.tempdir = tempfile.mkdtemp()
  599. self.addCleanup(shutil.rmtree, self.tempdir)
  600. self.repo_path = os.path.join(self.tempdir, "repo")
  601. self.repo = Repo.init(self.repo_path, mkdir=True)
  602. # Create an initial commit so HEAD exists
  603. readme_path = os.path.join(self.repo_path, "README.md")
  604. with open(readme_path, "w") as f:
  605. f.write("# Test Repository\n")
  606. porcelain.add(self.repo, [readme_path])
  607. porcelain.commit(self.repo, message=b"Initial commit")
  608. def test_temporary_worktree_creates_and_cleans_up(self) -> None:
  609. """Test that temporary worktree is created and cleaned up."""
  610. worktree_path = None
  611. # Use the context manager
  612. with temporary_worktree(self.repo) as worktree:
  613. worktree_path = worktree.path
  614. # Check that worktree exists
  615. self.assertTrue(os.path.exists(worktree_path))
  616. # Check that it's in the list of worktrees
  617. worktrees = list_worktrees(self.repo)
  618. paths = [wt.path for wt in worktrees]
  619. self.assertIn(worktree_path, paths)
  620. # Check that .git file exists in worktree
  621. gitdir_file = os.path.join(worktree_path, ".git")
  622. self.assertTrue(os.path.exists(gitdir_file))
  623. # After context manager exits, check cleanup
  624. self.assertFalse(os.path.exists(worktree_path))
  625. # Check that it's no longer in the list of worktrees
  626. worktrees = list_worktrees(self.repo)
  627. paths = [wt.path for wt in worktrees]
  628. self.assertNotIn(worktree_path, paths)
  629. def test_temporary_worktree_with_custom_prefix(self) -> None:
  630. """Test temporary worktree with custom prefix."""
  631. custom_prefix = "my-custom-prefix-"
  632. with temporary_worktree(self.repo, prefix=custom_prefix) as worktree:
  633. # Check that the directory name starts with our prefix
  634. dirname = os.path.basename(worktree.path)
  635. self.assertTrue(dirname.startswith(custom_prefix))
  636. def test_temporary_worktree_cleanup_on_exception(self) -> None:
  637. """Test that cleanup happens even when exception is raised."""
  638. worktree_path = None
  639. class TestException(Exception):
  640. pass
  641. try:
  642. with temporary_worktree(self.repo) as worktree:
  643. worktree_path = worktree.path
  644. self.assertTrue(os.path.exists(worktree_path))
  645. raise TestException("Test exception")
  646. except TestException:
  647. pass
  648. # Cleanup should still happen
  649. self.assertFalse(os.path.exists(worktree_path))
  650. # Check that it's no longer in the list of worktrees
  651. worktrees = list_worktrees(self.repo)
  652. paths = [wt.path for wt in worktrees]
  653. self.assertNotIn(worktree_path, paths)
  654. def test_temporary_worktree_operations(self) -> None:
  655. """Test that operations can be performed in temporary worktree."""
  656. # Create a test file in main repo
  657. test_file = os.path.join(self.repo_path, "test.txt")
  658. with open(test_file, "w") as f:
  659. f.write("Hello, world!")
  660. porcelain.add(self.repo, [test_file])
  661. porcelain.commit(self.repo, message=b"Initial commit")
  662. with temporary_worktree(self.repo) as worktree:
  663. # Check that the file exists in the worktree
  664. wt_test_file = os.path.join(worktree.path, "test.txt")
  665. self.assertTrue(os.path.exists(wt_test_file))
  666. # Read and verify content
  667. with open(wt_test_file) as f:
  668. content = f.read()
  669. self.assertEqual(content, "Hello, world!")
  670. # Make changes in the worktree
  671. with open(wt_test_file, "w") as f:
  672. f.write("Modified content")
  673. # Changes should be visible in status
  674. status = porcelain.status(worktree)
  675. self.assertIn(b"test.txt", status.unstaged)