test_worktree.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich import porcelain
  28. from dulwich.errors import CommitError
  29. from dulwich.object_store import tree_lookup_path
  30. from dulwich.repo import Repo
  31. from dulwich.worktree import (
  32. WorkTree,
  33. add_worktree,
  34. list_worktrees,
  35. lock_worktree,
  36. move_worktree,
  37. prune_worktrees,
  38. remove_worktree,
  39. repair_worktree,
  40. temporary_worktree,
  41. unlock_worktree,
  42. )
  43. from . import TestCase
  44. class WorkTreeTestCase(TestCase):
  45. """Base test case for WorkTree tests."""
  46. def setUp(self):
  47. super().setUp()
  48. self.tempdir = tempfile.mkdtemp()
  49. self.test_dir = os.path.join(self.tempdir, "main")
  50. self.repo = Repo.init(self.test_dir, mkdir=True)
  51. # Create initial commit with a file
  52. with open(os.path.join(self.test_dir, "a"), "wb") as f:
  53. f.write(b"contents of file a")
  54. self.repo.get_worktree().stage(["a"])
  55. self.root_commit = self.repo.get_worktree().commit(
  56. message=b"Initial commit",
  57. committer=b"Test Committer <test@nodomain.com>",
  58. author=b"Test Author <test@nodomain.com>",
  59. commit_timestamp=12345,
  60. commit_timezone=0,
  61. author_timestamp=12345,
  62. author_timezone=0,
  63. )
  64. self.worktree = self.repo.get_worktree()
  65. def tearDown(self):
  66. self.repo.close()
  67. super().tearDown()
  68. def write_file(self, filename, content):
  69. """Helper to write a file in the repo."""
  70. with open(os.path.join(self.test_dir, filename), "wb") as f:
  71. f.write(content)
  72. class WorkTreeInitTests(TestCase):
  73. """Tests for WorkTree initialization."""
  74. def test_init_with_repo_path(self):
  75. """Test WorkTree initialization with same path as repo."""
  76. with tempfile.TemporaryDirectory() as tmpdir:
  77. repo = Repo.init(tmpdir)
  78. worktree = WorkTree(repo, tmpdir)
  79. self.assertEqual(worktree.path, tmpdir)
  80. self.assertEqual(worktree._repo, repo)
  81. self.assertTrue(os.path.isabs(worktree.path))
  82. def test_init_with_different_path(self):
  83. """Test WorkTree initialization with different path from repo."""
  84. with tempfile.TemporaryDirectory() as tmpdir:
  85. repo_path = os.path.join(tmpdir, "repo")
  86. worktree_path = os.path.join(tmpdir, "worktree")
  87. os.makedirs(repo_path)
  88. os.makedirs(worktree_path)
  89. repo = Repo.init(repo_path)
  90. worktree = WorkTree(repo, worktree_path)
  91. self.assertNotEqual(worktree.path, repo.path)
  92. self.assertEqual(worktree.path, worktree_path)
  93. self.assertEqual(worktree._repo, repo)
  94. self.assertTrue(os.path.isabs(worktree.path))
  95. def test_init_with_bytes_path(self):
  96. """Test WorkTree initialization with bytes path."""
  97. with tempfile.TemporaryDirectory() as tmpdir:
  98. repo = Repo.init(tmpdir)
  99. worktree = WorkTree(repo, tmpdir.encode("utf-8"))
  100. self.assertEqual(worktree.path, tmpdir)
  101. self.assertIsInstance(worktree.path, str)
  102. class WorkTreeStagingTests(WorkTreeTestCase):
  103. """Tests for WorkTree staging operations."""
  104. def test_stage_absolute(self):
  105. """Test that staging with absolute paths raises ValueError."""
  106. r = self.repo
  107. os.remove(os.path.join(r.path, "a"))
  108. self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])
  109. def test_stage_deleted(self):
  110. """Test staging a deleted file."""
  111. r = self.repo
  112. os.remove(os.path.join(r.path, "a"))
  113. self.worktree.stage(["a"])
  114. self.worktree.stage(["a"]) # double-stage a deleted path
  115. self.assertEqual([], list(r.open_index()))
  116. def test_stage_directory(self):
  117. """Test staging a directory."""
  118. r = self.repo
  119. os.mkdir(os.path.join(r.path, "c"))
  120. self.worktree.stage(["c"])
  121. self.assertEqual([b"a"], list(r.open_index()))
  122. def test_stage_submodule(self):
  123. """Test staging a submodule."""
  124. r = self.repo
  125. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  126. s.get_worktree().commit(
  127. message=b"message",
  128. )
  129. self.worktree.stage(["sub"])
  130. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  131. class WorkTreeUnstagingTests(WorkTreeTestCase):
  132. """Tests for WorkTree unstaging operations."""
  133. def test_unstage_modify_file_with_dir(self):
  134. """Test unstaging a modified file in a directory."""
  135. os.mkdir(os.path.join(self.repo.path, "new_dir"))
  136. full_path = os.path.join(self.repo.path, "new_dir", "foo")
  137. with open(full_path, "w") as f:
  138. f.write("hello")
  139. porcelain.add(self.repo, paths=[full_path])
  140. porcelain.commit(
  141. self.repo,
  142. message=b"unittest",
  143. committer=b"Jane <jane@example.com>",
  144. author=b"John <john@example.com>",
  145. )
  146. with open(full_path, "a") as f:
  147. f.write("something new")
  148. self.worktree.unstage(["new_dir/foo"])
  149. status = list(porcelain.status(self.repo))
  150. self.assertEqual(
  151. [
  152. {"add": [], "delete": [], "modify": []},
  153. [os.fsencode(os.path.join("new_dir", "foo"))],
  154. [],
  155. ],
  156. status,
  157. )
  158. def test_unstage_while_no_commit(self):
  159. """Test unstaging when there are no commits."""
  160. file = "foo"
  161. full_path = os.path.join(self.repo.path, file)
  162. with open(full_path, "w") as f:
  163. f.write("hello")
  164. porcelain.add(self.repo, paths=[full_path])
  165. self.worktree.unstage([file])
  166. status = list(porcelain.status(self.repo))
  167. self.assertEqual(
  168. [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
  169. )
  170. def test_unstage_add_file(self):
  171. """Test unstaging a newly added file."""
  172. file = "foo"
  173. full_path = os.path.join(self.repo.path, file)
  174. porcelain.commit(
  175. self.repo,
  176. message=b"unittest",
  177. committer=b"Jane <jane@example.com>",
  178. author=b"John <john@example.com>",
  179. )
  180. with open(full_path, "w") as f:
  181. f.write("hello")
  182. porcelain.add(self.repo, paths=[full_path])
  183. self.worktree.unstage([file])
  184. status = list(porcelain.status(self.repo))
  185. self.assertEqual(
  186. [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
  187. )
  188. def test_unstage_modify_file(self):
  189. """Test unstaging a modified file."""
  190. file = "foo"
  191. full_path = os.path.join(self.repo.path, file)
  192. with open(full_path, "w") as f:
  193. f.write("hello")
  194. porcelain.add(self.repo, paths=[full_path])
  195. porcelain.commit(
  196. self.repo,
  197. message=b"unittest",
  198. committer=b"Jane <jane@example.com>",
  199. author=b"John <john@example.com>",
  200. )
  201. with open(full_path, "a") as f:
  202. f.write("broken")
  203. porcelain.add(self.repo, paths=[full_path])
  204. self.worktree.unstage([file])
  205. status = list(porcelain.status(self.repo))
  206. self.assertEqual(
  207. [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
  208. )
  209. def test_unstage_remove_file(self):
  210. """Test unstaging a removed file."""
  211. file = "foo"
  212. full_path = os.path.join(self.repo.path, file)
  213. with open(full_path, "w") as f:
  214. f.write("hello")
  215. porcelain.add(self.repo, paths=[full_path])
  216. porcelain.commit(
  217. self.repo,
  218. message=b"unittest",
  219. committer=b"Jane <jane@example.com>",
  220. author=b"John <john@example.com>",
  221. )
  222. os.remove(full_path)
  223. self.worktree.unstage([file])
  224. status = list(porcelain.status(self.repo))
  225. self.assertEqual(
  226. [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
  227. )
  228. class WorkTreeCommitTests(WorkTreeTestCase):
  229. """Tests for WorkTree commit operations."""
  230. def test_commit_modified(self):
  231. """Test committing a modified file."""
  232. r = self.repo
  233. with open(os.path.join(r.path, "a"), "wb") as f:
  234. f.write(b"new contents")
  235. self.worktree.stage(["a"])
  236. commit_sha = self.worktree.commit(
  237. b"modified a",
  238. committer=b"Test Committer <test@nodomain.com>",
  239. author=b"Test Author <test@nodomain.com>",
  240. commit_timestamp=12395,
  241. commit_timezone=0,
  242. author_timestamp=12395,
  243. author_timezone=0,
  244. )
  245. self.assertEqual([self.root_commit], r[commit_sha].parents)
  246. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  247. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  248. self.assertEqual(b"new contents", r[a_id].data)
  249. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  250. def test_commit_symlink(self):
  251. """Test committing a symlink."""
  252. r = self.repo
  253. os.symlink("a", os.path.join(r.path, "b"))
  254. self.worktree.stage(["a", "b"])
  255. commit_sha = self.worktree.commit(
  256. b"Symlink b",
  257. committer=b"Test Committer <test@nodomain.com>",
  258. author=b"Test Author <test@nodomain.com>",
  259. commit_timestamp=12395,
  260. commit_timezone=0,
  261. author_timestamp=12395,
  262. author_timezone=0,
  263. )
  264. self.assertEqual([self.root_commit], r[commit_sha].parents)
  265. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  266. self.assertEqual(stat.S_IFLNK, b_mode)
  267. self.assertEqual(b"a", r[b_id].data)
  268. class WorkTreeResetTests(WorkTreeTestCase):
  269. """Tests for WorkTree reset operations."""
  270. def test_reset_index(self):
  271. """Test resetting the index."""
  272. # Make some changes and stage them
  273. with open(os.path.join(self.repo.path, "a"), "wb") as f:
  274. f.write(b"modified contents")
  275. self.worktree.stage(["a"])
  276. # Reset index should restore to HEAD
  277. self.worktree.reset_index()
  278. # Check that the working tree file was restored
  279. with open(os.path.join(self.repo.path, "a"), "rb") as f:
  280. contents = f.read()
  281. self.assertEqual(b"contents of file a", contents)
  282. class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
  283. """Tests for WorkTree sparse checkout operations."""
  284. def test_get_sparse_checkout_patterns_empty(self):
  285. """Test getting sparse checkout patterns when file doesn't exist."""
  286. patterns = self.worktree.get_sparse_checkout_patterns()
  287. self.assertEqual([], patterns)
  288. def test_set_sparse_checkout_patterns(self):
  289. """Test setting sparse checkout patterns."""
  290. patterns = ["*.py", "docs/"]
  291. self.worktree.set_sparse_checkout_patterns(patterns)
  292. # Read back the patterns
  293. retrieved_patterns = self.worktree.get_sparse_checkout_patterns()
  294. self.assertEqual(patterns, retrieved_patterns)
  295. def test_configure_for_cone_mode(self):
  296. """Test configuring repository for cone mode."""
  297. self.worktree.configure_for_cone_mode()
  298. config = self.repo.get_config()
  299. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckout"))
  300. self.assertEqual(b"true", config.get((b"core",), b"sparseCheckoutCone"))
  301. def test_infer_cone_mode_false(self):
  302. """Test inferring cone mode when not configured."""
  303. self.assertFalse(self.worktree.infer_cone_mode())
  304. def test_infer_cone_mode_true(self):
  305. """Test inferring cone mode when configured."""
  306. self.worktree.configure_for_cone_mode()
  307. self.assertTrue(self.worktree.infer_cone_mode())
  308. def test_set_cone_mode_patterns(self):
  309. """Test setting cone mode patterns."""
  310. dirs = ["src", "tests"]
  311. self.worktree.set_cone_mode_patterns(dirs)
  312. patterns = self.worktree.get_sparse_checkout_patterns()
  313. expected = ["/*", "!/*/", "/src/", "/tests/"]
  314. self.assertEqual(expected, patterns)
  315. def test_set_cone_mode_patterns_empty(self):
  316. """Test setting cone mode patterns with empty list."""
  317. self.worktree.set_cone_mode_patterns([])
  318. patterns = self.worktree.get_sparse_checkout_patterns()
  319. expected = ["/*", "!/*/"]
  320. self.assertEqual(expected, patterns)
  321. def test_set_cone_mode_patterns_duplicates(self):
  322. """Test that duplicate patterns are not added."""
  323. dirs = ["src", "src"] # duplicate
  324. self.worktree.set_cone_mode_patterns(dirs)
  325. patterns = self.worktree.get_sparse_checkout_patterns()
  326. expected = ["/*", "!/*/", "/src/"]
  327. self.assertEqual(expected, patterns)
  328. def test_sparse_checkout_file_path(self):
  329. """Test getting the sparse checkout file path."""
  330. expected_path = os.path.join(self.repo.controldir(), "info", "sparse-checkout")
  331. actual_path = self.worktree._sparse_checkout_file_path()
  332. self.assertEqual(expected_path, actual_path)
  333. class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
  334. """Tests for backward compatibility of deprecated Repo methods."""
  335. def test_deprecated_stage_delegates_to_worktree(self):
  336. """Test that deprecated Repo.stage delegates to WorkTree."""
  337. with open(os.path.join(self.repo.path, "new_file"), "w") as f:
  338. f.write("test content")
  339. # This should show a deprecation warning but still work
  340. import warnings
  341. with warnings.catch_warnings(record=True) as w:
  342. warnings.simplefilter("always")
  343. self.repo.stage(
  344. ["new_file"]
  345. ) # Call deprecated method on Repo, not WorkTree
  346. self.assertTrue(len(w) > 0)
  347. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  348. def test_deprecated_unstage_delegates_to_worktree(self):
  349. """Test that deprecated Repo.unstage delegates to WorkTree."""
  350. # This should show a deprecation warning but still work
  351. import warnings
  352. with warnings.catch_warnings(record=True) as w:
  353. warnings.simplefilter("always")
  354. self.repo.unstage(["a"]) # Call deprecated method on Repo, not WorkTree
  355. self.assertTrue(len(w) > 0)
  356. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  357. def test_deprecated_sparse_checkout_methods(self):
  358. """Test that deprecated sparse checkout methods delegate to WorkTree."""
  359. import warnings
  360. # Test get_sparse_checkout_patterns
  361. with warnings.catch_warnings(record=True) as w:
  362. warnings.simplefilter("always")
  363. patterns = (
  364. self.repo.get_sparse_checkout_patterns()
  365. ) # Call deprecated method on Repo
  366. self.assertEqual([], patterns)
  367. self.assertTrue(len(w) > 0)
  368. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  369. # Test set_sparse_checkout_patterns
  370. with warnings.catch_warnings(record=True) as w:
  371. warnings.simplefilter("always")
  372. self.repo.set_sparse_checkout_patterns(
  373. ["*.py"]
  374. ) # Call deprecated method on Repo
  375. self.assertTrue(len(w) > 0)
  376. self.assertTrue(issubclass(w[0].category, DeprecationWarning))
  377. def test_pre_commit_hook_fail(self):
  378. """Test that failing pre-commit hook raises CommitError."""
  379. if os.name != "posix":
  380. self.skipTest("shell hook tests requires POSIX shell")
  381. # Create a failing pre-commit hook
  382. hooks_dir = os.path.join(self.repo.controldir(), "hooks")
  383. os.makedirs(hooks_dir, exist_ok=True)
  384. hook_path = os.path.join(hooks_dir, "pre-commit")
  385. with open(hook_path, "w") as f:
  386. f.write("#!/bin/sh\nexit 1\n")
  387. os.chmod(hook_path, 0o755)
  388. # Try to commit
  389. worktree = self.repo.get_worktree()
  390. with self.assertRaises(CommitError):
  391. worktree.commit(b"No message")
  392. def write_file(self, filename, content):
  393. """Helper to write a file in the repo."""
  394. with open(os.path.join(self.test_dir, filename), "wb") as f:
  395. f.write(content)
  396. class WorkTreeOperationsTests(WorkTreeTestCase):
  397. """Tests for worktree operations like add, list, remove."""
  398. def test_list_worktrees_single(self) -> None:
  399. """Test listing worktrees when only main worktree exists."""
  400. worktrees = list_worktrees(self.repo)
  401. self.assertEqual(len(worktrees), 1)
  402. self.assertEqual(worktrees[0].path, self.repo.path)
  403. self.assertEqual(worktrees[0].bare, False)
  404. self.assertIsNotNone(worktrees[0].head)
  405. self.assertIsNotNone(worktrees[0].branch)
  406. def test_add_worktree_new_branch(self) -> None:
  407. """Test adding a worktree with a new branch."""
  408. # Create a commit first
  409. worktree = self.repo.get_worktree()
  410. self.write_file("test.txt", b"test content")
  411. worktree.stage(["test.txt"])
  412. commit_id = worktree.commit(message=b"Initial commit")
  413. # Add a new worktree
  414. wt_path = os.path.join(self.tempdir, "new-worktree")
  415. add_worktree(self.repo, wt_path, branch=b"feature-branch")
  416. # Verify worktree was created
  417. self.assertTrue(os.path.exists(wt_path))
  418. self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))
  419. # Verify it appears in the list
  420. worktrees = list_worktrees(self.repo)
  421. self.assertEqual(len(worktrees), 2)
  422. # Find the new worktree in the list
  423. new_wt = None
  424. for wt in worktrees:
  425. if wt.path == wt_path:
  426. new_wt = wt
  427. break
  428. self.assertIsNotNone(new_wt)
  429. self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
  430. self.assertEqual(new_wt.head, commit_id)
  431. self.assertFalse(new_wt.detached)
  432. def test_add_worktree_detached(self) -> None:
  433. """Test adding a worktree with detached HEAD."""
  434. # Create a commit
  435. worktree = self.repo.get_worktree()
  436. self.write_file("test.txt", b"test content")
  437. worktree.stage(["test.txt"])
  438. commit_id = worktree.commit(message=b"Initial commit")
  439. # Add a detached worktree
  440. wt_path = os.path.join(self.tempdir, "detached-worktree")
  441. add_worktree(self.repo, wt_path, commit=commit_id, detach=True)
  442. # Verify it's detached
  443. worktrees = list_worktrees(self.repo)
  444. self.assertEqual(len(worktrees), 2)
  445. for wt in worktrees:
  446. if wt.path == wt_path:
  447. self.assertTrue(wt.detached)
  448. self.assertIsNone(wt.branch)
  449. self.assertEqual(wt.head, commit_id)
  450. def test_add_worktree_existing_path(self) -> None:
  451. """Test that adding a worktree to existing path fails."""
  452. wt_path = os.path.join(self.tempdir, "existing")
  453. os.mkdir(wt_path)
  454. with self.assertRaises(ValueError) as cm:
  455. add_worktree(self.repo, wt_path)
  456. self.assertIn("Path already exists", str(cm.exception))
  457. def test_add_worktree_branch_already_checked_out(self) -> None:
  458. """Test that checking out same branch in multiple worktrees fails."""
  459. # Create initial commit
  460. worktree = self.repo.get_worktree()
  461. self.write_file("test.txt", b"test content")
  462. worktree.stage(["test.txt"])
  463. worktree.commit(message=b"Initial commit")
  464. # First worktree should succeed with a new branch
  465. wt_path1 = os.path.join(self.tempdir, "wt1")
  466. add_worktree(self.repo, wt_path1, branch=b"feature")
  467. # Second worktree with same branch should fail
  468. wt_path2 = os.path.join(self.tempdir, "wt2")
  469. with self.assertRaises(ValueError) as cm:
  470. add_worktree(self.repo, wt_path2, branch=b"feature")
  471. self.assertIn("already checked out", str(cm.exception))
  472. # But should work with force=True
  473. add_worktree(self.repo, wt_path2, branch=b"feature", force=True)
  474. def test_remove_worktree(self) -> None:
  475. """Test removing a worktree."""
  476. # Create a worktree
  477. wt_path = os.path.join(self.tempdir, "to-remove")
  478. add_worktree(self.repo, wt_path)
  479. # Verify it exists
  480. self.assertTrue(os.path.exists(wt_path))
  481. self.assertEqual(len(list_worktrees(self.repo)), 2)
  482. # Remove it
  483. remove_worktree(self.repo, wt_path)
  484. # Verify it's gone
  485. self.assertFalse(os.path.exists(wt_path))
  486. self.assertEqual(len(list_worktrees(self.repo)), 1)
  487. def test_remove_main_worktree_fails(self) -> None:
  488. """Test that removing the main worktree fails."""
  489. with self.assertRaises(ValueError) as cm:
  490. remove_worktree(self.repo, self.repo.path)
  491. self.assertIn("Cannot remove the main working tree", str(cm.exception))
  492. def test_remove_nonexistent_worktree(self) -> None:
  493. """Test that removing non-existent worktree fails."""
  494. with self.assertRaises(ValueError) as cm:
  495. remove_worktree(self.repo, "/nonexistent/path")
  496. self.assertIn("Worktree not found", str(cm.exception))
  497. def test_lock_unlock_worktree(self) -> None:
  498. """Test locking and unlocking a worktree."""
  499. # Create a worktree
  500. wt_path = os.path.join(self.tempdir, "lockable")
  501. add_worktree(self.repo, wt_path)
  502. # Lock it
  503. lock_worktree(self.repo, wt_path, reason="Testing lock")
  504. # Verify it's locked
  505. worktrees = list_worktrees(self.repo)
  506. for wt in worktrees:
  507. if wt.path == wt_path:
  508. self.assertTrue(wt.locked)
  509. # Try to remove locked worktree (should fail)
  510. with self.assertRaises(ValueError) as cm:
  511. remove_worktree(self.repo, wt_path)
  512. self.assertIn("locked", str(cm.exception))
  513. # Unlock it
  514. unlock_worktree(self.repo, wt_path)
  515. # Verify it's unlocked
  516. worktrees = list_worktrees(self.repo)
  517. for wt in worktrees:
  518. if wt.path == wt_path:
  519. self.assertFalse(wt.locked)
  520. # Now removal should work
  521. remove_worktree(self.repo, wt_path)
  522. def test_prune_worktrees(self) -> None:
  523. """Test pruning worktrees."""
  524. # Create a worktree
  525. wt_path = os.path.join(self.tempdir, "to-prune")
  526. add_worktree(self.repo, wt_path)
  527. # Manually remove the worktree directory
  528. shutil.rmtree(wt_path)
  529. # Verify it still shows up as prunable
  530. worktrees = list_worktrees(self.repo)
  531. prunable_count = sum(1 for wt in worktrees if wt.prunable)
  532. self.assertEqual(prunable_count, 1)
  533. # Prune it
  534. pruned = prune_worktrees(self.repo)
  535. self.assertEqual(len(pruned), 1)
  536. # Verify it's gone from the list
  537. worktrees = list_worktrees(self.repo)
  538. self.assertEqual(len(worktrees), 1)
  539. def test_prune_dry_run(self) -> None:
  540. """Test prune with dry_run doesn't remove anything."""
  541. # Create and manually remove a worktree
  542. wt_path = os.path.join(self.tempdir, "dry-run-test")
  543. add_worktree(self.repo, wt_path)
  544. shutil.rmtree(wt_path)
  545. # Dry run should report but not remove
  546. pruned = prune_worktrees(self.repo, dry_run=True)
  547. self.assertEqual(len(pruned), 1)
  548. # Worktree should still be in list
  549. worktrees = list_worktrees(self.repo)
  550. self.assertEqual(len(worktrees), 2)
  551. def test_prune_locked_worktree_not_pruned(self) -> None:
  552. """Test that locked worktrees are not pruned."""
  553. # Create and lock a worktree
  554. wt_path = os.path.join(self.tempdir, "locked-prune")
  555. add_worktree(self.repo, wt_path)
  556. lock_worktree(self.repo, wt_path)
  557. # Remove the directory
  558. shutil.rmtree(wt_path)
  559. # Prune should not remove locked worktree
  560. pruned = prune_worktrees(self.repo)
  561. self.assertEqual(len(pruned), 0)
  562. # Worktree should still be in list
  563. worktrees = list_worktrees(self.repo)
  564. self.assertEqual(len(worktrees), 2)
  565. def test_move_worktree(self) -> None:
  566. """Test moving a worktree."""
  567. # Create a worktree
  568. wt_path = os.path.join(self.tempdir, "to-move")
  569. add_worktree(self.repo, wt_path)
  570. # Create a file in the worktree
  571. test_file = os.path.join(wt_path, "test.txt")
  572. with open(test_file, "w") as f:
  573. f.write("test content")
  574. # Move it
  575. new_path = os.path.join(self.tempdir, "moved")
  576. move_worktree(self.repo, wt_path, new_path)
  577. # Verify old path doesn't exist
  578. self.assertFalse(os.path.exists(wt_path))
  579. # Verify new path exists with contents
  580. self.assertTrue(os.path.exists(new_path))
  581. self.assertTrue(os.path.exists(os.path.join(new_path, "test.txt")))
  582. # Verify it's in the list at new location
  583. worktrees = list_worktrees(self.repo)
  584. paths = [wt.path for wt in worktrees]
  585. self.assertIn(new_path, paths)
  586. self.assertNotIn(wt_path, paths)
  587. def test_move_main_worktree_fails(self) -> None:
  588. """Test that moving the main worktree fails."""
  589. new_path = os.path.join(self.tempdir, "new-main")
  590. with self.assertRaises(ValueError) as cm:
  591. move_worktree(self.repo, self.repo.path, new_path)
  592. self.assertIn("Cannot move the main working tree", str(cm.exception))
  593. def test_move_to_existing_path_fails(self) -> None:
  594. """Test that moving to an existing path fails."""
  595. # Create a worktree
  596. wt_path = os.path.join(self.tempdir, "worktree")
  597. add_worktree(self.repo, wt_path)
  598. # Create target directory
  599. new_path = os.path.join(self.tempdir, "existing")
  600. os.makedirs(new_path)
  601. with self.assertRaises(ValueError) as cm:
  602. move_worktree(self.repo, wt_path, new_path)
  603. self.assertIn("Path already exists", str(cm.exception))
  604. def test_repair_worktree_after_manual_move(self) -> None:
  605. """Test repairing a worktree after manually moving it."""
  606. # Create a worktree
  607. wt_path = os.path.join(self.tempdir, "original")
  608. add_worktree(self.repo, wt_path)
  609. # Manually move the worktree directory (simulating external move)
  610. new_path = os.path.join(self.tempdir, "moved")
  611. shutil.move(wt_path, new_path)
  612. # At this point, the connection is broken
  613. # Repair from the moved worktree
  614. repaired = repair_worktree(self.repo, paths=[new_path])
  615. # Should have repaired the worktree
  616. self.assertEqual(len(repaired), 1)
  617. self.assertEqual(repaired[0], new_path)
  618. # Verify the worktree is now properly connected
  619. worktrees = list_worktrees(self.repo)
  620. paths = [wt.path for wt in worktrees]
  621. self.assertIn(new_path, paths)
  622. def test_repair_worktree_from_main_repo(self) -> None:
  623. """Test repairing worktree connections from main repository."""
  624. # Create a worktree
  625. wt_path = os.path.join(self.tempdir, "worktree")
  626. add_worktree(self.repo, wt_path)
  627. # Read the .git file to get the control directory
  628. gitdir_file = os.path.join(wt_path, ".git")
  629. with open(gitdir_file, "rb") as f:
  630. content = f.read().strip()
  631. control_dir = content[8:].decode() # Remove "gitdir: " prefix
  632. # Manually corrupt the .git file to point to wrong location
  633. with open(gitdir_file, "wb") as f:
  634. f.write(b"gitdir: /wrong/path\n")
  635. # Repair from main repository
  636. repaired = repair_worktree(self.repo)
  637. # Should have repaired the connection
  638. self.assertEqual(len(repaired), 1)
  639. self.assertEqual(repaired[0], wt_path)
  640. # Verify .git file now points to correct location
  641. with open(gitdir_file, "rb") as f:
  642. content = f.read().strip()
  643. new_control_dir = content[8:].decode()
  644. self.assertEqual(
  645. os.path.abspath(new_control_dir), os.path.abspath(control_dir)
  646. )
  647. def test_repair_worktree_no_repairs_needed(self) -> None:
  648. """Test repair when no repairs are needed."""
  649. # Create a worktree
  650. wt_path = os.path.join(self.tempdir, "worktree")
  651. add_worktree(self.repo, wt_path)
  652. # Repair - should return empty list since nothing is broken
  653. repaired = repair_worktree(self.repo)
  654. self.assertEqual(len(repaired), 0)
  def test_repair_invalid_worktree_path(self) -> None:
      """Check that a path that is not a worktree is rejected with ValueError."""
      bogus_paths = ["/nonexistent/path"]
      # The error message has to mention that the path is not a valid worktree.
      with self.assertRaisesRegex(ValueError, "Not a valid worktree"):
          repair_worktree(self.repo, paths=bogus_paths)
  def test_repair_multiple_worktrees(self) -> None:
      """Check that several moved worktrees can be repaired in one call."""
      # (original directory name, branch, directory name after the move)
      specs = [
          ("wt1", b"branch1", "moved1"),
          ("wt2", b"branch2", "moved2"),
      ]

      # Register every worktree first ...
      for original_name, branch, _ in specs:
          add_worktree(
              self.repo, os.path.join(self.tempdir, original_name), branch=branch
          )

      # ... then relocate each one behind the repository's back.
      relocated = []
      for original_name, _, moved_name in specs:
          destination = os.path.join(self.tempdir, moved_name)
          shutil.move(os.path.join(self.tempdir, original_name), destination)
          relocated.append(destination)

      # A single repair call receives all new locations.
      repaired = repair_worktree(self.repo, paths=relocated)

      # Every relocated worktree has to be reported as repaired.
      self.assertEqual(len(repaired), 2)
      for destination in relocated:
          self.assertIn(destination, repaired)
  def test_repair_worktree_with_relative_paths(self) -> None:
      """Test that repair handles worktrees with relative paths in gitdir.

      A worktree is created, moved on disk, and repaired from its new
      location. Afterwards every ``gitdir`` pointer file found under the
      repository's worktrees directory must end in ``.git``, and at least one
      such pointer must exist so the check cannot pass vacuously.
      """
      # Create a worktree
      wt_path = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, wt_path)

      # Manually move the worktree
      new_path = os.path.join(self.tempdir, "new-location")
      shutil.move(wt_path, new_path)

      # Repair from the new location
      repaired = repair_worktree(self.repo, paths=[new_path])

      # Should have repaired successfully
      self.assertEqual(len(repaired), 1)
      self.assertEqual(repaired[0], new_path)

      # Verify the gitdir pointer was updated
      from dulwich.repo import GITDIR, WORKTREES

      worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
      checked = 0
      for entry in os.listdir(worktrees_dir):
          gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
          if not os.path.exists(gitdir_path):
              continue
          with open(gitdir_path, "rb") as f:
              content = f.read().strip()
          gitdir_location = os.fsdecode(content)
          # Should point to the new .git file location
          self.assertTrue(gitdir_location.endswith(".git"))
          checked += 1

      # Without this guard the loop above could verify nothing and still pass.
      self.assertGreater(checked, 0, "no gitdir pointer file found to verify")
  def test_repair_worktree_container_method(self) -> None:
      """Exercise repair through the repository's ``worktrees`` container."""
      original = os.path.join(self.tempdir, "worktree")
      relocated = os.path.join(self.tempdir, "moved")

      # Register the worktree, then move it without telling the repository.
      add_worktree(self.repo, original)
      shutil.move(original, relocated)

      # Repair via the container API instead of the module-level function.
      result = self.repo.worktrees.repair(paths=[relocated])

      # Exactly the relocated worktree has to be reported.
      self.assertEqual(len(result), 1)
      self.assertEqual(result[0], relocated)
  def test_repair_with_missing_gitdir_pointer(self) -> None:
      """Test repair when gitdir pointer file is missing.

      Every ``gitdir`` pointer file under the repository's worktrees
      directory is deleted before repairing. The test first checks that at
      least one pointer was actually removed, so the scenario is really
      exercised, and then expects the repair to report no repaired worktrees.
      """
      # Create a worktree
      wt_path = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, wt_path)

      # Find and remove the gitdir pointer file
      from dulwich.repo import GITDIR, WORKTREES

      worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
      removed = 0
      for entry in os.listdir(worktrees_dir):
          gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
          if os.path.exists(gitdir_path):
              os.remove(gitdir_path)
              removed += 1

      # Guard against a vacuous pass: the pointer must have existed.
      self.assertGreater(removed, 0, "no gitdir pointer file found to remove")

      # Repair should not crash, but won't repair anything
      repaired = repair_worktree(self.repo, paths=[wt_path])
      self.assertEqual(len(repaired), 0)
  def test_repair_worktree_with_corrupted_git_file(self) -> None:
      """Check that an unparsable ``.git`` file makes repair raise ValueError."""
      broken_wt = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, broken_wt)

      # Overwrite the worktree's .git file with content that is no pointer.
      with open(os.path.join(broken_wt, ".git"), "wb") as git_file:
          git_file.write(b"invalid content\n")

      # The repair attempt has to fail and name the problem in its message.
      with self.assertRaises(ValueError) as raised:
          repair_worktree(self.repo, paths=[broken_wt])
      self.assertIn("Invalid .git file", str(raised.exception))
  class TemporaryWorktreeTests(TestCase):
      """Tests for temporary_worktree context manager."""

      def setUp(self) -> None:
          """Create a repository with one commit inside a fresh temp directory."""
          super().setUp()
          self.tempdir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.tempdir)

          self.repo_path = os.path.join(self.tempdir, "repo")
          self.repo = Repo.init(self.repo_path, mkdir=True)
          # Cleanups run in reverse registration order, so the repository is
          # closed before the temporary directory is removed.
          self.addCleanup(self.repo.close)

          # Create an initial commit so HEAD exists
          readme_path = os.path.join(self.repo_path, "README.md")
          with open(readme_path, "w") as f:
              f.write("# Test Repository\n")
          porcelain.add(self.repo, [readme_path])
          porcelain.commit(self.repo, message=b"Initial commit")

      def test_temporary_worktree_creates_and_cleans_up(self) -> None:
          """Test that temporary worktree is created and cleaned up."""
          # Use the context manager
          with temporary_worktree(self.repo) as worktree:
              worktree_path = worktree.path

              # Check that worktree exists
              self.assertTrue(os.path.exists(worktree_path))

              # Check that it's in the list of worktrees
              worktrees = list_worktrees(self.repo)
              paths = [wt.path for wt in worktrees]
              self.assertIn(worktree_path, paths)

              # Check that .git file exists in worktree
              gitdir_file = os.path.join(worktree_path, ".git")
              self.assertTrue(os.path.exists(gitdir_file))

          # After context manager exits, check cleanup
          self.assertFalse(os.path.exists(worktree_path))

          # Check that it's no longer in the list of worktrees
          worktrees = list_worktrees(self.repo)
          paths = [wt.path for wt in worktrees]
          self.assertNotIn(worktree_path, paths)

      def test_temporary_worktree_with_custom_prefix(self) -> None:
          """Test temporary worktree with custom prefix."""
          custom_prefix = "my-custom-prefix-"

          with temporary_worktree(self.repo, prefix=custom_prefix) as worktree:
              # Check that the directory name starts with our prefix
              dirname = os.path.basename(worktree.path)
              self.assertTrue(dirname.startswith(custom_prefix))

      def test_temporary_worktree_cleanup_on_exception(self) -> None:
          """Test that cleanup happens even when exception is raised."""

          class TestException(Exception):
              pass

          # assertRaises also verifies that the context manager lets the
          # exception propagate instead of swallowing it.
          with self.assertRaises(TestException):
              with temporary_worktree(self.repo) as worktree:
                  worktree_path = worktree.path
                  self.assertTrue(os.path.exists(worktree_path))
                  raise TestException("Test exception")

          # Cleanup should still happen
          self.assertFalse(os.path.exists(worktree_path))

          # Check that it's no longer in the list of worktrees
          worktrees = list_worktrees(self.repo)
          paths = [wt.path for wt in worktrees]
          self.assertNotIn(worktree_path, paths)

      def test_temporary_worktree_operations(self) -> None:
          """Test that operations can be performed in temporary worktree."""
          # Create a test file in main repo
          test_file = os.path.join(self.repo_path, "test.txt")
          with open(test_file, "w") as f:
              f.write("Hello, world!")

          porcelain.add(self.repo, [test_file])
          porcelain.commit(self.repo, message=b"Initial commit")

          with temporary_worktree(self.repo) as worktree:
              # Check that the file exists in the worktree
              wt_test_file = os.path.join(worktree.path, "test.txt")
              self.assertTrue(os.path.exists(wt_test_file))

              # Read and verify content
              with open(wt_test_file) as f:
                  content = f.read()
              self.assertEqual(content, "Hello, world!")

              # Make changes in the worktree
              with open(wt_test_file, "w") as f:
                  f.write("Modified content")

              # Changes should be visible in status
              status = porcelain.status(worktree)
              self.assertIn(os.fsencode("test.txt"), status.unstaged)