test_worktree.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
  1. # test_worktree.py -- Tests for dulwich.worktree
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.worktree."""
  22. import os
  23. import shutil
  24. import stat
  25. import tempfile
  26. from unittest import skipIf
  27. from dulwich.errors import CommitError
  28. from dulwich.index import get_unstaged_changes as _get_unstaged_changes
  29. from dulwich.objects import Commit
  30. from dulwich.object_store import tree_lookup_path
  31. from dulwich.repo import Repo
  32. from dulwich.worktree import (
  33. WorkTree,
  34. add_worktree,
  35. list_worktrees,
  36. lock_worktree,
  37. move_worktree,
  38. prune_worktrees,
  39. remove_worktree,
  40. repair_worktree,
  41. temporary_worktree,
  42. unlock_worktree,
  43. )
  44. from . import TestCase
  def get_unstaged_changes(repo):
      """Return the unstaged changes of ``repo`` as a list.

      Thin wrapper around ``dulwich.index.get_unstaged_changes``: it is handed
      the repository's index, the repository path and, when the repository
      supplies a blob normalizer, that normalizer's ``checkin_normalize``
      callback (otherwise ``None``).
      """
      staged_index = repo.open_index()
      blob_normalizer = repo.get_blob_normalizer()
      if blob_normalizer:
          checkin_filter = blob_normalizer.checkin_normalize
      else:
          checkin_filter = None
      # The fourth positional argument is a literal ``False``; its meaning is
      # defined by the dulwich helper's signature.
      changes = _get_unstaged_changes(staged_index, repo.path, checkin_filter, False)
      return list(changes)
  class WorkTreeTestCase(TestCase):
      """Base test case for WorkTree tests.

      ``setUp`` builds a throw-away repository in ``<tempdir>/main`` holding a
      single commit of a file named ``a``. Subclasses can rely on:

      * ``self.tempdir``     -- scratch directory, deleted after the test
      * ``self.test_dir``    -- path of the main repository
      * ``self.repo``        -- the opened ``Repo``
      * ``self.root_commit`` -- id returned by the initial commit
      * ``self.worktree``    -- ``self.repo.get_worktree()``
      """

      def setUp(self):
          super().setUp()
          self.tempdir = tempfile.mkdtemp()
          # Register removal right away so the directory is deleted even if the
          # rest of setUp fails. Cleanups run after tearDown(), i.e. after the
          # repository has been closed.
          self.addCleanup(shutil.rmtree, self.tempdir)
          self.test_dir = os.path.join(self.tempdir, "main")
          self.repo = Repo.init(self.test_dir, mkdir=True)

          # Create initial commit with a file
          with open(os.path.join(self.test_dir, "a"), "wb") as f:
              f.write(b"contents of file a")
          self.repo.get_worktree().stage(["a"])
          self.root_commit = self.repo.get_worktree().commit(
              message=b"Initial commit",
              committer=b"Test Committer <test@nodomain.com>",
              author=b"Test Author <test@nodomain.com>",
              commit_timestamp=12345,
              commit_timezone=0,
              author_timestamp=12345,
              author_timezone=0,
          )
          self.worktree = self.repo.get_worktree()

      def tearDown(self):
          # Close the repository before the registered cleanup removes tempdir.
          self.repo.close()
          super().tearDown()

      def write_file(self, filename, content):
          """Write the bytes ``content`` to ``filename`` inside the main repo."""
          with open(os.path.join(self.test_dir, filename), "wb") as f:
              f.write(content)
  class WorkTreeInitTests(TestCase):
      """Tests for WorkTree initialization.

      Each test creates its own repository inside a ``TemporaryDirectory`` and
      closes it in a ``finally`` clause, so the repository is released before
      the directory is deleted.
      """

      def test_init_with_repo_path(self):
          """Test WorkTree initialization with same path as repo."""
          with tempfile.TemporaryDirectory() as tmpdir:
              repo = Repo.init(tmpdir)
              try:
                  worktree = WorkTree(repo, tmpdir)
                  self.assertEqual(worktree.path, tmpdir)
                  self.assertEqual(worktree._repo, repo)
                  self.assertTrue(os.path.isabs(worktree.path))
              finally:
                  repo.close()

      def test_init_with_different_path(self):
          """Test WorkTree initialization with different path from repo."""
          with tempfile.TemporaryDirectory() as tmpdir:
              repo_path = os.path.join(tmpdir, "repo")
              worktree_path = os.path.join(tmpdir, "worktree")
              os.makedirs(repo_path)
              os.makedirs(worktree_path)

              repo = Repo.init(repo_path)
              try:
                  worktree = WorkTree(repo, worktree_path)
                  self.assertNotEqual(worktree.path, repo.path)
                  self.assertEqual(worktree.path, worktree_path)
                  self.assertEqual(worktree._repo, repo)
                  self.assertTrue(os.path.isabs(worktree.path))
              finally:
                  repo.close()

      def test_init_with_bytes_path(self):
          """Test WorkTree initialization with bytes path."""
          with tempfile.TemporaryDirectory() as tmpdir:
              repo = Repo.init(tmpdir)
              try:
                  worktree = WorkTree(repo, tmpdir.encode("utf-8"))
                  # A bytes path is expected to be exposed as ``str``.
                  self.assertEqual(worktree.path, tmpdir)
                  self.assertIsInstance(worktree.path, str)
              finally:
                  repo.close()
  class WorkTreeStagingTests(WorkTreeTestCase):
      """Tests for WorkTree staging operations."""

      def test_stage_absolute(self):
          """Test that staging with absolute paths raises ValueError."""
          r = self.repo
          os.remove(os.path.join(r.path, "a"))
          self.assertRaises(ValueError, self.worktree.stage, [os.path.join(r.path, "a")])

      def test_stage_deleted(self):
          """Test staging a deleted file."""
          r = self.repo
          os.remove(os.path.join(r.path, "a"))
          self.worktree.stage(["a"])
          self.worktree.stage(["a"])  # double-stage a deleted path
          self.assertEqual([], list(r.open_index()))

      def test_stage_directory(self):
          """Test staging a directory."""
          r = self.repo
          os.mkdir(os.path.join(r.path, "c"))
          self.worktree.stage(["c"])
          # Only the file from the initial commit is expected in the index.
          self.assertEqual([b"a"], list(r.open_index()))

      def test_stage_submodule(self):
          """Test staging a submodule."""
          r = self.repo
          s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
          # The nested repository is opened by this test, so this test must
          # also release it.
          self.addCleanup(s.close)
          s.get_worktree().commit(
              message=b"message",
          )
          self.worktree.stage(["sub"])
          self.assertEqual([b"a", b"sub"], list(r.open_index()))
  class WorkTreeUnstagingTests(WorkTreeTestCase):
      """Exercise ``WorkTree.unstage`` for added, modified and removed paths."""

      def _commit_unittest(self):
          """Commit the current index with the fixed message and identities."""
          self.worktree.commit(
              message=b"unittest",
              committer=b"Jane <jane@example.com>",
              author=b"John <john@example.com>",
          )

      @staticmethod
      def _put_text(path, text, mode="w"):
          """Write ``text`` to ``path`` in text mode (``"w"`` or ``"a"``)."""
          with open(path, mode) as handle:
              handle.write(text)

      def test_unstage_modify_file_with_dir(self):
          """Unstaging a modified, committed file that lives in a subdirectory."""
          directory = os.path.join(self.repo.path, "new_dir")
          os.mkdir(directory)
          target = os.path.join(directory, "foo")
          self._put_text(target, "hello")
          self.worktree.stage(["new_dir/foo"])
          self._commit_unittest()

          # Change the file after the commit, then unstage it.
          self._put_text(target, "something new", mode="a")
          self.worktree.unstage(["new_dir/foo"])

          expected = [os.fsencode(os.path.join("new_dir", "foo"))]
          self.assertEqual(expected, get_unstaged_changes(self.repo))

      def test_unstage_while_no_commit(self):
          """Stage then unstage a new file without committing in between."""
          name = "foo"
          self._put_text(os.path.join(self.repo.path, name), "hello")
          self.worktree.stage([name])
          self.worktree.unstage([name])

          # The path must have left the index again.
          self.assertNotIn(b"foo", self.repo.open_index())

      def test_unstage_add_file(self):
          """Unstaging a file that was added after a commit."""
          name = "foo"
          target = os.path.join(self.repo.path, name)
          self._commit_unittest()
          self._put_text(target, "hello")
          self.worktree.stage([name])
          self.worktree.unstage([name])

          # The path must have left the index again.
          self.assertNotIn(b"foo", self.repo.open_index())

      def test_unstage_modify_file(self):
          """Unstaging a committed file whose modification was staged."""
          name = "foo"
          target = os.path.join(self.repo.path, name)
          self._put_text(target, "hello")
          self.worktree.stage([name])
          self._commit_unittest()

          self._put_text(target, "broken", mode="a")
          self.worktree.stage([name])
          self.worktree.unstage([name])

          # The modification should now show up as an unstaged change.
          self.assertEqual([os.fsencode("foo")], get_unstaged_changes(self.repo))

      def test_unstage_remove_file(self):
          """Unstaging a committed file that was deleted from disk."""
          name = "foo"
          target = os.path.join(self.repo.path, name)
          self._put_text(target, "hello")
          self.worktree.stage([name])
          self._commit_unittest()

          os.remove(target)
          self.worktree.unstage([name])

          # The removal should show up as an unstaged change.
          self.assertEqual([os.fsencode("foo")], get_unstaged_changes(self.repo))
  class WorkTreeCommitTests(WorkTreeTestCase):
      """Commit through the worktree and inspect the resulting tree entries."""

      def _commit_at_12395(self, message):
          """Commit with the fixed test identities and timestamp 12395 (UTC offset 0)."""
          return self.worktree.commit(
              message,
              committer=b"Test Committer <test@nodomain.com>",
              author=b"Test Author <test@nodomain.com>",
              commit_timestamp=12395,
              commit_timezone=0,
              author_timestamp=12395,
              author_timezone=0,
          )

      def test_commit_modified(self):
          """A staged modification of ``a`` ends up in the new commit's tree."""
          repo = self.repo
          with open(os.path.join(repo.path, "a"), "wb") as handle:
              handle.write(b"new contents")
          self.worktree.stage(["a"])

          new_sha = self._commit_at_12395(b"modified a")

          commit = repo[new_sha]
          self.assertEqual([self.root_commit], commit.parents)
          mode, blob_id = tree_lookup_path(repo.get_object, repo[new_sha].tree, b"a")
          self.assertEqual(stat.S_IFREG | 0o644, mode)
          self.assertEqual(b"new contents", repo[blob_id].data)

      @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
      def test_commit_symlink(self):
          """A staged symlink ``b -> a`` is stored with link mode and target data."""
          repo = self.repo
          os.symlink("a", os.path.join(repo.path, "b"))
          self.worktree.stage(["a", "b"])

          new_sha = self._commit_at_12395(b"Symlink b")

          commit = repo[new_sha]
          self.assertEqual([self.root_commit], commit.parents)
          mode, blob_id = tree_lookup_path(repo.get_object, repo[new_sha].tree, b"b")
          self.assertEqual(stat.S_IFLNK, mode)
          # The blob of a symlink holds the link target.
          self.assertEqual(b"a", repo[blob_id].data)
  class WorkTreeResetTests(WorkTreeTestCase):
      """Reset behaviour of the worktree."""

      def test_reset_index(self):
          """After ``reset_index`` the file ``a`` holds its committed contents again."""
          tracked = os.path.join(self.repo.path, "a")

          # Overwrite the committed file and stage the change.
          with open(tracked, "wb") as handle:
              handle.write(b"modified contents")
          self.worktree.stage(["a"])

          self.worktree.reset_index()

          # The on-disk file must match the initial commit again.
          with open(tracked, "rb") as handle:
              on_disk = handle.read()
          self.assertEqual(b"contents of file a", on_disk)
  class WorkTreeSparseCheckoutTests(WorkTreeTestCase):
      """Sparse-checkout pattern handling and cone-mode configuration."""

      def test_get_sparse_checkout_patterns_empty(self):
          """With nothing configured, the pattern list is empty."""
          self.assertEqual([], self.worktree.get_sparse_checkout_patterns())

      def test_set_sparse_checkout_patterns(self):
          """Patterns written with the setter are returned unchanged by the getter."""
          wanted = ["*.py", "docs/"]
          self.worktree.set_sparse_checkout_patterns(wanted)
          stored = self.worktree.get_sparse_checkout_patterns()
          self.assertEqual(wanted, stored)

      def test_configure_for_cone_mode(self):
          """``configure_for_cone_mode`` sets both ``core`` options to ``true``."""
          self.worktree.configure_for_cone_mode()
          cfg = self.repo.get_config()
          section = (b"core",)
          self.assertEqual(b"true", cfg.get(section, b"sparseCheckout"))
          self.assertEqual(b"true", cfg.get(section, b"sparseCheckoutCone"))

      def test_infer_cone_mode_false(self):
          """Cone mode is not inferred on an unconfigured repository."""
          inferred = self.worktree.infer_cone_mode()
          self.assertFalse(inferred)

      def test_infer_cone_mode_true(self):
          """Cone mode is inferred once the repository is configured for it."""
          self.worktree.configure_for_cone_mode()
          inferred = self.worktree.infer_cone_mode()
          self.assertTrue(inferred)

      def test_set_cone_mode_patterns(self):
          """Directories are appended after the two base cone patterns."""
          self.worktree.set_cone_mode_patterns(["src", "tests"])
          self.assertEqual(
              ["/*", "!/*/", "/src/", "/tests/"],
              self.worktree.get_sparse_checkout_patterns(),
          )

      def test_set_cone_mode_patterns_empty(self):
          """An empty directory list leaves only the two base cone patterns."""
          self.worktree.set_cone_mode_patterns([])
          self.assertEqual(
              ["/*", "!/*/"],
              self.worktree.get_sparse_checkout_patterns(),
          )

      def test_set_cone_mode_patterns_duplicates(self):
          """A directory listed twice produces a single pattern."""
          repeated = ["src", "src"]
          self.worktree.set_cone_mode_patterns(repeated)
          self.assertEqual(
              ["/*", "!/*/", "/src/"],
              self.worktree.get_sparse_checkout_patterns(),
          )

      def test_sparse_checkout_file_path(self):
          """The sparse-checkout file is ``info/sparse-checkout`` in the control dir."""
          control_dir = self.repo.controldir()
          wanted = os.path.join(control_dir, "info", "sparse-checkout")
          self.assertEqual(wanted, self.worktree._sparse_checkout_file_path())
  class WorkTreeBackwardCompatibilityTests(WorkTreeTestCase):
      """Tests for backward compatibility of deprecated Repo methods.

      The deprecated methods are called on ``Repo`` (not ``WorkTree``) inside
      ``assertWarns(DeprecationWarning)``. That passes when a matching warning
      is emitted at any position, so an unrelated warning recorded first does
      not break the check.
      """

      def test_deprecated_stage_delegates_to_worktree(self):
          """Test that deprecated Repo.stage warns but still works."""
          with open(os.path.join(self.repo.path, "new_file"), "w") as f:
              f.write("test content")

          with self.assertWarns(DeprecationWarning):
              self.repo.stage(["new_file"])

      def test_deprecated_unstage_delegates_to_worktree(self):
          """Test that deprecated Repo.unstage warns but still works."""
          with self.assertWarns(DeprecationWarning):
              self.repo.unstage(["a"])

      def test_deprecated_sparse_checkout_methods(self):
          """Test that deprecated Repo sparse checkout methods warn but still work."""
          # Getter: must warn and still return the (empty) pattern list.
          with self.assertWarns(DeprecationWarning):
              patterns = self.repo.get_sparse_checkout_patterns()
          self.assertEqual([], patterns)

          # Setter: must warn.
          with self.assertWarns(DeprecationWarning):
              self.repo.set_sparse_checkout_patterns(["*.py"])

      def test_pre_commit_hook_fail(self):
          """Test that failing pre-commit hook raises CommitError."""
          if os.name != "posix":
              self.skipTest("shell hook tests requires POSIX shell")

          # Install an executable pre-commit hook that always exits non-zero.
          hooks_dir = os.path.join(self.repo.controldir(), "hooks")
          os.makedirs(hooks_dir, exist_ok=True)
          hook_path = os.path.join(hooks_dir, "pre-commit")
          with open(hook_path, "w") as f:
              f.write("#!/bin/sh\nexit 1\n")
          os.chmod(hook_path, 0o755)

          # Try to commit
          worktree = self.repo.get_worktree()
          with self.assertRaises(CommitError):
              worktree.commit(b"No message")
  386. class WorkTreeOperationsTests(WorkTreeTestCase):
  387. """Tests for worktree operations like add, list, remove."""
  def test_list_worktrees_single(self) -> None:
      """A repository without extra worktrees lists exactly the main one."""
      listed = list_worktrees(self.repo)
      self.assertEqual(len(listed), 1)

      main_wt = listed[0]
      self.assertEqual(main_wt.path, self.repo.path)
      self.assertEqual(main_wt.bare, False)
      self.assertIsNotNone(main_wt.head)
      self.assertIsNotNone(main_wt.branch)
  def test_add_worktree_new_branch(self) -> None:
      """Adding a worktree on a new branch lists it at the current commit."""
      # Put a second commit in the main worktree first.
      main_wt = self.repo.get_worktree()
      self.write_file("test.txt", b"test content")
      main_wt.stage(["test.txt"])
      head_id = main_wt.commit(message=b"Initial commit")

      wt_path = os.path.join(self.tempdir, "new-worktree")
      add_worktree(self.repo, wt_path, branch=b"feature-branch")

      # The directory and its ``.git`` entry must exist on disk.
      self.assertTrue(os.path.exists(wt_path))
      self.assertTrue(os.path.exists(os.path.join(wt_path, ".git")))

      listed = list_worktrees(self.repo)
      self.assertEqual(len(listed), 2)

      # Pick the first entry whose path is the one just added.
      new_wt = next((entry for entry in listed if entry.path == wt_path), None)
      self.assertIsNotNone(new_wt)
      self.assertEqual(new_wt.branch, b"refs/heads/feature-branch")
      self.assertEqual(new_wt.head, head_id)
      self.assertFalse(new_wt.detached)
  def test_add_worktree_detached(self) -> None:
      """Test adding a worktree with detached HEAD."""
      # Create a commit
      worktree = self.repo.get_worktree()
      self.write_file("test.txt", b"test content")
      worktree.stage(["test.txt"])
      commit_id = worktree.commit(message=b"Initial commit")

      # Add a detached worktree
      wt_path = os.path.join(self.tempdir, "detached-worktree")
      add_worktree(self.repo, wt_path, commit=commit_id, detach=True)

      worktrees = list_worktrees(self.repo)
      self.assertEqual(len(worktrees), 2)

      # Require exactly one listed entry for the new path; otherwise the
      # checks below would be skipped and the test would pass vacuously.
      matching = [wt for wt in worktrees if wt.path == wt_path]
      self.assertEqual(len(matching), 1)

      detached_wt = matching[0]
      self.assertTrue(detached_wt.detached)
      self.assertIsNone(detached_wt.branch)
      self.assertEqual(detached_wt.head, commit_id)
  def test_add_worktree_existing_path(self) -> None:
      """``add_worktree`` refuses a target directory that already exists."""
      occupied = os.path.join(self.tempdir, "existing")
      os.mkdir(occupied)

      with self.assertRaises(ValueError) as raised:
          add_worktree(self.repo, occupied)

      message = str(raised.exception)
      self.assertIn("Path already exists", message)
  def test_add_worktree_branch_already_checked_out(self) -> None:
      """A branch can back a second worktree only when ``force=True`` is given."""
      # Make sure there is a commit to branch from.
      main_wt = self.repo.get_worktree()
      self.write_file("test.txt", b"test content")
      main_wt.stage(["test.txt"])
      main_wt.commit(message=b"Initial commit")

      first_path = os.path.join(self.tempdir, "wt1")
      second_path = os.path.join(self.tempdir, "wt2")

      # The first worktree on the new branch is accepted.
      add_worktree(self.repo, first_path, branch=b"feature")

      # Re-using the branch without force is rejected...
      with self.assertRaises(ValueError) as raised:
          add_worktree(self.repo, second_path, branch=b"feature")
      self.assertIn("already checked out", str(raised.exception))

      # ...and accepted when forced.
      add_worktree(self.repo, second_path, branch=b"feature", force=True)
  def test_remove_worktree(self) -> None:
      """Removing a worktree deletes its directory and its list entry."""
      target = os.path.join(self.tempdir, "to-remove")
      add_worktree(self.repo, target)

      # Before: directory present, two entries listed.
      self.assertTrue(os.path.exists(target))
      listed_before = list_worktrees(self.repo)
      self.assertEqual(len(listed_before), 2)

      remove_worktree(self.repo, target)

      # After: directory gone, only the main worktree listed.
      self.assertFalse(os.path.exists(target))
      listed_after = list_worktrees(self.repo)
      self.assertEqual(len(listed_after), 1)
  def test_remove_main_worktree_fails(self) -> None:
      """``remove_worktree`` rejects the main working tree with ValueError."""
      main_path = self.repo.path
      with self.assertRaises(ValueError) as raised:
          remove_worktree(self.repo, main_path)

      message = str(raised.exception)
      self.assertIn("Cannot remove the main working tree", message)
  def test_remove_nonexistent_worktree(self) -> None:
      """``remove_worktree`` rejects a path that is not a known worktree."""
      unknown_path = "/nonexistent/path"
      with self.assertRaises(ValueError) as raised:
          remove_worktree(self.repo, unknown_path)

      message = str(raised.exception)
      self.assertIn("Worktree not found", message)
  def test_lock_unlock_worktree(self) -> None:
      """Test locking and unlocking a worktree.

      A locked worktree must be reported as locked and must refuse removal;
      after unlocking it must be reported as unlocked and be removable.
      """
      # Create a worktree
      wt_path = os.path.join(self.tempdir, "lockable")
      add_worktree(self.repo, wt_path)

      # Lock it
      lock_worktree(self.repo, wt_path, reason="Testing lock")

      # Require exactly one listed entry for the path so the ``locked`` check
      # cannot be skipped when nothing matches.
      matching = [wt for wt in list_worktrees(self.repo) if wt.path == wt_path]
      self.assertEqual(len(matching), 1)
      self.assertTrue(matching[0].locked)

      # Try to remove locked worktree (should fail)
      with self.assertRaises(ValueError) as cm:
          remove_worktree(self.repo, wt_path)
      self.assertIn("locked", str(cm.exception))

      # Unlock it
      unlock_worktree(self.repo, wt_path)

      # Same strict lookup for the unlocked state.
      matching = [wt for wt in list_worktrees(self.repo) if wt.path == wt_path]
      self.assertEqual(len(matching), 1)
      self.assertFalse(matching[0].locked)

      # Now removal should work
      remove_worktree(self.repo, wt_path)
  def test_prune_worktrees(self) -> None:
      """A worktree whose directory was deleted is prunable and gets pruned."""
      orphan_path = os.path.join(self.tempdir, "to-prune")
      add_worktree(self.repo, orphan_path)

      # Delete the directory without telling the repository.
      shutil.rmtree(orphan_path)

      # Exactly one listed entry should now be flagged as prunable.
      prunable = [entry for entry in list_worktrees(self.repo) if entry.prunable]
      self.assertEqual(len(prunable), 1)

      removed = prune_worktrees(self.repo)
      self.assertEqual(len(removed), 1)

      # Only the main worktree remains in the listing.
      remaining = list_worktrees(self.repo)
      self.assertEqual(len(remaining), 1)
  def test_prune_dry_run(self) -> None:
      """``dry_run=True`` reports the prunable worktree but keeps it listed."""
      orphan_path = os.path.join(self.tempdir, "dry-run-test")
      add_worktree(self.repo, orphan_path)
      shutil.rmtree(orphan_path)

      reported = prune_worktrees(self.repo, dry_run=True)
      self.assertEqual(len(reported), 1)

      # Nothing was removed: main worktree plus the orphan are still listed.
      still_listed = list_worktrees(self.repo)
      self.assertEqual(len(still_listed), 2)
  def test_prune_locked_worktree_not_pruned(self) -> None:
      """A locked worktree survives pruning even when its directory is gone."""
      locked_path = os.path.join(self.tempdir, "locked-prune")
      add_worktree(self.repo, locked_path)
      lock_worktree(self.repo, locked_path)

      # Delete the directory behind the repository's back.
      shutil.rmtree(locked_path)

      removed = prune_worktrees(self.repo)
      self.assertEqual(len(removed), 0)

      # Both the main and the locked worktree are still listed.
      still_listed = list_worktrees(self.repo)
      self.assertEqual(len(still_listed), 2)
  def test_move_worktree(self) -> None:
      """Moving a worktree relocates its files and updates the listing."""
      source = os.path.join(self.tempdir, "to-move")
      destination = os.path.join(self.tempdir, "moved")
      add_worktree(self.repo, source)

      # Drop a marker file into the worktree so its contents can be tracked.
      with open(os.path.join(source, "test.txt"), "w") as handle:
          handle.write("test content")

      move_worktree(self.repo, source, destination)

      # The old location is gone; the new one holds the marker file.
      self.assertFalse(os.path.exists(source))
      self.assertTrue(os.path.exists(destination))
      self.assertTrue(os.path.exists(os.path.join(destination, "test.txt")))

      # The listing refers to the new location only.
      listed_paths = [entry.path for entry in list_worktrees(self.repo)]
      self.assertIn(destination, listed_paths)
      self.assertNotIn(source, listed_paths)
  def test_move_main_worktree_fails(self) -> None:
      """``move_worktree`` rejects the main working tree with ValueError."""
      destination = os.path.join(self.tempdir, "new-main")
      with self.assertRaises(ValueError) as raised:
          move_worktree(self.repo, self.repo.path, destination)

      message = str(raised.exception)
      self.assertIn("Cannot move the main working tree", message)
  def test_move_to_existing_path_fails(self) -> None:
      """``move_worktree`` rejects a destination that already exists."""
      source = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, source)

      # Occupy the destination beforehand.
      occupied = os.path.join(self.tempdir, "existing")
      os.makedirs(occupied)

      with self.assertRaises(ValueError) as raised:
          move_worktree(self.repo, source, occupied)

      message = str(raised.exception)
      self.assertIn("Path already exists", message)
  def test_repair_worktree_after_manual_move(self) -> None:
      """A worktree moved with ``shutil.move`` is repaired from its new path."""
      original_path = os.path.join(self.tempdir, "original")
      relocated_path = os.path.join(self.tempdir, "moved")
      add_worktree(self.repo, original_path)

      # Move the directory outside of dulwich's worktree API.
      shutil.move(original_path, relocated_path)

      fixed = repair_worktree(self.repo, paths=[relocated_path])

      # Exactly the relocated worktree is reported as repaired.
      self.assertEqual(len(fixed), 1)
      self.assertEqual(fixed[0], relocated_path)

      # The listing now refers to the new location.
      listed_paths = [entry.path for entry in list_worktrees(self.repo)]
      self.assertIn(relocated_path, listed_paths)
  def test_repair_worktree_from_main_repo(self) -> None:
      """Repairing from the main repo restores a corrupted ``.git`` pointer file."""
      wt_path = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, wt_path)
      dot_git = os.path.join(wt_path, ".git")

      def read_gitdir_target():
          # Strip surrounding whitespace, then drop the first 8 bytes
          # (the "gitdir: " prefix) and decode the remainder.
          with open(dot_git, "rb") as handle:
              raw = handle.read().strip()
          return raw[8:].decode()

      original_target = read_gitdir_target()

      # Point the worktree's ``.git`` file at a bogus location.
      with open(dot_git, "wb") as handle:
          handle.write(b"gitdir: /wrong/path\n")

      fixed = repair_worktree(self.repo)

      # Exactly this worktree is reported as repaired.
      self.assertEqual(len(fixed), 1)
      self.assertEqual(fixed[0], wt_path)

      # The pointer file refers to the original control directory again.
      restored_target = read_gitdir_target()
      self.assertEqual(
          os.path.abspath(restored_target), os.path.abspath(original_target)
      )
  def test_repair_worktree_no_repairs_needed(self) -> None:
      """Repairing an intact worktree reports nothing as repaired."""
      intact_path = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, intact_path)

      # Nothing was broken, so the result must be empty.
      fixed = repair_worktree(self.repo)
      self.assertEqual(len(fixed), 0)
  def test_repair_invalid_worktree_path(self) -> None:
      """``repair_worktree`` rejects a path that is not a worktree."""
      bogus_paths = ["/nonexistent/path"]
      with self.assertRaises(ValueError) as raised:
          repair_worktree(self.repo, paths=bogus_paths)

      message = str(raised.exception)
      self.assertIn("Not a valid worktree", message)
  def test_repair_multiple_worktrees(self) -> None:
      """Check that several moved worktrees can be repaired in one call."""
      # (original location, branch, location after the manual move)
      layout = [
          (
              os.path.join(self.tempdir, "wt1"),
              b"branch1",
              os.path.join(self.tempdir, "moved1"),
          ),
          (
              os.path.join(self.tempdir, "wt2"),
              b"branch2",
              os.path.join(self.tempdir, "moved2"),
          ),
      ]

      # Create every worktree first ...
      for original, branch, _ in layout:
          add_worktree(self.repo, original, branch=branch)

      # ... then move each one behind the repository's back.
      for original, _, relocated in layout:
          shutil.move(original, relocated)

      # A single repair call receives all new locations.
      relocated_paths = [relocated for _, _, relocated in layout]
      fixed = repair_worktree(self.repo, paths=relocated_paths)

      # Each moved worktree must be reported.
      self.assertEqual(len(fixed), 2)
      for relocated in relocated_paths:
          self.assertIn(relocated, fixed)
  def test_repair_worktree_with_relative_paths(self) -> None:
      """Test that repair handles worktrees with relative paths in gitdir.

      A worktree is created, moved on disk, and repaired from its new
      location. Every ``gitdir`` pointer found under the repository's
      worktrees directory must then end in ``.git``, and at least one such
      pointer must actually be found so the check cannot pass vacuously.
      """
      # Create a worktree
      wt_path = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, wt_path)

      # Manually move the worktree
      new_path = os.path.join(self.tempdir, "new-location")
      shutil.move(wt_path, new_path)

      # Repair from the new location
      repaired = repair_worktree(self.repo, paths=[new_path])

      # Should have repaired successfully
      self.assertEqual(len(repaired), 1)
      self.assertEqual(repaired[0], new_path)

      # Verify the gitdir pointer was updated
      from dulwich.repo import GITDIR, WORKTREES

      worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
      checked = 0
      for entry in os.listdir(worktrees_dir):
          gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
          if not os.path.exists(gitdir_path):
              continue
          with open(gitdir_path, "rb") as f:
              content = f.read().strip()
          gitdir_location = os.fsdecode(content)
          # Should point to the new .git file location
          self.assertTrue(gitdir_location.endswith(".git"))
          checked += 1

      # Without this, an empty worktrees directory (or one with no gitdir
      # pointers) would let the loop above succeed without asserting anything.
      self.assertGreater(checked, 0)
  def test_repair_worktree_container_method(self) -> None:
      """Check that ``self.repo.worktrees.repair()`` fixes a moved worktree."""
      before_move = os.path.join(self.tempdir, "worktree")
      after_move = os.path.join(self.tempdir, "moved")

      # Register the worktree, then relocate it manually.
      add_worktree(self.repo, before_move)
      shutil.move(before_move, after_move)

      # Repair through the container API rather than the module function.
      container = self.repo.worktrees
      fixed = container.repair(paths=[after_move])

      # The moved worktree, and only it, must be reported.
      self.assertEqual(len(fixed), 1)
      self.assertEqual(fixed[0], after_move)
  def test_repair_with_missing_gitdir_pointer(self) -> None:
      """Check that repair copes with a deleted ``gitdir`` pointer file.

      After the pointer files under the repository's worktrees directory are
      removed, repairing the worktree must not raise and must report nothing
      as repaired.
      """
      from dulwich.repo import GITDIR, WORKTREES

      worktree_dir = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, worktree_dir)

      # Delete every gitdir pointer that exists under the worktrees directory.
      admin_root = os.path.join(self.repo.controldir(), WORKTREES)
      for name in os.listdir(admin_root):
          pointer = os.path.join(admin_root, name, GITDIR)
          if not os.path.exists(pointer):
              continue
          os.remove(pointer)

      # Repair should complete quietly with an empty result.
      fixed = repair_worktree(self.repo, paths=[worktree_dir])
      self.assertEqual(len(fixed), 0)
  def test_repair_worktree_with_corrupted_git_file(self) -> None:
      """Check that an unparseable ``.git`` file makes repair raise ValueError."""
      worktree_dir = os.path.join(self.tempdir, "worktree")
      add_worktree(self.repo, worktree_dir)

      # Overwrite the pointer file with content that is not a gitdir line.
      dot_git = os.path.join(worktree_dir, ".git")
      with open(dot_git, "wb") as fh:
          fh.write(b"invalid content\n")

      # Repair is expected to refuse the file.
      with self.assertRaises(ValueError) as raised:
          repair_worktree(self.repo, paths=[worktree_dir])

      message = str(raised.exception)
      self.assertIn("Invalid .git file", message)
  733. class TemporaryWorktreeTests(TestCase):
  734. """Tests for temporary_worktree context manager."""
  def setUp(self) -> None:
      """Create a scratch repository containing a single commit.

      A temporary directory is created and scheduled for removal, a
      repository is initialised inside it, and a README is committed so
      that HEAD exists for the tests.
      """
      super().setUp()
      self.tempdir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, self.tempdir)

      self.repo_path = os.path.join(self.tempdir, "repo")
      self.repo = Repo.init(self.repo_path, mkdir=True)
      # Cleanups run in reverse registration order, so the repository is
      # closed before the directory tree is deleted. This releases any files
      # the repository holds open instead of leaking them past the test.
      self.addCleanup(self.repo.close)

      # Create an initial commit so HEAD exists
      readme_path = os.path.join(self.repo_path, "README.md")
      with open(readme_path, "w") as f:
          f.write("# Test Repository\n")

      wt = self.repo.get_worktree()
      wt.stage(["README.md"])
      wt.commit(message=b"Initial commit")
  def test_temporary_worktree_creates_and_cleans_up(self) -> None:
      """Check the full lifecycle of a worktree made by ``temporary_worktree``.

      Inside the ``with`` block the worktree directory and its ``.git`` entry
      must exist and the worktree must be listed. After the block both the
      directory and the listing entry must be gone.
      """

      def listed_paths():
          # Paths of all worktrees the repository currently reports.
          return [item.path for item in list_worktrees(self.repo)]

      tmp_location = None

      with temporary_worktree(self.repo) as tmp_wt:
          tmp_location = tmp_wt.path

          # The directory is present on disk ...
          self.assertTrue(os.path.exists(tmp_location))

          # ... it is registered with the repository ...
          self.assertIn(tmp_location, listed_paths())

          # ... and it carries a .git entry.
          dot_git = os.path.join(tmp_location, ".git")
          self.assertTrue(os.path.exists(dot_git))

      # Leaving the context must remove the directory.
      self.assertFalse(os.path.exists(tmp_location))

      # The repository must no longer list it either.
      self.assertNotIn(tmp_location, listed_paths())
  def test_temporary_worktree_with_custom_prefix(self) -> None:
      """Check that the ``prefix`` argument shapes the worktree directory name."""
      wanted_prefix = "my-custom-prefix-"

      with temporary_worktree(self.repo, prefix=wanted_prefix) as tmp_wt:
          # Only the last path component is expected to carry the prefix.
          leaf_name = os.path.basename(tmp_wt.path)
          has_prefix = leaf_name.startswith(wanted_prefix)
          self.assertTrue(has_prefix)
  def test_temporary_worktree_cleanup_on_exception(self) -> None:
      """Test that cleanup happens even when exception is raised.

      An exception raised inside the ``with`` block must reach the caller,
      and the temporary worktree must still be removed from disk and from
      the repository's worktree list.
      """

      class TestException(Exception):
          pass

      worktree_path = None

      # assertRaises (rather than try/except/pass) also fails the test if the
      # context manager swallows the exception instead of propagating it.
      with self.assertRaises(TestException):
          with temporary_worktree(self.repo) as worktree:
              worktree_path = worktree.path
              self.assertTrue(os.path.exists(worktree_path))
              raise TestException("Test exception")

      # Cleanup should still happen
      self.assertFalse(os.path.exists(worktree_path))

      # Check that it's no longer in the list of worktrees
      worktrees = list_worktrees(self.repo)
      paths = [wt.path for wt in worktrees]
      self.assertNotIn(worktree_path, paths)
  def test_temporary_worktree_operations(self) -> None:
      """Check that files can be read and modified inside a temporary worktree.

      A file committed in the main repository must appear in the temporary
      worktree with the same content, and editing it there must show up in
      the result of ``get_unstaged_changes``.
      """
      tracked_name = "test.txt"

      # Commit a file in the main repository first.
      main_copy = os.path.join(self.repo_path, tracked_name)
      with open(main_copy, "w") as fh:
          fh.write("Hello, world!")

      main_wt = self.repo.get_worktree()
      main_wt.stage([tracked_name])
      main_wt.commit(message=b"Initial commit")

      with temporary_worktree(self.repo) as tmp_wt:
          # The committed file must be present in the temporary worktree.
          tmp_copy = os.path.join(tmp_wt.path, tracked_name)
          self.assertTrue(os.path.exists(tmp_copy))

          # Its content must match what was committed.
          with open(tmp_copy) as fh:
              seen = fh.read()
          self.assertEqual(seen, "Hello, world!")

          # Edit the file in place.
          with open(tmp_copy, "w") as fh:
              fh.write("Modified content")

          # The edit must be reported as an unstaged change.
          pending = get_unstaged_changes(tmp_wt)
          self.assertIn(os.fsencode(tracked_name), pending)
  813. f.write("Modified content")
  814. # Changes should be visible as unstaged
  815. unstaged = get_unstaged_changes(worktree)
  816. self.assertIn(os.fsencode("test.txt"), unstaged)