test_repository.py 91 KB


  1. # test_repository.py -- tests for repository.py
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the repository."""
  22. import glob
  23. import locale
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import tempfile
  29. import time
  30. import warnings
  31. from dulwich import errors, objects
  32. from dulwich.config import Config
  33. from dulwich.errors import NotGitRepository
  34. from dulwich.index import get_unstaged_changes as _get_unstaged_changes
  35. from dulwich.object_store import tree_lookup_path
  36. from dulwich.repo import (
  37. InvalidUserIdentity,
  38. MemoryRepo,
  39. Repo,
  40. UnsupportedExtension,
  41. UnsupportedVersion,
  42. check_user_identity,
  43. )
  44. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  45. from . import TestCase, skipIf
  46. missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097"
  47. def get_unstaged_changes(repo):
  48. """Helper to get unstaged changes for a repo."""
  49. index = repo.open_index()
  50. normalizer = repo.get_blob_normalizer()
  51. filter_callback = normalizer.checkin_normalize if normalizer else None
  52. return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
  53. class CreateRepositoryTests(TestCase):
  54. def assertFileContentsEqual(self, expected, repo, path) -> None:
  55. f = repo.get_named_file(path)
  56. if not f:
  57. self.assertEqual(expected, None)
  58. else:
  59. with f:
  60. self.assertEqual(expected, f.read())
  61. def _check_repo_contents(self, repo, expect_bare) -> None:
  62. self.assertEqual(expect_bare, repo.bare)
  63. self.assertFileContentsEqual(b"Unnamed repository", repo, "description")
  64. self.assertFileContentsEqual(b"", repo, os.path.join("info", "exclude"))
  65. self.assertFileContentsEqual(None, repo, "nonexistent file")
  66. barestr = b"bare = " + str(expect_bare).lower().encode("ascii")
  67. with repo.get_named_file("config") as f:
  68. config_text = f.read()
  69. self.assertIn(barestr, config_text, f"{config_text!r}")
  70. expect_filemode = sys.platform != "win32"
  71. barestr = b"filemode = " + str(expect_filemode).lower().encode("ascii")
  72. with repo.get_named_file("config") as f:
  73. config_text = f.read()
  74. self.assertIn(barestr, config_text, f"{config_text!r}")
  75. if isinstance(repo, Repo):
  76. expected_mode = "0o100644" if expect_filemode else "0o100666"
  77. expected = {
  78. "HEAD": expected_mode,
  79. "config": expected_mode,
  80. "description": expected_mode,
  81. }
  82. actual = {
  83. f[len(repo._controldir) + 1 :]: oct(os.stat(f).st_mode)
  84. for f in glob.glob(os.path.join(repo._controldir, "*"))
  85. if os.path.isfile(f)
  86. }
  87. self.assertEqual(expected, actual)
  88. def test_create_memory(self) -> None:
  89. repo = MemoryRepo.init_bare([], {})
  90. self._check_repo_contents(repo, True)
  91. def test_create_disk_bare(self) -> None:
  92. tmp_dir = tempfile.mkdtemp()
  93. self.addCleanup(shutil.rmtree, tmp_dir)
  94. repo = Repo.init_bare(tmp_dir)
  95. self.assertEqual(tmp_dir, repo._controldir)
  96. self._check_repo_contents(repo, True)
  97. def test_create_disk_non_bare(self) -> None:
  98. tmp_dir = tempfile.mkdtemp()
  99. self.addCleanup(shutil.rmtree, tmp_dir)
  100. repo = Repo.init(tmp_dir)
  101. self.assertEqual(os.path.join(tmp_dir, ".git"), repo._controldir)
  102. self._check_repo_contents(repo, False)
  103. def test_create_disk_non_bare_mkdir(self) -> None:
  104. tmp_dir = tempfile.mkdtemp()
  105. target_dir = os.path.join(tmp_dir, "target")
  106. self.addCleanup(shutil.rmtree, tmp_dir)
  107. repo = Repo.init(target_dir, mkdir=True)
  108. self.assertEqual(os.path.join(target_dir, ".git"), repo._controldir)
  109. self._check_repo_contents(repo, False)
  110. def test_create_disk_bare_mkdir(self) -> None:
  111. tmp_dir = tempfile.mkdtemp()
  112. target_dir = os.path.join(tmp_dir, "target")
  113. self.addCleanup(shutil.rmtree, tmp_dir)
  114. repo = Repo.init_bare(target_dir, mkdir=True)
  115. self.assertEqual(target_dir, repo._controldir)
  116. self._check_repo_contents(repo, True)
  117. def test_create_disk_bare_pathlib(self) -> None:
  118. from pathlib import Path
  119. tmp_dir = tempfile.mkdtemp()
  120. self.addCleanup(shutil.rmtree, tmp_dir)
  121. repo_path = Path(tmp_dir)
  122. repo = Repo.init_bare(repo_path)
  123. self.assertEqual(tmp_dir, repo._controldir)
  124. self._check_repo_contents(repo, True)
  125. # Test that refpath works with pathlib
  126. ref_path = repo.refs.refpath(b"refs/heads/master")
  127. self.assertTrue(isinstance(ref_path, bytes))
  128. expected_path = os.path.join(tmp_dir.encode(), b"refs", b"heads", b"master")
  129. self.assertEqual(ref_path, expected_path)
  130. def test_create_disk_non_bare_pathlib(self) -> None:
  131. from pathlib import Path
  132. tmp_dir = tempfile.mkdtemp()
  133. self.addCleanup(shutil.rmtree, tmp_dir)
  134. repo_path = Path(tmp_dir)
  135. repo = Repo.init(repo_path)
  136. self.assertEqual(os.path.join(tmp_dir, ".git"), repo._controldir)
  137. self._check_repo_contents(repo, False)
  138. def test_open_repo_pathlib(self) -> None:
  139. from pathlib import Path
  140. tmp_dir = tempfile.mkdtemp()
  141. self.addCleanup(shutil.rmtree, tmp_dir)
  142. # First create a repo
  143. repo = Repo.init_bare(tmp_dir)
  144. repo.close()
  145. # Now open it with pathlib
  146. repo_path = Path(tmp_dir)
  147. repo2 = Repo(repo_path)
  148. self.assertEqual(tmp_dir, repo2._controldir)
  149. self.assertTrue(repo2.bare)
  150. repo2.close()
  151. def test_create_disk_bare_mkdir_pathlib(self) -> None:
  152. from pathlib import Path
  153. tmp_dir = tempfile.mkdtemp()
  154. target_path = Path(tmp_dir) / "target"
  155. self.addCleanup(shutil.rmtree, tmp_dir)
  156. repo = Repo.init_bare(target_path, mkdir=True)
  157. self.assertEqual(str(target_path), repo._controldir)
  158. self._check_repo_contents(repo, True)
  159. class MemoryRepoTests(TestCase):
  160. def test_set_description(self) -> None:
  161. r = MemoryRepo.init_bare([], {})
  162. description = b"Some description"
  163. r.set_description(description)
  164. self.assertEqual(description, r.get_description())
  165. def test_pull_into(self) -> None:
  166. r = MemoryRepo.init_bare([], {})
  167. repo = open_repo("a.git")
  168. self.addCleanup(tear_down_repo, repo)
  169. repo.fetch(r)
  170. def test_fetch_from_git_cloned_repo(self) -> None:
  171. """Test fetching from a git-cloned repo into MemoryRepo (issue #1179)."""
  172. import tempfile
  173. from dulwich.client import LocalGitClient
  174. with tempfile.TemporaryDirectory() as tmpdir:
  175. # Create initial repo using dulwich
  176. initial_path = os.path.join(tmpdir, "initial")
  177. initial_repo = Repo.init(initial_path, mkdir=True)
  178. # Create some content
  179. test_file = os.path.join(initial_path, "test.txt")
  180. with open(test_file, "w") as f:
  181. f.write("test content\n")
  182. # Stage and commit using dulwich
  183. initial_repo.get_worktree().stage(["test.txt"])
  184. initial_repo.get_worktree().commit(
  185. message=b"Initial commit\n",
  186. committer=b"Test Committer <test@example.com>",
  187. author=b"Test Author <test@example.com>",
  188. )
  189. # Clone using dulwich
  190. cloned_path = os.path.join(tmpdir, "cloned")
  191. cloned_repo = initial_repo.clone(cloned_path, mkdir=True)
  192. initial_repo.close()
  193. cloned_repo.close()
  194. # Fetch from the cloned repo into MemoryRepo
  195. memory_repo = MemoryRepo()
  196. client = LocalGitClient()
  197. # This should not raise AssertionError
  198. result = client.fetch(cloned_path, memory_repo)
  199. # Verify the fetch worked
  200. self.assertIn(b"HEAD", result.refs)
  201. self.assertIn(b"refs/heads/master", result.refs)
  202. # Verify we can read the fetched objects
  203. head_sha = result.refs[b"HEAD"]
  204. commit = memory_repo[head_sha]
  205. self.assertEqual(commit.message, b"Initial commit\n")
  206. class RepositoryRootTests(TestCase):
  207. def mkdtemp(self):
  208. return tempfile.mkdtemp()
  209. def open_repo(self, name):
  210. temp_dir = self.mkdtemp()
  211. repo = open_repo(name, temp_dir)
  212. self.addCleanup(tear_down_repo, repo)
  213. return repo
  214. def test_simple_props(self) -> None:
  215. r = self.open_repo("a.git")
  216. self.assertEqual(r.controldir(), r.path)
  217. def test_setitem(self) -> None:
  218. r = self.open_repo("a.git")
  219. r[b"refs/tags/foo"] = b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"
  220. self.assertEqual(
  221. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", r[b"refs/tags/foo"].id
  222. )
  223. def test_getitem_unicode(self) -> None:
  224. r = self.open_repo("a.git")
  225. test_keys = [
  226. (b"refs/heads/master", True),
  227. (b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", True),
  228. (b"11" * 19 + b"--", False),
  229. ]
  230. for k, contained in test_keys:
  231. self.assertEqual(k in r, contained)
  232. # Avoid deprecation warning under Py3.2+
  233. if getattr(self, "assertRaisesRegex", None):
  234. assertRaisesRegexp = self.assertRaisesRegex
  235. else:
  236. assertRaisesRegexp = self.assertRaisesRegexp
  237. for k, _ in test_keys:
  238. assertRaisesRegexp(
  239. TypeError,
  240. "'name' must be bytestring, not int",
  241. r.__getitem__,
  242. 12,
  243. )
  244. def test_delitem(self) -> None:
  245. r = self.open_repo("a.git")
  246. del r[b"refs/heads/master"]
  247. self.assertRaises(KeyError, lambda: r[b"refs/heads/master"])
  248. del r[b"HEAD"]
  249. self.assertRaises(KeyError, lambda: r[b"HEAD"])
  250. self.assertRaises(ValueError, r.__delitem__, b"notrefs/foo")
  251. def test_getitem_32_byte_ref(self) -> None:
  252. """Test that accessing a ref name that's 32 bytes long works (issue #2040)."""
  253. r = self.open_repo("a.git")
  254. # Create a ref with exactly 32 bytes
  255. ref_name = b"refs/heads/feat-backend-refactor"
  256. self.assertEqual(len(ref_name), 32)
  257. r[ref_name] = b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"
  258. # This should not raise AssertionError
  259. obj = r[ref_name]
  260. self.assertEqual(obj.id, b"a90fa2d900a17e99b433217e988c4eb4a2e9a097")
  261. def test_get_refs(self) -> None:
  262. r = self.open_repo("a.git")
  263. self.assertEqual(
  264. {
  265. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  266. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  267. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  268. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  269. },
  270. r.get_refs(),
  271. )
  272. def test_head(self) -> None:
  273. r = self.open_repo("a.git")
  274. self.assertEqual(r.head(), b"a90fa2d900a17e99b433217e988c4eb4a2e9a097")
  275. def test_get_object(self) -> None:
  276. r = self.open_repo("a.git")
  277. obj = r.get_object(r.head())
  278. self.assertEqual(obj.type_name, b"commit")
  279. def test_get_object_non_existant(self) -> None:
  280. r = self.open_repo("a.git")
  281. self.assertRaises(KeyError, r.get_object, missing_sha)
  282. def test_contains_object(self) -> None:
  283. r = self.open_repo("a.git")
  284. self.assertIn(r.head(), r)
  285. self.assertNotIn(b"z" * 40, r)
  286. def test_contains_ref(self) -> None:
  287. r = self.open_repo("a.git")
  288. self.assertIn(b"HEAD", r)
  289. def test_get_no_description(self) -> None:
  290. r = self.open_repo("a.git")
  291. self.assertIs(None, r.get_description())
  292. def test_get_description(self) -> None:
  293. r = self.open_repo("a.git")
  294. with open(os.path.join(r.path, "description"), "wb") as f:
  295. f.write(b"Some description")
  296. self.assertEqual(b"Some description", r.get_description())
  297. def test_set_description(self) -> None:
  298. r = self.open_repo("a.git")
  299. description = b"Some description"
  300. r.set_description(description)
  301. self.assertEqual(description, r.get_description())
  302. def test_get_gitattributes(self) -> None:
  303. # Test when no .gitattributes file exists
  304. r = self.open_repo("a.git")
  305. attrs = r.get_gitattributes()
  306. from dulwich.attrs import GitAttributes
  307. self.assertIsInstance(attrs, GitAttributes)
  308. self.assertEqual(len(attrs), 0)
  309. # Create .git/info/attributes file (which is read by get_gitattributes)
  310. info_dir = os.path.join(r.controldir(), "info")
  311. if not os.path.exists(info_dir):
  312. os.makedirs(info_dir)
  313. attrs_path = os.path.join(info_dir, "attributes")
  314. with open(attrs_path, "wb") as f:
  315. f.write(b"*.txt text\n")
  316. f.write(b"*.jpg -text binary\n")
  317. # Test with attributes file
  318. attrs = r.get_gitattributes()
  319. self.assertEqual(len(attrs), 2)
  320. # Test matching
  321. txt_attrs = attrs.match_path(b"file.txt")
  322. self.assertEqual(txt_attrs, {b"text": True})
  323. jpg_attrs = attrs.match_path(b"image.jpg")
  324. self.assertEqual(jpg_attrs, {b"text": False, b"binary": True})
  325. def test_contains_missing(self) -> None:
  326. r = self.open_repo("a.git")
  327. self.assertNotIn(b"bar", r)
  328. def test_get_peeled(self) -> None:
  329. # unpacked ref
  330. r = self.open_repo("a.git")
  331. tag_sha = b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a"
  332. self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
  333. self.assertEqual(r.get_peeled(b"refs/tags/mytag"), r.head())
  334. # packed ref with cached peeled value
  335. packed_tag_sha = b"b0931cadc54336e78a1d980420e3268903b57a50"
  336. parent_sha = r[r.head()].parents[0]
  337. self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
  338. self.assertEqual(r.get_peeled(b"refs/tags/mytag-packed"), parent_sha)
  339. # TODO: add more corner cases to test repo
  340. def test_get_peeled_not_tag(self) -> None:
  341. r = self.open_repo("a.git")
  342. self.assertEqual(r.get_peeled(b"HEAD"), r.head())
  343. def test_get_parents(self) -> None:
  344. r = self.open_repo("a.git")
  345. self.assertEqual(
  346. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  347. r.get_parents(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"),
  348. )
  349. r.update_shallow([b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], None)
  350. self.assertEqual([], r.get_parents(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"))
  351. def test_get_walker(self) -> None:
  352. r = self.open_repo("a.git")
  353. # include defaults to [r.head()]
  354. self.assertEqual(
  355. [e.commit.id for e in r.get_walker()],
  356. [r.head(), b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  357. )
  358. self.assertEqual(
  359. [
  360. e.commit.id
  361. for e in r.get_walker([b"2a72d929692c41d8554c07f6301757ba18a65d91"])
  362. ],
  363. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  364. )
  365. self.assertEqual(
  366. [
  367. e.commit.id
  368. for e in r.get_walker(b"2a72d929692c41d8554c07f6301757ba18a65d91")
  369. ],
  370. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  371. )
  372. def assertFilesystemHidden(self, path) -> None:
  373. if sys.platform != "win32":
  374. return
  375. import ctypes
  376. from ctypes.wintypes import DWORD, LPCWSTR
  377. GetFileAttributesW = ctypes.WINFUNCTYPE(DWORD, LPCWSTR)(
  378. ("GetFileAttributesW", ctypes.windll.kernel32)
  379. )
  380. self.assertTrue(2 & GetFileAttributesW(path))
  381. def test_init_existing(self) -> None:
  382. tmp_dir = self.mkdtemp()
  383. self.addCleanup(shutil.rmtree, tmp_dir)
  384. t = Repo.init(tmp_dir)
  385. self.addCleanup(t.close)
  386. self.assertEqual(os.listdir(tmp_dir), [".git"])
  387. self.assertFilesystemHidden(os.path.join(tmp_dir, ".git"))
  388. def test_init_mkdir(self) -> None:
  389. tmp_dir = self.mkdtemp()
  390. self.addCleanup(shutil.rmtree, tmp_dir)
  391. repo_dir = os.path.join(tmp_dir, "a-repo")
  392. t = Repo.init(repo_dir, mkdir=True)
  393. self.addCleanup(t.close)
  394. self.assertEqual(os.listdir(repo_dir), [".git"])
  395. self.assertFilesystemHidden(os.path.join(repo_dir, ".git"))
  396. def test_init_mkdir_unicode(self) -> None:
  397. repo_name = "\xa7"
  398. try:
  399. os.fsencode(repo_name)
  400. except UnicodeEncodeError:
  401. self.skipTest("filesystem lacks unicode support")
  402. tmp_dir = self.mkdtemp()
  403. self.addCleanup(shutil.rmtree, tmp_dir)
  404. repo_dir = os.path.join(tmp_dir, repo_name)
  405. t = Repo.init(repo_dir, mkdir=True)
  406. self.addCleanup(t.close)
  407. self.assertEqual(os.listdir(repo_dir), [".git"])
  408. self.assertFilesystemHidden(os.path.join(repo_dir, ".git"))
  409. def test_init_format(self) -> None:
  410. tmp_dir = self.mkdtemp()
  411. self.addCleanup(shutil.rmtree, tmp_dir)
  412. # Test format 0
  413. t0 = Repo.init(tmp_dir + "0", mkdir=True, format=0)
  414. self.addCleanup(t0.close)
  415. self.assertEqual(t0.get_config().get("core", "repositoryformatversion"), b"0")
  416. # Test format 1
  417. t1 = Repo.init(tmp_dir + "1", mkdir=True, format=1)
  418. self.addCleanup(t1.close)
  419. self.assertEqual(t1.get_config().get("core", "repositoryformatversion"), b"1")
  420. # Test default format
  421. td = Repo.init(tmp_dir + "d", mkdir=True)
  422. self.addCleanup(td.close)
  423. self.assertEqual(td.get_config().get("core", "repositoryformatversion"), b"0")
  424. # Test invalid format
  425. with self.assertRaises(ValueError):
  426. Repo.init(tmp_dir + "bad", mkdir=True, format=99)
  427. def test_init_bare_format(self) -> None:
  428. tmp_dir = self.mkdtemp()
  429. self.addCleanup(shutil.rmtree, tmp_dir)
  430. # Test format 1 for bare repo
  431. t = Repo.init_bare(tmp_dir + "bare", mkdir=True, format=1)
  432. self.addCleanup(t.close)
  433. self.assertEqual(t.get_config().get("core", "repositoryformatversion"), b"1")
  434. # Test invalid format for bare repo
  435. with self.assertRaises(ValueError):
  436. Repo.init_bare(tmp_dir + "badbr", mkdir=True, format=2)
  437. @skipIf(sys.platform == "win32", "fails on Windows")
  438. def test_fetch(self) -> None:
  439. r = self.open_repo("a.git")
  440. tmp_dir = self.mkdtemp()
  441. self.addCleanup(shutil.rmtree, tmp_dir)
  442. t = Repo.init(tmp_dir)
  443. self.addCleanup(t.close)
  444. r.fetch(t)
  445. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  446. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  447. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  448. self.assertIn(b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", t)
  449. self.assertIn(b"b0931cadc54336e78a1d980420e3268903b57a50", t)
  450. @skipIf(sys.platform == "win32", "fails on Windows")
  451. def test_fetch_ignores_missing_refs(self) -> None:
  452. r = self.open_repo("a.git")
  453. missing = b"1234566789123456789123567891234657373833"
  454. r.refs[b"refs/heads/blah"] = missing
  455. tmp_dir = self.mkdtemp()
  456. self.addCleanup(shutil.rmtree, tmp_dir)
  457. t = Repo.init(tmp_dir)
  458. self.addCleanup(t.close)
  459. with self.assertLogs(level="WARNING"):
  460. r.fetch(t)
  461. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  462. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  463. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  464. self.assertIn(b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", t)
  465. self.assertIn(b"b0931cadc54336e78a1d980420e3268903b57a50", t)
  466. self.assertNotIn(missing, t)
  467. def test_clone(self) -> None:
  468. r = self.open_repo("a.git")
  469. tmp_dir = self.mkdtemp()
  470. self.addCleanup(shutil.rmtree, tmp_dir)
  471. with r.clone(tmp_dir, mkdir=False) as t:
  472. self.assertEqual(
  473. {
  474. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  475. b"refs/remotes/origin/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  476. b"refs/remotes/origin/HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  477. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  478. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  479. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  480. },
  481. t.refs.as_dict(),
  482. )
  483. shas = [e.commit.id for e in r.get_walker()]
  484. self.assertEqual(
  485. shas, [t.head(), b"2a72d929692c41d8554c07f6301757ba18a65d91"]
  486. )
  487. c = t.get_config()
  488. encoded_path = r.path
  489. if not isinstance(encoded_path, bytes):
  490. encoded_path = os.fsencode(encoded_path)
  491. self.assertEqual(encoded_path, c.get((b"remote", b"origin"), b"url"))
  492. self.assertEqual(
  493. b"+refs/heads/*:refs/remotes/origin/*",
  494. c.get((b"remote", b"origin"), b"fetch"),
  495. )
  496. def test_clone_no_head(self) -> None:
  497. temp_dir = self.mkdtemp()
  498. self.addCleanup(shutil.rmtree, temp_dir)
  499. repo_dir = os.path.join(os.path.dirname(__file__), "..", "testdata", "repos")
  500. dest_dir = os.path.join(temp_dir, "a.git")
  501. shutil.copytree(os.path.join(repo_dir, "a.git"), dest_dir, symlinks=True)
  502. r = Repo(dest_dir)
  503. self.addCleanup(r.close)
  504. del r.refs[b"refs/heads/master"]
  505. del r.refs[b"HEAD"]
  506. t = r.clone(os.path.join(temp_dir, "b.git"), mkdir=True)
  507. self.addCleanup(t.close)
  508. self.assertEqual(
  509. {
  510. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  511. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  512. },
  513. t.refs.as_dict(),
  514. )
  515. def test_clone_empty(self) -> None:
  516. """Test clone() doesn't crash if HEAD points to a non-existing ref.
  517. This simulates cloning server-side bare repository either when it is
  518. still empty or if user renames master branch and pushes private repo
  519. to the server.
  520. Non-bare repo HEAD always points to an existing ref.
  521. """
  522. r = self.open_repo("empty.git")
  523. tmp_dir = self.mkdtemp()
  524. self.addCleanup(shutil.rmtree, tmp_dir)
  525. r.clone(tmp_dir, mkdir=False, bare=True)
  526. def test_reset_index_symlink_enabled(self) -> None:
  527. if sys.platform == "win32":
  528. self.skipTest("symlinks are not supported on Windows")
  529. tmp_dir = self.mkdtemp()
  530. self.addCleanup(shutil.rmtree, tmp_dir)
  531. o = Repo.init(os.path.join(tmp_dir, "s"), mkdir=True)
  532. os.symlink("foo", os.path.join(tmp_dir, "s", "bar"))
  533. o.get_worktree().stage("bar")
  534. o.get_worktree().commit(
  535. message=b"add symlink",
  536. )
  537. t = o.clone(os.path.join(tmp_dir, "t"), symlinks=True)
  538. o.close()
  539. bar_path = os.path.join(tmp_dir, "t", "bar")
  540. if sys.platform == "win32":
  541. with open(bar_path) as f:
  542. self.assertEqual("foo", f.read())
  543. else:
  544. self.assertEqual("foo", os.readlink(bar_path))
  545. t.close()
  546. def test_reset_index_symlink_disabled(self) -> None:
  547. tmp_dir = self.mkdtemp()
  548. self.addCleanup(shutil.rmtree, tmp_dir)
  549. o = Repo.init(os.path.join(tmp_dir, "s"), mkdir=True)
  550. self.addCleanup(o.close)
  551. os.symlink("foo", os.path.join(tmp_dir, "s", "bar"))
  552. o.get_worktree().stage("bar")
  553. o.get_worktree().commit(
  554. message=b"add symlink",
  555. )
  556. t = o.clone(os.path.join(tmp_dir, "t"), symlinks=False)
  557. self.addCleanup(t.close)
  558. with open(os.path.join(tmp_dir, "t", "bar")) as f:
  559. self.assertEqual("foo", f.read())
  560. def test_reset_index_protect_hfs(self) -> None:
  561. tmp_dir = self.mkdtemp()
  562. self.addCleanup(shutil.rmtree, tmp_dir)
  563. repo = Repo.init(tmp_dir)
  564. self.addCleanup(repo.close)
  565. config = repo.get_config()
  566. # Test with protectHFS enabled
  567. config.set(b"core", b"core.protectHFS", b"true")
  568. config.write_to_path()
  569. # Create a file with HFS+ Unicode attack vector
  570. # This uses a zero-width non-joiner to create ".g\u200cit"
  571. attack_name = b".g\xe2\x80\x8cit"
  572. attack_path = os.path.join(tmp_dir, attack_name.decode("utf-8"))
  573. os.mkdir(attack_path)
  574. # Try to stage the malicious path - should be rejected
  575. with self.assertRaises(ValueError):
  576. repo.get_worktree().stage([attack_name])
  577. # Test with protectHFS disabled
  578. config.set(b"core", b"core.protectHFS", b"false")
  579. config.write_to_path()
  580. # Now it should work (though still dangerous!)
  581. # We're not actually staging it to avoid creating a dangerous repo
  582. def test_clone_bare(self) -> None:
  583. r = self.open_repo("a.git")
  584. tmp_dir = self.mkdtemp()
  585. self.addCleanup(shutil.rmtree, tmp_dir)
  586. t = r.clone(tmp_dir, mkdir=False)
  587. t.close()
  588. def test_clone_checkout_and_bare(self) -> None:
  589. r = self.open_repo("a.git")
  590. tmp_dir = self.mkdtemp()
  591. self.addCleanup(shutil.rmtree, tmp_dir)
  592. self.assertRaises(
  593. ValueError, r.clone, tmp_dir, mkdir=False, checkout=True, bare=True
  594. )
  595. def test_clone_branch(self) -> None:
  596. r = self.open_repo("a.git")
  597. r.refs[b"refs/heads/mybranch"] = b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a"
  598. tmp_dir = self.mkdtemp()
  599. self.addCleanup(shutil.rmtree, tmp_dir)
  600. with r.clone(tmp_dir, mkdir=False, branch=b"mybranch") as t:
  601. # HEAD should point to specified branch and not origin HEAD
  602. chain, sha = t.refs.follow(b"HEAD")
  603. self.assertEqual(chain[-1], b"refs/heads/mybranch")
  604. self.assertEqual(sha, b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a")
  605. self.assertEqual(
  606. t.refs[b"refs/remotes/origin/HEAD"],
  607. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  608. )
  609. def test_clone_tag(self) -> None:
  610. r = self.open_repo("a.git")
  611. tmp_dir = self.mkdtemp()
  612. self.addCleanup(shutil.rmtree, tmp_dir)
  613. with r.clone(tmp_dir, mkdir=False, branch=b"mytag") as t:
  614. # HEAD should be detached (and not a symbolic ref) at tag
  615. self.assertEqual(
  616. t.refs.read_ref(b"HEAD"),
  617. b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  618. )
  619. self.assertEqual(
  620. t.refs[b"refs/remotes/origin/HEAD"],
  621. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  622. )
  623. def test_clone_invalid_branch(self) -> None:
  624. r = self.open_repo("a.git")
  625. tmp_dir = self.mkdtemp()
  626. self.addCleanup(shutil.rmtree, tmp_dir)
  627. self.assertRaises(
  628. ValueError,
  629. r.clone,
  630. tmp_dir,
  631. mkdir=False,
  632. branch=b"mybranch",
  633. )
  634. def test_merge_history(self) -> None:
  635. r = self.open_repo("simple_merge.git")
  636. shas = [e.commit.id for e in r.get_walker()]
  637. self.assertEqual(
  638. shas,
  639. [
  640. b"5dac377bdded4c9aeb8dff595f0faeebcc8498cc",
  641. b"ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd",
  642. b"4cffe90e0a41ad3f5190079d7c8f036bde29cbe6",
  643. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  644. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  645. ],
  646. )
  647. def test_out_of_order_merge(self) -> None:
  648. """Test that revision history is ordered by date, not parent order."""
  649. r = self.open_repo("ooo_merge.git")
  650. shas = [e.commit.id for e in r.get_walker()]
  651. self.assertEqual(
  652. shas,
  653. [
  654. b"7601d7f6231db6a57f7bbb79ee52e4d462fd44d1",
  655. b"f507291b64138b875c28e03469025b1ea20bc614",
  656. b"fb5b0425c7ce46959bec94d54b9a157645e114f5",
  657. b"f9e39b120c68182a4ba35349f832d0e4e61f485c",
  658. ],
  659. )
  660. def test_get_tags_empty(self) -> None:
  661. r = self.open_repo("ooo_merge.git")
  662. self.assertEqual({}, r.refs.as_dict(b"refs/tags"))
  663. def test_get_config(self) -> None:
  664. r = self.open_repo("ooo_merge.git")
  665. self.assertIsInstance(r.get_config(), Config)
  666. def test_get_config_stack(self) -> None:
  667. r = self.open_repo("ooo_merge.git")
  668. self.assertIsInstance(r.get_config_stack(), Config)
  669. def test_common_revisions(self) -> None:
  670. """This test demonstrates that ``find_common_revisions()`` actually
  671. returns common heads, not revisions; dulwich already uses
  672. ``find_common_revisions()`` in such a manner (see
  673. ``Repo.find_objects()``).
  674. """
  675. expected_shas = {b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e"}
  676. # Source for objects.
  677. r_base = self.open_repo("simple_merge.git")
  678. # Re-create each-side of the merge in simple_merge.git.
  679. #
  680. # Since the trees and blobs are missing, the repository created is
  681. # corrupted, but we're only checking for commits for the purpose of
  682. # this test, so it's immaterial.
  683. r1_dir = self.mkdtemp()
  684. self.addCleanup(shutil.rmtree, r1_dir)
  685. r1_commits = [
  686. b"ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd", # HEAD
  687. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  688. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  689. ]
  690. r2_dir = self.mkdtemp()
  691. self.addCleanup(shutil.rmtree, r2_dir)
  692. r2_commits = [
  693. b"4cffe90e0a41ad3f5190079d7c8f036bde29cbe6", # HEAD
  694. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  695. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  696. ]
  697. r1 = Repo.init_bare(r1_dir)
  698. for c in r1_commits:
  699. r1.object_store.add_object(r_base.get_object(c))
  700. r1.refs[b"HEAD"] = r1_commits[0]
  701. r2 = Repo.init_bare(r2_dir)
  702. for c in r2_commits:
  703. r2.object_store.add_object(r_base.get_object(c))
  704. r2.refs[b"HEAD"] = r2_commits[0]
  705. # Finally, the 'real' testing!
  706. shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
  707. self.assertEqual(set(shas), expected_shas)
  708. shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
  709. self.assertEqual(set(shas), expected_shas)
  710. def test_shell_hook_pre_commit(self) -> None:
  711. if os.name != "posix":
  712. self.skipTest("shell hook tests requires POSIX shell")
  713. pre_commit_fail = """#!/bin/sh
  714. exit 1
  715. """
  716. pre_commit_success = """#!/bin/sh
  717. exit 0
  718. """
  719. repo_dir = os.path.join(self.mkdtemp())
  720. self.addCleanup(shutil.rmtree, repo_dir)
  721. r = Repo.init(repo_dir)
  722. self.addCleanup(r.close)
  723. pre_commit = os.path.join(r.controldir(), "hooks", "pre-commit")
  724. with open(pre_commit, "w") as f:
  725. f.write(pre_commit_fail)
  726. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  727. self.assertRaises(
  728. errors.CommitError,
  729. r.get_worktree().commit,
  730. b"failed commit",
  731. committer=b"Test Committer <test@nodomain.com>",
  732. author=b"Test Author <test@nodomain.com>",
  733. commit_timestamp=12345,
  734. commit_timezone=0,
  735. author_timestamp=12345,
  736. author_timezone=0,
  737. )
  738. with open(pre_commit, "w") as f:
  739. f.write(pre_commit_success)
  740. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  741. commit_sha = r.get_worktree().commit(
  742. message=b"empty commit",
  743. committer=b"Test Committer <test@nodomain.com>",
  744. author=b"Test Author <test@nodomain.com>",
  745. commit_timestamp=12395,
  746. commit_timezone=0,
  747. author_timestamp=12395,
  748. author_timezone=0,
  749. )
  750. self.assertEqual([], r[commit_sha].parents)
  751. def test_shell_hook_commit_msg(self) -> None:
  752. if os.name != "posix":
  753. self.skipTest("shell hook tests requires POSIX shell")
  754. commit_msg_fail = """#!/bin/sh
  755. exit 1
  756. """
  757. commit_msg_success = """#!/bin/sh
  758. exit 0
  759. """
  760. repo_dir = self.mkdtemp()
  761. self.addCleanup(shutil.rmtree, repo_dir)
  762. r = Repo.init(repo_dir)
  763. self.addCleanup(r.close)
  764. commit_msg = os.path.join(r.controldir(), "hooks", "commit-msg")
  765. with open(commit_msg, "w") as f:
  766. f.write(commit_msg_fail)
  767. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  768. self.assertRaises(
  769. errors.CommitError,
  770. r.get_worktree().commit,
  771. b"failed commit",
  772. committer=b"Test Committer <test@nodomain.com>",
  773. author=b"Test Author <test@nodomain.com>",
  774. commit_timestamp=12345,
  775. commit_timezone=0,
  776. author_timestamp=12345,
  777. author_timezone=0,
  778. )
  779. with open(commit_msg, "w") as f:
  780. f.write(commit_msg_success)
  781. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  782. commit_sha = r.get_worktree().commit(
  783. message=b"empty commit",
  784. committer=b"Test Committer <test@nodomain.com>",
  785. author=b"Test Author <test@nodomain.com>",
  786. commit_timestamp=12395,
  787. commit_timezone=0,
  788. author_timestamp=12395,
  789. author_timezone=0,
  790. )
  791. self.assertEqual([], r[commit_sha].parents)
  792. def test_shell_hook_pre_commit_add_files(self) -> None:
  793. if os.name != "posix":
  794. self.skipTest("shell hook tests requires POSIX shell")
  795. pre_commit_contents = """#!{executable}
  796. import sys
  797. sys.path.extend({path!r})
  798. from dulwich.repo import Repo
  799. with open('foo', 'w') as f:
  800. f.write('newfile')
  801. r = Repo('.')
  802. r.get_worktree().stage(['foo'])
  803. """.format(
  804. executable=sys.executable,
  805. path=[os.path.join(os.path.dirname(__file__), "..", ".."), *sys.path],
  806. )
  807. repo_dir = os.path.join(self.mkdtemp())
  808. self.addCleanup(shutil.rmtree, repo_dir)
  809. r = Repo.init(repo_dir)
  810. self.addCleanup(r.close)
  811. with open(os.path.join(repo_dir, "blah"), "w") as f:
  812. f.write("blah")
  813. r.get_worktree().stage(["blah"])
  814. pre_commit = os.path.join(r.controldir(), "hooks", "pre-commit")
  815. with open(pre_commit, "w") as f:
  816. f.write(pre_commit_contents)
  817. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  818. commit_sha = r.get_worktree().commit(
  819. message=b"new commit",
  820. committer=b"Test Committer <test@nodomain.com>",
  821. author=b"Test Author <test@nodomain.com>",
  822. commit_timestamp=12395,
  823. commit_timezone=0,
  824. author_timestamp=12395,
  825. author_timezone=0,
  826. )
  827. self.assertEqual([], r[commit_sha].parents)
  828. tree = r[r[commit_sha].tree]
  829. self.assertEqual({b"blah", b"foo"}, set(tree))
  830. def test_shell_hook_post_commit(self) -> None:
  831. if os.name != "posix":
  832. self.skipTest("shell hook tests requires POSIX shell")
  833. repo_dir = self.mkdtemp()
  834. self.addCleanup(shutil.rmtree, repo_dir)
  835. r = Repo.init(repo_dir)
  836. self.addCleanup(r.close)
  837. (fd, path) = tempfile.mkstemp(dir=repo_dir)
  838. os.close(fd)
  839. post_commit_msg = (
  840. """#!/bin/sh
  841. rm """
  842. + path
  843. + """
  844. """
  845. )
  846. root_sha = r.get_worktree().commit(
  847. message=b"empty commit",
  848. committer=b"Test Committer <test@nodomain.com>",
  849. author=b"Test Author <test@nodomain.com>",
  850. commit_timestamp=12345,
  851. commit_timezone=0,
  852. author_timestamp=12345,
  853. author_timezone=0,
  854. )
  855. self.assertEqual([], r[root_sha].parents)
  856. post_commit = os.path.join(r.controldir(), "hooks", "post-commit")
  857. with open(post_commit, "wb") as f:
  858. f.write(post_commit_msg.encode(locale.getpreferredencoding()))
  859. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  860. commit_sha = r.get_worktree().commit(
  861. message=b"empty commit",
  862. committer=b"Test Committer <test@nodomain.com>",
  863. author=b"Test Author <test@nodomain.com>",
  864. commit_timestamp=12345,
  865. commit_timezone=0,
  866. author_timestamp=12345,
  867. author_timezone=0,
  868. )
  869. self.assertEqual([root_sha], r[commit_sha].parents)
  870. self.assertFalse(os.path.exists(path))
  871. post_commit_msg_fail = """#!/bin/sh
  872. exit 1
  873. """
  874. with open(post_commit, "w") as f:
  875. f.write(post_commit_msg_fail)
  876. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  877. warnings.simplefilter("always", UserWarning)
  878. self.addCleanup(warnings.resetwarnings)
  879. warnings_list, restore_warnings = setup_warning_catcher()
  880. self.addCleanup(restore_warnings)
  881. commit_sha2 = r.get_worktree().commit(
  882. message=b"empty commit",
  883. committer=b"Test Committer <test@nodomain.com>",
  884. author=b"Test Author <test@nodomain.com>",
  885. commit_timestamp=12345,
  886. commit_timezone=0,
  887. author_timestamp=12345,
  888. author_timezone=0,
  889. )
  890. expected_warning = UserWarning(
  891. "post-commit hook failed: Hook post-commit exited with non-zero status 1",
  892. )
  893. for w in warnings_list:
  894. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  895. break
  896. else:
  897. raise AssertionError(
  898. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  899. )
  900. self.assertEqual([commit_sha], r[commit_sha2].parents)
  901. def test_as_dict(self) -> None:
  902. def check(repo) -> None:
  903. self.assertEqual(
  904. repo.refs.subkeys(b"refs/tags"),
  905. repo.refs.subkeys(b"refs/tags/"),
  906. )
  907. self.assertEqual(
  908. repo.refs.as_dict(b"refs/tags"),
  909. repo.refs.as_dict(b"refs/tags/"),
  910. )
  911. self.assertEqual(
  912. repo.refs.as_dict(b"refs/heads"),
  913. repo.refs.as_dict(b"refs/heads/"),
  914. )
  915. bare = self.open_repo("a.git")
  916. tmp_dir = self.mkdtemp()
  917. self.addCleanup(shutil.rmtree, tmp_dir)
  918. with bare.clone(tmp_dir, mkdir=False) as nonbare:
  919. check(nonbare)
  920. check(bare)
  921. def test_working_tree(self) -> None:
  922. temp_dir = tempfile.mkdtemp()
  923. self.addCleanup(shutil.rmtree, temp_dir)
  924. worktree_temp_dir = tempfile.mkdtemp()
  925. self.addCleanup(shutil.rmtree, worktree_temp_dir)
  926. r = Repo.init(temp_dir)
  927. self.addCleanup(r.close)
  928. root_sha = r.get_worktree().commit(
  929. message=b"empty commit",
  930. committer=b"Test Committer <test@nodomain.com>",
  931. author=b"Test Author <test@nodomain.com>",
  932. commit_timestamp=12345,
  933. commit_timezone=0,
  934. author_timestamp=12345,
  935. author_timezone=0,
  936. )
  937. r.refs[b"refs/heads/master"] = root_sha
  938. w = Repo._init_new_working_directory(worktree_temp_dir, r)
  939. self.addCleanup(w.close)
  940. new_sha = w.get_worktree().commit(
  941. message=b"new commit",
  942. committer=b"Test Committer <test@nodomain.com>",
  943. author=b"Test Author <test@nodomain.com>",
  944. commit_timestamp=12345,
  945. commit_timezone=0,
  946. author_timestamp=12345,
  947. author_timezone=0,
  948. )
  949. w.refs[b"HEAD"] = new_sha
  950. self.assertEqual(
  951. os.path.abspath(r.controldir()), os.path.abspath(w.commondir())
  952. )
  953. self.assertEqual(r.refs.keys(), w.refs.keys())
  954. self.assertNotEqual(r.head(), w.head())
  955. class BuildRepoRootTests(TestCase):
  956. """Tests that build on-disk repos from scratch.
  957. Repos live in a temp dir and are torn down after each test. They start with
  958. a single commit in master having single file named 'a'.
  959. """
  960. def get_repo_dir(self):
  961. return os.path.join(tempfile.mkdtemp(), "test")
  962. def setUp(self) -> None:
  963. super().setUp()
  964. self._repo_dir = self.get_repo_dir()
  965. os.makedirs(self._repo_dir)
  966. r = self._repo = Repo.init(self._repo_dir)
  967. self.addCleanup(tear_down_repo, r)
  968. self.assertFalse(r.bare)
  969. self.assertEqual(b"ref: refs/heads/master", r.refs.read_ref(b"HEAD"))
  970. self.assertRaises(KeyError, lambda: r.refs[b"refs/heads/master"])
  971. with open(os.path.join(r.path, "a"), "wb") as f:
  972. f.write(b"file contents")
  973. r.get_worktree().stage(["a"])
  974. commit_sha = r.get_worktree().commit(
  975. message=b"msg",
  976. committer=b"Test Committer <test@nodomain.com>",
  977. author=b"Test Author <test@nodomain.com>",
  978. commit_timestamp=12345,
  979. commit_timezone=0,
  980. author_timestamp=12345,
  981. author_timezone=0,
  982. )
  983. self.assertEqual([], r[commit_sha].parents)
  984. self._root_commit = commit_sha
  985. def test_get_shallow(self) -> None:
  986. self.assertEqual(set(), self._repo.get_shallow())
  987. with open(os.path.join(self._repo.path, ".git", "shallow"), "wb") as f:
  988. f.write(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097\n")
  989. self.assertEqual(
  990. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  991. self._repo.get_shallow(),
  992. )
  993. def test_update_shallow(self) -> None:
  994. self._repo.update_shallow(None, None) # no op
  995. self.assertEqual(set(), self._repo.get_shallow())
  996. self._repo.update_shallow([b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], None)
  997. self.assertEqual(
  998. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  999. self._repo.get_shallow(),
  1000. )
  1001. self._repo.update_shallow(
  1002. [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
  1003. [b"f9e39b120c68182a4ba35349f832d0e4e61f485c"],
  1004. )
  1005. self.assertEqual(
  1006. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  1007. self._repo.get_shallow(),
  1008. )
  1009. self._repo.update_shallow(None, [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"])
  1010. self.assertEqual(set(), self._repo.get_shallow())
  1011. self.assertEqual(
  1012. False,
  1013. os.path.exists(os.path.join(self._repo.controldir(), "shallow")),
  1014. )
  1015. def test_build_repo(self) -> None:
  1016. r = self._repo
  1017. self.assertEqual(b"ref: refs/heads/master", r.refs.read_ref(b"HEAD"))
  1018. self.assertEqual(self._root_commit, r.refs[b"refs/heads/master"])
  1019. expected_blob = objects.Blob.from_string(b"file contents")
  1020. self.assertEqual(expected_blob.data, r[expected_blob.id].data)
  1021. actual_commit = r[self._root_commit]
  1022. self.assertEqual(b"msg", actual_commit.message)
  1023. def test_commit_modified(self) -> None:
  1024. r = self._repo
  1025. with open(os.path.join(r.path, "a"), "wb") as f:
  1026. f.write(b"new contents")
  1027. r.get_worktree().stage(["a"])
  1028. commit_sha = r.get_worktree().commit(
  1029. message=b"modified a",
  1030. committer=b"Test Committer <test@nodomain.com>",
  1031. author=b"Test Author <test@nodomain.com>",
  1032. commit_timestamp=12395,
  1033. commit_timezone=0,
  1034. author_timestamp=12395,
  1035. author_timezone=0,
  1036. )
  1037. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1038. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  1039. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  1040. self.assertEqual(b"new contents", r[a_id].data)
  1041. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  1042. def test_commit_symlink(self) -> None:
  1043. r = self._repo
  1044. os.symlink("a", os.path.join(r.path, "b"))
  1045. r.get_worktree().stage(["a", "b"])
  1046. commit_sha = r.get_worktree().commit(
  1047. message=b"Symlink b",
  1048. committer=b"Test Committer <test@nodomain.com>",
  1049. author=b"Test Author <test@nodomain.com>",
  1050. commit_timestamp=12395,
  1051. commit_timezone=0,
  1052. author_timestamp=12395,
  1053. author_timezone=0,
  1054. )
  1055. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1056. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  1057. self.assertTrue(stat.S_ISLNK(b_mode))
  1058. self.assertEqual(b"a", r[b_id].data)
  1059. def test_commit_merge_heads_file(self) -> None:
  1060. tmp_dir = tempfile.mkdtemp()
  1061. self.addCleanup(shutil.rmtree, tmp_dir)
  1062. r = Repo.init(tmp_dir)
  1063. with open(os.path.join(r.path, "a"), "w") as f:
  1064. f.write("initial text")
  1065. c1 = r.get_worktree().commit(
  1066. message=b"initial commit",
  1067. committer=b"Test Committer <test@nodomain.com>",
  1068. author=b"Test Author <test@nodomain.com>",
  1069. commit_timestamp=12395,
  1070. commit_timezone=0,
  1071. author_timestamp=12395,
  1072. author_timezone=0,
  1073. )
  1074. with open(os.path.join(r.path, "a"), "w") as f:
  1075. f.write("merged text")
  1076. with open(os.path.join(r.path, ".git", "MERGE_HEAD"), "w") as f:
  1077. f.write("c27a2d21dd136312d7fa9e8baabb82561a1727d0\n")
  1078. r.get_worktree().stage(["a"])
  1079. commit_sha = r.get_worktree().commit(
  1080. message=b"deleted a",
  1081. committer=b"Test Committer <test@nodomain.com>",
  1082. author=b"Test Author <test@nodomain.com>",
  1083. commit_timestamp=12395,
  1084. commit_timezone=0,
  1085. author_timestamp=12395,
  1086. author_timezone=0,
  1087. )
  1088. self.assertEqual(
  1089. [c1, b"c27a2d21dd136312d7fa9e8baabb82561a1727d0"],
  1090. r[commit_sha].parents,
  1091. )
  1092. def test_commit_deleted(self) -> None:
  1093. r = self._repo
  1094. os.remove(os.path.join(r.path, "a"))
  1095. r.get_worktree().stage(["a"])
  1096. commit_sha = r.get_worktree().commit(
  1097. message=b"deleted a",
  1098. committer=b"Test Committer <test@nodomain.com>",
  1099. author=b"Test Author <test@nodomain.com>",
  1100. commit_timestamp=12395,
  1101. commit_timezone=0,
  1102. author_timestamp=12395,
  1103. author_timezone=0,
  1104. )
  1105. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1106. self.assertEqual([], list(r.open_index()))
  1107. tree = r[r[commit_sha].tree]
  1108. self.assertEqual([], list(tree.iteritems()))
  1109. def test_commit_follows(self) -> None:
  1110. r = self._repo
  1111. r.refs.set_symbolic_ref(b"HEAD", b"refs/heads/bla")
  1112. commit_sha = r.get_worktree().commit(
  1113. message=b"commit with strange character",
  1114. committer=b"Test Committer <test@nodomain.com>",
  1115. author=b"Test Author <test@nodomain.com>",
  1116. commit_timestamp=12395,
  1117. commit_timezone=0,
  1118. author_timestamp=12395,
  1119. author_timezone=0,
  1120. ref=b"HEAD",
  1121. )
  1122. self.assertEqual(commit_sha, r[b"refs/heads/bla"].id)
  1123. def test_commit_encoding(self) -> None:
  1124. r = self._repo
  1125. commit_sha = r.get_worktree().commit(
  1126. message=b"commit with strange character \xee",
  1127. committer=b"Test Committer <test@nodomain.com>",
  1128. author=b"Test Author <test@nodomain.com>",
  1129. commit_timestamp=12395,
  1130. commit_timezone=0,
  1131. author_timestamp=12395,
  1132. author_timezone=0,
  1133. encoding=b"iso8859-1",
  1134. )
  1135. self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
  1136. def test_compression_level(self) -> None:
  1137. r = self._repo
  1138. c = r.get_config()
  1139. c.set(("core",), "compression", "3")
  1140. c.set(("core",), "looseCompression", "4")
  1141. c.write_to_path()
  1142. r = Repo(self._repo_dir)
  1143. self.addCleanup(r.close)
  1144. self.assertEqual(r.object_store.loose_compression_level, 4)
  1145. def test_repositoryformatversion_unsupported(self) -> None:
  1146. r = self._repo
  1147. c = r.get_config()
  1148. c.set(("core",), "repositoryformatversion", "2")
  1149. c.write_to_path()
  1150. self.assertRaises(UnsupportedVersion, Repo, self._repo_dir)
  1151. def test_repositoryformatversion_1(self) -> None:
  1152. r = self._repo
  1153. c = r.get_config()
  1154. c.set(("core",), "repositoryformatversion", "1")
  1155. c.write_to_path()
  1156. Repo(self._repo_dir)
  1157. def test_worktreeconfig_extension(self) -> None:
  1158. r = self._repo
  1159. c = r.get_config()
  1160. c.set(("core",), "repositoryformatversion", "1")
  1161. c.set(("extensions",), "worktreeconfig", True)
  1162. c.write_to_path()
  1163. c = r.get_worktree_config()
  1164. c.set(("user",), "repositoryformatversion", "1")
  1165. c.set((b"user",), b"name", b"Jelmer")
  1166. c.write_to_path()
  1167. cs = r.get_config_stack()
  1168. self.assertEqual(cs.get(("user",), "name"), b"Jelmer")
  1169. def test_worktreeconfig_extension_case(self) -> None:
  1170. """Test that worktree code does not error for alternate case format."""
  1171. r = self._repo
  1172. c = r.get_config()
  1173. c.set(("core",), "repositoryformatversion", "1")
  1174. # Capitalize "Config"
  1175. c.set(("extensions",), "worktreeConfig", True)
  1176. c.write_to_path()
  1177. c = r.get_worktree_config()
  1178. c.set(("user",), "repositoryformatversion", "1")
  1179. c.set((b"user",), b"name", b"Jelmer")
  1180. c.write_to_path()
  1181. # The following line errored before
  1182. # https://github.com/jelmer/dulwich/issues/1285 was addressed
  1183. Repo(self._repo_dir)
  1184. def test_repositoryformatversion_1_extension(self) -> None:
  1185. r = self._repo
  1186. c = r.get_config()
  1187. c.set(("core",), "repositoryformatversion", "1")
  1188. c.set(("extensions",), "unknownextension", True)
  1189. c.write_to_path()
  1190. self.assertRaises(UnsupportedExtension, Repo, self._repo_dir)
  1191. def test_commit_encoding_from_config(self) -> None:
  1192. r = self._repo
  1193. c = r.get_config()
  1194. c.set(("i18n",), "commitEncoding", "iso8859-1")
  1195. c.write_to_path()
  1196. commit_sha = r.get_worktree().commit(
  1197. message=b"commit with strange character \xee",
  1198. committer=b"Test Committer <test@nodomain.com>",
  1199. author=b"Test Author <test@nodomain.com>",
  1200. commit_timestamp=12395,
  1201. commit_timezone=0,
  1202. author_timestamp=12395,
  1203. author_timezone=0,
  1204. )
  1205. self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
  1206. def test_commit_config_identity(self) -> None:
  1207. # commit falls back to the users' identity if it wasn't specified
  1208. r = self._repo
  1209. c = r.get_config()
  1210. c.set((b"user",), b"name", b"Jelmer")
  1211. c.set((b"user",), b"email", b"jelmer@apache.org")
  1212. c.write_to_path()
  1213. commit_sha = r.get_worktree().commit(
  1214. message=b"message",
  1215. )
  1216. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1217. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1218. def test_commit_config_identity_strips_than(self) -> None:
  1219. # commit falls back to the users' identity if it wasn't specified,
  1220. # and strips superfluous <>
  1221. r = self._repo
  1222. c = r.get_config()
  1223. c.set((b"user",), b"name", b"Jelmer")
  1224. c.set((b"user",), b"email", b"<jelmer@apache.org>")
  1225. c.write_to_path()
  1226. commit_sha = r.get_worktree().commit(
  1227. message=b"message",
  1228. )
  1229. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1230. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1231. def test_commit_config_identity_in_memoryrepo(self) -> None:
  1232. # commit falls back to the users' identity if it wasn't specified
  1233. r = MemoryRepo.init_bare([], {})
  1234. c = r.get_config()
  1235. c.set((b"user",), b"name", b"Jelmer")
  1236. c.set((b"user",), b"email", b"jelmer@apache.org")
  1237. # Create a tree object
  1238. tree = objects.Tree()
  1239. r.object_store.add_object(tree)
  1240. # Use do_commit for MemoryRepo since it doesn't support worktree
  1241. # Suppress deprecation warning since we're intentionally testing the deprecated method
  1242. with warnings.catch_warnings():
  1243. warnings.simplefilter("ignore", DeprecationWarning)
  1244. commit_sha = r.do_commit(
  1245. message=b"message",
  1246. tree=tree.id,
  1247. )
  1248. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1249. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1250. def test_commit_config_identity_from_env(self) -> None:
  1251. # commit falls back to the users' identity if it wasn't specified
  1252. self.overrideEnv("GIT_COMMITTER_NAME", "joe")
  1253. self.overrideEnv("GIT_COMMITTER_EMAIL", "joe@example.com")
  1254. r = self._repo
  1255. c = r.get_config()
  1256. c.set((b"user",), b"name", b"Jelmer")
  1257. c.set((b"user",), b"email", b"jelmer@apache.org")
  1258. c.write_to_path()
  1259. commit_sha = r.get_worktree().commit(
  1260. message=b"message",
  1261. )
  1262. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1263. self.assertEqual(b"joe <joe@example.com>", r[commit_sha].committer)
  1264. def test_commit_fail_ref(self) -> None:
  1265. r = self._repo
  1266. def set_if_equals(name, old_ref, new_ref, **kwargs) -> bool:
  1267. return False
  1268. r.refs.set_if_equals = set_if_equals
  1269. def add_if_new(name, new_ref, **kwargs) -> None:
  1270. self.fail("Unexpected call to add_if_new")
  1271. r.refs.add_if_new = add_if_new
  1272. old_shas = set(r.object_store)
  1273. self.assertRaises(
  1274. errors.CommitError,
  1275. r.get_worktree().commit,
  1276. b"failed commit",
  1277. committer=b"Test Committer <test@nodomain.com>",
  1278. author=b"Test Author <test@nodomain.com>",
  1279. commit_timestamp=12345,
  1280. commit_timezone=0,
  1281. author_timestamp=12345,
  1282. author_timezone=0,
  1283. )
  1284. new_shas = set(r.object_store) - old_shas
  1285. self.assertEqual(1, len(new_shas))
  1286. # Check that the new commit (now garbage) was added.
  1287. def test_commit_message_callback(self) -> None:
  1288. """Test commit with a callable message."""
  1289. r = self._repo
  1290. # Define a callback that generates message based on repo and commit
  1291. def message_callback(repo, commit):
  1292. # Verify we get the right objects
  1293. self.assertEqual(repo, r)
  1294. self.assertIsNotNone(commit.tree)
  1295. self.assertIsNotNone(commit.author)
  1296. self.assertIsNotNone(commit.committer)
  1297. # Generate a message
  1298. return b"Generated commit for tree " + commit.tree[:8]
  1299. commit_sha = r.get_worktree().commit(
  1300. message=message_callback,
  1301. committer=b"Test Committer <test@nodomain.com>",
  1302. author=b"Test Author <test@nodomain.com>",
  1303. commit_timestamp=12345,
  1304. commit_timezone=0,
  1305. author_timestamp=12345,
  1306. author_timezone=0,
  1307. )
  1308. commit = r[commit_sha]
  1309. self.assertTrue(commit.message.startswith(b"Generated commit for tree "))
  1310. self.assertIn(commit.tree[:8], commit.message)
  1311. def test_commit_message_callback_returns_none(self) -> None:
  1312. """Test commit with callback that returns None."""
  1313. r = self._repo
  1314. def message_callback(repo, commit):
  1315. return None
  1316. self.assertRaises(
  1317. ValueError,
  1318. r.get_worktree().commit,
  1319. message_callback,
  1320. committer=b"Test Committer <test@nodomain.com>",
  1321. author=b"Test Author <test@nodomain.com>",
  1322. commit_timestamp=12345,
  1323. commit_timezone=0,
  1324. author_timestamp=12345,
  1325. author_timezone=0,
  1326. )
  1327. def test_commit_message_callback_with_merge_heads(self) -> None:
  1328. """Test commit with callback for merge commits."""
  1329. r = self._repo
  1330. # Create two parent commits first
  1331. parent1 = r.get_worktree().commit(
  1332. message=b"Parent 1",
  1333. committer=b"Test Committer <test@nodomain.com>",
  1334. author=b"Test Author <test@nodomain.com>",
  1335. )
  1336. parent2 = r.get_worktree().commit(
  1337. message=b"Parent 2",
  1338. committer=b"Test Committer <test@nodomain.com>",
  1339. author=b"Test Author <test@nodomain.com>",
  1340. ref=None,
  1341. )
  1342. def message_callback(repo, commit):
  1343. # Verify the commit object has parents set
  1344. self.assertEqual(2, len(commit.parents))
  1345. return b"Merge commit with %d parents" % len(commit.parents)
  1346. merge_sha = r.get_worktree().commit(
  1347. message=message_callback,
  1348. committer=b"Test Committer <test@nodomain.com>",
  1349. author=b"Test Author <test@nodomain.com>",
  1350. merge_heads=[parent2],
  1351. )
  1352. merge_commit = r[merge_sha]
  1353. self.assertEqual(b"Merge commit with 2 parents", merge_commit.message)
  1354. self.assertEqual([parent1, parent2], merge_commit.parents)
  1355. def test_commit_branch(self) -> None:
  1356. r = self._repo
  1357. commit_sha = r.get_worktree().commit(
  1358. message=b"commit to branch",
  1359. committer=b"Test Committer <test@nodomain.com>",
  1360. author=b"Test Author <test@nodomain.com>",
  1361. commit_timestamp=12395,
  1362. commit_timezone=0,
  1363. author_timestamp=12395,
  1364. author_timezone=0,
  1365. ref=b"refs/heads/new_branch",
  1366. )
  1367. self.assertEqual(self._root_commit, r[b"HEAD"].id)
  1368. self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
  1369. self.assertEqual([], r[commit_sha].parents)
  1370. self.assertIn(b"refs/heads/new_branch", r)
  1371. new_branch_head = commit_sha
  1372. commit_sha = r.get_worktree().commit(
  1373. message=b"commit to branch 2",
  1374. committer=b"Test Committer <test@nodomain.com>",
  1375. author=b"Test Author <test@nodomain.com>",
  1376. commit_timestamp=12395,
  1377. commit_timezone=0,
  1378. author_timestamp=12395,
  1379. author_timezone=0,
  1380. ref=b"refs/heads/new_branch",
  1381. )
  1382. self.assertEqual(self._root_commit, r[b"HEAD"].id)
  1383. self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
  1384. self.assertEqual([new_branch_head], r[commit_sha].parents)
  1385. def test_commit_merge_heads(self) -> None:
  1386. r = self._repo
  1387. merge_1 = r.get_worktree().commit(
  1388. message=b"commit to branch 2",
  1389. committer=b"Test Committer <test@nodomain.com>",
  1390. author=b"Test Author <test@nodomain.com>",
  1391. commit_timestamp=12395,
  1392. commit_timezone=0,
  1393. author_timestamp=12395,
  1394. author_timezone=0,
  1395. ref=b"refs/heads/new_branch",
  1396. )
  1397. commit_sha = r.get_worktree().commit(
  1398. message=b"commit with merge",
  1399. committer=b"Test Committer <test@nodomain.com>",
  1400. author=b"Test Author <test@nodomain.com>",
  1401. commit_timestamp=12395,
  1402. commit_timezone=0,
  1403. author_timestamp=12395,
  1404. author_timezone=0,
  1405. merge_heads=[merge_1],
  1406. )
  1407. self.assertEqual([self._root_commit, merge_1], r[commit_sha].parents)
  1408. def test_commit_dangling_commit(self) -> None:
  1409. r = self._repo
  1410. old_shas = set(r.object_store)
  1411. old_refs = r.get_refs()
  1412. commit_sha = r.get_worktree().commit(
  1413. message=b"commit with no ref",
  1414. committer=b"Test Committer <test@nodomain.com>",
  1415. author=b"Test Author <test@nodomain.com>",
  1416. commit_timestamp=12395,
  1417. commit_timezone=0,
  1418. author_timestamp=12395,
  1419. author_timezone=0,
  1420. ref=None,
  1421. )
  1422. new_shas = set(r.object_store) - old_shas
  1423. # New sha is added, but no new refs
  1424. self.assertEqual(1, len(new_shas))
  1425. new_commit = r[new_shas.pop()]
  1426. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  1427. self.assertEqual([], r[commit_sha].parents)
  1428. self.assertEqual(old_refs, r.get_refs())
  1429. def test_commit_dangling_commit_with_parents(self) -> None:
  1430. r = self._repo
  1431. old_shas = set(r.object_store)
  1432. old_refs = r.get_refs()
  1433. commit_sha = r.get_worktree().commit(
  1434. message=b"commit with no ref",
  1435. committer=b"Test Committer <test@nodomain.com>",
  1436. author=b"Test Author <test@nodomain.com>",
  1437. commit_timestamp=12395,
  1438. commit_timezone=0,
  1439. author_timestamp=12395,
  1440. author_timezone=0,
  1441. ref=None,
  1442. merge_heads=[self._root_commit],
  1443. )
  1444. new_shas = set(r.object_store) - old_shas
  1445. # New sha is added, but no new refs
  1446. self.assertEqual(1, len(new_shas))
  1447. new_commit = r[new_shas.pop()]
  1448. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  1449. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1450. self.assertEqual(old_refs, r.get_refs())
  1451. def test_stage_absolute(self) -> None:
  1452. r = self._repo
  1453. os.remove(os.path.join(r.path, "a"))
  1454. # Suppress deprecation warning since we're intentionally testing the deprecated method
  1455. with warnings.catch_warnings():
  1456. warnings.simplefilter("ignore", DeprecationWarning)
  1457. self.assertRaises(ValueError, r.stage, [os.path.join(r.path, "a")])
  1458. def test_stage_deleted(self) -> None:
  1459. r = self._repo
  1460. os.remove(os.path.join(r.path, "a"))
  1461. r.get_worktree().stage(["a"])
  1462. r.get_worktree().stage(["a"]) # double-stage a deleted path
  1463. self.assertEqual([], list(r.open_index()))
  1464. def test_stage_directory(self) -> None:
  1465. r = self._repo
  1466. os.mkdir(os.path.join(r.path, "c"))
  1467. r.get_worktree().stage(["c"])
  1468. self.assertEqual([b"a"], list(r.open_index()))
  1469. def test_stage_submodule(self) -> None:
  1470. r = self._repo
  1471. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  1472. s.get_worktree().commit(
  1473. message=b"message",
  1474. )
  1475. r.get_worktree().stage(["sub"])
  1476. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  1477. def test_unstage_midify_file_with_dir(self) -> None:
  1478. os.mkdir(os.path.join(self._repo.path, "new_dir"))
  1479. full_path = os.path.join(self._repo.path, "new_dir", "foo")
  1480. with open(full_path, "w") as f:
  1481. f.write("hello")
  1482. wt = self._repo.get_worktree()
  1483. wt.stage(["new_dir/foo"])
  1484. wt.commit(
  1485. message=b"unitest",
  1486. committer=b"Jane <jane@example.com>",
  1487. author=b"John <john@example.com>",
  1488. )
  1489. with open(full_path, "a") as f:
  1490. f.write("something new")
  1491. wt.unstage(["new_dir/foo"])
  1492. unstaged = get_unstaged_changes(self._repo)
  1493. self.assertEqual([b"new_dir/foo"], unstaged)
  1494. def test_unstage_while_no_commit(self) -> None:
  1495. file = "foo"
  1496. full_path = os.path.join(self._repo.path, file)
  1497. with open(full_path, "w") as f:
  1498. f.write("hello")
  1499. wt = self._repo.get_worktree()
  1500. wt.stage([file])
  1501. wt.unstage([file])
  1502. # Check that file is no longer in index
  1503. index = self._repo.open_index()
  1504. self.assertNotIn(b"foo", index)
  1505. def test_unstage_add_file(self) -> None:
  1506. file = "foo"
  1507. full_path = os.path.join(self._repo.path, file)
  1508. wt = self._repo.get_worktree()
  1509. wt.commit(
  1510. message=b"unitest",
  1511. committer=b"Jane <jane@example.com>",
  1512. author=b"John <john@example.com>",
  1513. )
  1514. with open(full_path, "w") as f:
  1515. f.write("hello")
  1516. wt.stage([file])
  1517. wt.unstage([file])
  1518. # Check that file is no longer in index
  1519. index = self._repo.open_index()
  1520. self.assertNotIn(b"foo", index)
  1521. def test_unstage_modify_file(self) -> None:
  1522. file = "foo"
  1523. full_path = os.path.join(self._repo.path, file)
  1524. with open(full_path, "w") as f:
  1525. f.write("hello")
  1526. wt = self._repo.get_worktree()
  1527. wt.stage([file])
  1528. wt.commit(
  1529. message=b"unitest",
  1530. committer=b"Jane <jane@example.com>",
  1531. author=b"John <john@example.com>",
  1532. )
  1533. with open(full_path, "a") as f:
  1534. f.write("broken")
  1535. wt.stage([file])
  1536. wt.unstage([file])
  1537. unstaged = get_unstaged_changes(self._repo)
  1538. self.assertEqual([os.fsencode("foo")], unstaged)
  1539. def test_unstage_remove_file(self) -> None:
  1540. file = "foo"
  1541. full_path = os.path.join(self._repo.path, file)
  1542. with open(full_path, "w") as f:
  1543. f.write("hello")
  1544. wt = self._repo.get_worktree()
  1545. wt.stage([file])
  1546. wt.commit(
  1547. message=b"unitest",
  1548. committer=b"Jane <jane@example.com>",
  1549. author=b"John <john@example.com>",
  1550. )
  1551. os.remove(full_path)
  1552. wt.unstage([file])
  1553. unstaged = get_unstaged_changes(self._repo)
  1554. self.assertEqual([os.fsencode("foo")], unstaged)
  1555. def test_reset_index(self) -> None:
  1556. r = self._repo
  1557. with open(os.path.join(r.path, "a"), "wb") as f:
  1558. f.write(b"changed")
  1559. with open(os.path.join(r.path, "b"), "wb") as f:
  1560. f.write(b"added")
  1561. r.get_worktree().stage(["a", "b"])
  1562. # Check staged changes using lower-level APIs
  1563. index = r.open_index()
  1564. staged = {"add": [], "delete": [], "modify": []}
  1565. try:
  1566. head_commit = r[b"HEAD"]
  1567. tree_id = head_commit.tree
  1568. except KeyError:
  1569. tree_id = None
  1570. for change in index.changes_from_tree(r.object_store, tree_id):
  1571. if not change[0][0]:
  1572. staged["add"].append(change[0][1])
  1573. elif not change[1][1]:
  1574. staged["delete"].append(change[0][1])
  1575. else:
  1576. staged["modify"].append(change[0][1])
  1577. self.assertEqual({"add": [b"b"], "delete": [], "modify": [b"a"]}, staged)
  1578. r.get_worktree().reset_index()
  1579. # After reset, check that nothing is staged and b is untracked
  1580. index = r.open_index()
  1581. self.assertNotIn(b"b", index)
  1582. self.assertIn(b"a", index)
  1583. @skipIf(
  1584. sys.platform in ("win32", "darwin"),
  1585. "tries to implicitly decode as utf8",
  1586. )
  1587. def test_commit_no_encode_decode(self) -> None:
  1588. r = self._repo
  1589. repo_path_bytes = os.fsencode(r.path)
  1590. encodings = ("utf8", "latin1")
  1591. names = ["À".encode(encoding) for encoding in encodings]
  1592. for name, encoding in zip(names, encodings):
  1593. full_path = os.path.join(repo_path_bytes, name)
  1594. with open(full_path, "wb") as f:
  1595. f.write(encoding.encode("ascii"))
  1596. # These files are break tear_down_repo, so cleanup these files
  1597. # ourselves.
  1598. self.addCleanup(os.remove, full_path)
  1599. r.get_worktree().stage(names)
  1600. commit_sha = r.get_worktree().commit(
  1601. message=b"Files with different encodings",
  1602. committer=b"Test Committer <test@nodomain.com>",
  1603. author=b"Test Author <test@nodomain.com>",
  1604. commit_timestamp=12395,
  1605. commit_timezone=0,
  1606. author_timestamp=12395,
  1607. author_timezone=0,
  1608. ref=None,
  1609. merge_heads=[self._root_commit],
  1610. )
  1611. for name, encoding in zip(names, encodings):
  1612. mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name)
  1613. self.assertEqual(stat.S_IFREG | 0o644, mode)
  1614. self.assertEqual(encoding.encode("ascii"), r[id].data)
  1615. def test_discover_intended(self) -> None:
  1616. path = os.path.join(self._repo_dir, "b/c")
  1617. r = Repo.discover(path)
  1618. self.assertEqual(r.head(), self._repo.head())
  1619. def test_discover_isrepo(self) -> None:
  1620. r = Repo.discover(self._repo_dir)
  1621. self.assertEqual(r.head(), self._repo.head())
  1622. def test_discover_notrepo(self) -> None:
  1623. with self.assertRaises(NotGitRepository):
  1624. Repo.discover("/")
  1625. class CheckUserIdentityTests(TestCase):
  1626. def test_valid(self) -> None:
  1627. check_user_identity(b"Me <me@example.com>")
  1628. def test_invalid(self) -> None:
  1629. self.assertRaises(InvalidUserIdentity, check_user_identity, b"No Email")
  1630. self.assertRaises(
  1631. InvalidUserIdentity, check_user_identity, b"Fullname <missing"
  1632. )
  1633. self.assertRaises(
  1634. InvalidUserIdentity, check_user_identity, b"Fullname missing>"
  1635. )
  1636. self.assertRaises(
  1637. InvalidUserIdentity, check_user_identity, b"Fullname >order<>"
  1638. )
  1639. self.assertRaises(
  1640. InvalidUserIdentity, check_user_identity, b"Contains\0null byte <>"
  1641. )
  1642. self.assertRaises(
  1643. InvalidUserIdentity, check_user_identity, b"Contains\nnewline byte <>"
  1644. )
  1645. class RepoConfigIncludeIfTests(TestCase):
  1646. """Test includeIf functionality in repository config loading."""
  1647. def test_repo_config_includeif_gitdir(self) -> None:
  1648. """Test that includeIf gitdir conditions work when loading repo config."""
  1649. import tempfile
  1650. from dulwich.repo import Repo
  1651. with tempfile.TemporaryDirectory() as tmpdir:
  1652. # Create a repository
  1653. repo_path = os.path.join(tmpdir, "myrepo")
  1654. r = Repo.init(repo_path, mkdir=True)
  1655. # Use realpath to resolve any symlinks (important on macOS)
  1656. repo_path = os.path.realpath(repo_path)
  1657. # Create an included config file
  1658. included_path = os.path.join(tmpdir, "work.config")
  1659. with open(included_path, "wb") as f:
  1660. f.write(b"[user]\n email = work@example.com\n")
  1661. # Add includeIf to the repo config
  1662. config_path = os.path.join(repo_path, ".git", "config")
  1663. with open(config_path, "ab") as f:
  1664. f.write(f'\n[includeIf "gitdir:{repo_path}/.git/"]\n'.encode())
  1665. escaped_path = included_path.replace("\\", "\\\\")
  1666. f.write(f" path = {escaped_path}\n".encode())
  1667. # Close and reopen to reload config
  1668. r.close()
  1669. r = Repo(repo_path)
  1670. # Check if include was processed
  1671. config = r.get_config()
  1672. self.assertEqual(b"work@example.com", config.get((b"user",), b"email"))
  1673. r.close()
  1674. def test_repo_config_includeif_gitdir_pattern(self) -> None:
  1675. """Test includeIf gitdir pattern matching in repository config."""
  1676. import tempfile
  1677. from dulwich.repo import Repo
  1678. with tempfile.TemporaryDirectory() as tmpdir:
  1679. # Create a repository under "work" directory
  1680. work_dir = os.path.join(tmpdir, "work", "project1")
  1681. os.makedirs(os.path.dirname(work_dir), exist_ok=True)
  1682. r = Repo.init(work_dir, mkdir=True)
  1683. # Create an included config file
  1684. included_path = os.path.join(tmpdir, "work.config")
  1685. with open(included_path, "wb") as f:
  1686. f.write(b"[user]\n email = work@company.com\n")
  1687. # Add includeIf with pattern to the repo config
  1688. config_path = os.path.join(work_dir, ".git", "config")
  1689. with open(config_path, "ab") as f:
  1690. # Use a pattern that will match paths containing /work/
  1691. f.write(b'\n[includeIf "gitdir:**/work/**"]\n')
  1692. escaped_path = included_path.replace("\\", "\\\\")
  1693. f.write(f" path = {escaped_path}\n".encode())
  1694. # Close and reopen to reload config
  1695. r.close()
  1696. r = Repo(work_dir)
  1697. # Check if include was processed
  1698. config = r.get_config()
  1699. self.assertEqual(b"work@company.com", config.get((b"user",), b"email"))
  1700. r.close()
  1701. def test_repo_config_includeif_no_match(self) -> None:
  1702. """Test that includeIf doesn't include when condition doesn't match."""
  1703. import tempfile
  1704. from dulwich.repo import Repo
  1705. with tempfile.TemporaryDirectory() as tmpdir:
  1706. # Create a repository
  1707. repo_path = os.path.join(tmpdir, "personal", "project")
  1708. os.makedirs(os.path.dirname(repo_path), exist_ok=True)
  1709. r = Repo.init(repo_path, mkdir=True)
  1710. # Create an included config file
  1711. included_path = os.path.join(tmpdir, "work.config")
  1712. with open(included_path, "wb") as f:
  1713. f.write(b"[user]\n email = work@company.com\n")
  1714. # Add includeIf that won't match
  1715. config_path = os.path.join(repo_path, ".git", "config")
  1716. with open(config_path, "ab") as f:
  1717. f.write(b'\n[includeIf "gitdir:**/work/**"]\n')
  1718. escaped_path = included_path.replace("\\", "\\\\")
  1719. f.write(f" path = {escaped_path}\n".encode())
  1720. # Close and reopen to reload config
  1721. r.close()
  1722. r = Repo(repo_path)
  1723. # Check that include was NOT processed
  1724. config = r.get_config()
  1725. with self.assertRaises(KeyError):
  1726. config.get((b"user",), b"email")
  1727. r.close()
  1728. def test_bare_repo_config_includeif(self) -> None:
  1729. """Test includeIf in bare repository."""
  1730. import tempfile
  1731. from dulwich.repo import Repo
  1732. with tempfile.TemporaryDirectory() as tmpdir:
  1733. # Create a bare repository
  1734. repo_path = os.path.join(tmpdir, "bare.git")
  1735. r = Repo.init_bare(repo_path, mkdir=True)
  1736. # Use realpath to resolve any symlinks (important on macOS)
  1737. repo_path = os.path.realpath(repo_path)
  1738. # Create an included config file
  1739. included_path = os.path.join(tmpdir, "server.config")
  1740. with open(included_path, "wb") as f:
  1741. f.write(b"[receive]\n denyNonFastForwards = true\n")
  1742. # Add includeIf to the repo config
  1743. config_path = os.path.join(repo_path, "config")
  1744. with open(config_path, "ab") as f:
  1745. f.write(f'\n[includeIf "gitdir:{repo_path}/"]\n'.encode())
  1746. escaped_path = included_path.replace("\\", "\\\\")
  1747. f.write(f" path = {escaped_path}\n".encode())
  1748. # Close and reopen to reload config
  1749. r.close()
  1750. r = Repo(repo_path)
  1751. # Check if include was processed
  1752. config = r.get_config()
  1753. self.assertEqual(b"true", config.get((b"receive",), b"denyNonFastForwards"))
  1754. r.close()
  1755. def test_repo_config_includeif_hasconfig(self) -> None:
  1756. """Test includeIf hasconfig conditions in repository config."""
  1757. import tempfile
  1758. from dulwich.repo import Repo
  1759. with tempfile.TemporaryDirectory() as tmpdir:
  1760. # Create a repository
  1761. repo_path = os.path.join(tmpdir, "myrepo")
  1762. r = Repo.init(repo_path, mkdir=True)
  1763. # Create an included config file
  1764. included_path = os.path.join(tmpdir, "work.config")
  1765. with open(included_path, "wb") as f:
  1766. f.write(b"[user]\n name = WorkUser\n")
  1767. # Add a remote and includeIf hasconfig to the repo config
  1768. config_path = os.path.join(repo_path, ".git", "config")
  1769. with open(config_path, "ab") as f:
  1770. f.write(b'\n[remote "origin"]\n')
  1771. f.write(b" url = ssh://org-work@github.com/company/project\n")
  1772. f.write(
  1773. b'[includeIf "hasconfig:remote.*.url:ssh://org-*@github.com/**"]\n'
  1774. )
  1775. escaped_path = included_path.replace("\\", "\\\\")
  1776. f.write(f" path = {escaped_path}\n".encode())
  1777. # Close and reopen to reload config
  1778. r.close()
  1779. r = Repo(repo_path)
  1780. # Check if include was processed
  1781. config = r.get_config()
  1782. self.assertEqual(b"WorkUser", config.get((b"user",), b"name"))
  1783. r.close()
  1784. def test_repo_config_includeif_onbranch(self) -> None:
  1785. """Test includeIf onbranch conditions in repository config."""
  1786. import tempfile
  1787. from dulwich.repo import Repo
  1788. with tempfile.TemporaryDirectory() as tmpdir:
  1789. # Create a repository
  1790. repo_path = os.path.join(tmpdir, "myrepo")
  1791. r = Repo.init(repo_path, mkdir=True)
  1792. # Create HEAD pointing to main branch
  1793. refs_heads_dir = os.path.join(repo_path, ".git", "refs", "heads")
  1794. os.makedirs(refs_heads_dir, exist_ok=True)
  1795. main_ref_path = os.path.join(refs_heads_dir, "main")
  1796. with open(main_ref_path, "wb") as f:
  1797. f.write(b"0123456789012345678901234567890123456789\n")
  1798. head_path = os.path.join(repo_path, ".git", "HEAD")
  1799. with open(head_path, "wb") as f:
  1800. f.write(b"ref: refs/heads/main\n")
  1801. # Create an included config file
  1802. included_path = os.path.join(tmpdir, "main.config")
  1803. with open(included_path, "wb") as f:
  1804. f.write(b"[core]\n autocrlf = true\n")
  1805. # Add includeIf onbranch to the repo config
  1806. config_path = os.path.join(repo_path, ".git", "config")
  1807. with open(config_path, "ab") as f:
  1808. f.write(b'\n[includeIf "onbranch:main"]\n')
  1809. escaped_path = included_path.replace("\\", "\\\\")
  1810. f.write(f" path = {escaped_path}\n".encode())
  1811. # Close and reopen to reload config
  1812. r.close()
  1813. r = Repo(repo_path)
  1814. # Check if include was processed
  1815. config = r.get_config()
  1816. self.assertEqual(b"true", config.get((b"core",), b"autocrlf"))
  1817. r.close()
  1818. @skipIf(sys.platform == "win32", "Windows does not support Unix file permissions")
  1819. class SharedRepositoryTests(TestCase):
  1820. """Tests for core.sharedRepository functionality."""
  1821. def setUp(self):
  1822. super().setUp()
  1823. self._orig_umask = os.umask(0o022)
  1824. def tearDown(self):
  1825. os.umask(self._orig_umask)
  1826. super().tearDown()
  1827. def _get_file_mode(self, path):
  1828. """Get the file mode bits (without file type bits)."""
  1829. return stat.S_IMODE(os.stat(path).st_mode)
  1830. def _check_permissions(self, repo, expected_file_mode, expected_dir_mode):
  1831. """Check that repository files and directories have expected permissions."""
  1832. objects_dir = os.path.join(repo.commondir(), "objects")
  1833. # Check objects directory
  1834. actual_dir_mode = self._get_file_mode(objects_dir)
  1835. self.assertEqual(
  1836. expected_dir_mode,
  1837. actual_dir_mode,
  1838. f"objects dir mode: expected {oct(expected_dir_mode)}, got {oct(actual_dir_mode)}",
  1839. )
  1840. # Check pack directory
  1841. pack_dir = os.path.join(objects_dir, "pack")
  1842. actual_dir_mode = self._get_file_mode(pack_dir)
  1843. self.assertEqual(
  1844. expected_dir_mode,
  1845. actual_dir_mode,
  1846. f"pack dir mode: expected {oct(expected_dir_mode)}, got {oct(actual_dir_mode)}",
  1847. )
  1848. # Check info directory
  1849. info_dir = os.path.join(objects_dir, "info")
  1850. actual_dir_mode = self._get_file_mode(info_dir)
  1851. self.assertEqual(
  1852. expected_dir_mode,
  1853. actual_dir_mode,
  1854. f"info dir mode: expected {oct(expected_dir_mode)}, got {oct(actual_dir_mode)}",
  1855. )
  1856. def test_init_bare_shared_group(self):
  1857. """Test initializing bare repo with sharedRepository=group."""
  1858. tmp_dir = tempfile.mkdtemp()
  1859. self.addCleanup(shutil.rmtree, tmp_dir)
  1860. # Set umask to 0 to see what permissions are actually set
  1861. os.umask(0)
  1862. repo = Repo.init_bare(tmp_dir, shared_repository="group")
  1863. self.addCleanup(repo.close)
  1864. # Expected permissions for group sharing
  1865. expected_dir_mode = 0o2775 # setgid + rwxrwxr-x
  1866. expected_file_mode = 0o664 # rw-rw-r--
  1867. self._check_permissions(repo, expected_file_mode, expected_dir_mode)
  1868. def test_init_bare_shared_all(self):
  1869. """Test initializing bare repo with sharedRepository=all."""
  1870. tmp_dir = tempfile.mkdtemp()
  1871. self.addCleanup(shutil.rmtree, tmp_dir)
  1872. # Set umask to 0 to see what permissions are actually set
  1873. os.umask(0)
  1874. repo = Repo.init_bare(tmp_dir, shared_repository="all")
  1875. self.addCleanup(repo.close)
  1876. # Expected permissions for world sharing
  1877. expected_dir_mode = 0o2777 # setgid + rwxrwxrwx
  1878. expected_file_mode = 0o666 # rw-rw-rw-
  1879. self._check_permissions(repo, expected_file_mode, expected_dir_mode)
  1880. def test_init_bare_shared_umask(self):
  1881. """Test initializing bare repo with sharedRepository=umask (default)."""
  1882. tmp_dir = tempfile.mkdtemp()
  1883. self.addCleanup(shutil.rmtree, tmp_dir)
  1884. repo = Repo.init_bare(tmp_dir, shared_repository="umask")
  1885. self.addCleanup(repo.close)
  1886. # With umask, no special permissions should be set
  1887. # The actual permissions will depend on the umask, but we can
  1888. # at least verify that setgid is NOT set
  1889. objects_dir = os.path.join(repo.commondir(), "objects")
  1890. actual_mode = os.stat(objects_dir).st_mode
  1891. # Verify setgid bit is NOT set
  1892. self.assertEqual(0, actual_mode & stat.S_ISGID)
  1893. def test_loose_object_permissions_group(self):
  1894. """Test that loose objects get correct permissions with sharedRepository=group."""
  1895. tmp_dir = tempfile.mkdtemp()
  1896. self.addCleanup(shutil.rmtree, tmp_dir)
  1897. # Set umask to 0 to see what permissions are actually set
  1898. os.umask(0)
  1899. repo = Repo.init_bare(tmp_dir, shared_repository="group")
  1900. self.addCleanup(repo.close)
  1901. # Create a blob object
  1902. blob = objects.Blob.from_string(b"test content")
  1903. repo.object_store.add_object(blob)
  1904. # Find the object file
  1905. obj_path = repo.object_store._get_shafile_path(blob.id)
  1906. # Check file permissions
  1907. actual_mode = self._get_file_mode(obj_path)
  1908. expected_mode = 0o664 # rw-rw-r--
  1909. self.assertEqual(
  1910. expected_mode,
  1911. actual_mode,
  1912. f"loose object mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  1913. )
  1914. # Check directory permissions
  1915. obj_dir = os.path.dirname(obj_path)
  1916. actual_dir_mode = self._get_file_mode(obj_dir)
  1917. expected_dir_mode = 0o2775 # setgid + rwxrwxr-x
  1918. self.assertEqual(
  1919. expected_dir_mode,
  1920. actual_dir_mode,
  1921. f"object dir mode: expected {oct(expected_dir_mode)}, got {oct(actual_dir_mode)}",
  1922. )
  1923. def test_loose_object_permissions_all(self):
  1924. """Test that loose objects get correct permissions with sharedRepository=all."""
  1925. tmp_dir = tempfile.mkdtemp()
  1926. self.addCleanup(shutil.rmtree, tmp_dir)
  1927. # Set umask to 0 to see what permissions are actually set
  1928. os.umask(0)
  1929. repo = Repo.init_bare(tmp_dir, shared_repository="all")
  1930. self.addCleanup(repo.close)
  1931. # Create a blob object
  1932. blob = objects.Blob.from_string(b"test content")
  1933. repo.object_store.add_object(blob)
  1934. # Find the object file
  1935. obj_path = repo.object_store._get_shafile_path(blob.id)
  1936. # Check file permissions
  1937. actual_mode = self._get_file_mode(obj_path)
  1938. expected_mode = 0o666 # rw-rw-rw-
  1939. self.assertEqual(
  1940. expected_mode,
  1941. actual_mode,
  1942. f"loose object mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  1943. )
  1944. def test_pack_file_permissions_group(self):
  1945. """Test that pack files get correct permissions with sharedRepository=group."""
  1946. tmp_dir = tempfile.mkdtemp()
  1947. self.addCleanup(shutil.rmtree, tmp_dir)
  1948. # Set umask to 0 to see what permissions are actually set
  1949. os.umask(0)
  1950. repo = Repo.init_bare(tmp_dir, shared_repository="group")
  1951. self.addCleanup(repo.close)
  1952. # Create some objects
  1953. blobs = [
  1954. objects.Blob.from_string(f"test content {i}".encode()) for i in range(5)
  1955. ]
  1956. repo.object_store.add_objects([(blob, None) for blob in blobs])
  1957. # Find the pack files
  1958. pack_dir = os.path.join(repo.commondir(), "objects", "pack")
  1959. pack_files = [f for f in os.listdir(pack_dir) if f.endswith(".pack")]
  1960. self.assertGreater(len(pack_files), 0, "No pack files created")
  1961. # Check pack file permissions
  1962. pack_path = os.path.join(pack_dir, pack_files[0])
  1963. actual_mode = self._get_file_mode(pack_path)
  1964. expected_mode = 0o664 # rw-rw-r--
  1965. self.assertEqual(
  1966. expected_mode,
  1967. actual_mode,
  1968. f"pack file mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  1969. )
  1970. def test_pack_index_permissions_group(self):
  1971. """Test that pack index files get correct permissions with sharedRepository=group."""
  1972. tmp_dir = tempfile.mkdtemp()
  1973. self.addCleanup(shutil.rmtree, tmp_dir)
  1974. # Set umask to 0 to see what permissions are actually set
  1975. os.umask(0)
  1976. repo = Repo.init_bare(tmp_dir, shared_repository="group")
  1977. self.addCleanup(repo.close)
  1978. # Create some objects
  1979. blobs = [
  1980. objects.Blob.from_string(f"test content {i}".encode()) for i in range(5)
  1981. ]
  1982. repo.object_store.add_objects([(blob, None) for blob in blobs])
  1983. # Find the pack index files
  1984. pack_dir = os.path.join(repo.commondir(), "objects", "pack")
  1985. idx_files = [f for f in os.listdir(pack_dir) if f.endswith(".idx")]
  1986. self.assertGreater(len(idx_files), 0, "No pack index files created")
  1987. # Check pack index file permissions
  1988. idx_path = os.path.join(pack_dir, idx_files[0])
  1989. actual_mode = self._get_file_mode(idx_path)
  1990. expected_mode = 0o664 # rw-rw-r--
  1991. self.assertEqual(
  1992. expected_mode,
  1993. actual_mode,
  1994. f"pack index mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  1995. )
  1996. def test_index_file_permissions_group(self):
  1997. """Test that index file gets correct permissions with sharedRepository=group."""
  1998. tmp_dir = tempfile.mkdtemp()
  1999. self.addCleanup(shutil.rmtree, tmp_dir)
  2000. # Set umask to 0 to see what permissions are actually set
  2001. os.umask(0)
  2002. # Create non-bare repo (index only exists in non-bare repos)
  2003. repo = Repo.init(tmp_dir, shared_repository="group")
  2004. self.addCleanup(repo.close)
  2005. # Make a change to trigger index write
  2006. blob = objects.Blob.from_string(b"test content")
  2007. repo.object_store.add_object(blob)
  2008. test_file = os.path.join(tmp_dir, "test.txt")
  2009. with open(test_file, "wb") as f:
  2010. f.write(b"test content")
  2011. # Stage the file
  2012. repo.get_worktree().stage(["test.txt"])
  2013. # Check index file permissions
  2014. index_path = repo.index_path()
  2015. actual_mode = self._get_file_mode(index_path)
  2016. expected_mode = 0o664 # rw-rw-r--
  2017. self.assertEqual(
  2018. expected_mode,
  2019. actual_mode,
  2020. f"index file mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  2021. )
  2022. def test_existing_repo_respects_config(self):
  2023. """Test that opening an existing repo respects core.sharedRepository config."""
  2024. tmp_dir = tempfile.mkdtemp()
  2025. self.addCleanup(shutil.rmtree, tmp_dir)
  2026. # Set umask to 0 to see what permissions are actually set
  2027. os.umask(0)
  2028. # Create repo with shared=group
  2029. repo = Repo.init_bare(tmp_dir, shared_repository="group")
  2030. repo.close()
  2031. # Reopen the repo
  2032. repo = Repo(tmp_dir)
  2033. self.addCleanup(repo.close)
  2034. # Add an object and check permissions
  2035. blob = objects.Blob.from_string(b"test content after reopen")
  2036. repo.object_store.add_object(blob)
  2037. obj_path = repo.object_store._get_shafile_path(blob.id)
  2038. actual_mode = self._get_file_mode(obj_path)
  2039. expected_mode = 0o664 # rw-rw-r--
  2040. self.assertEqual(
  2041. expected_mode,
  2042. actual_mode,
  2043. f"loose object mode after reopen: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  2044. )
  2045. def test_reflog_permissions_group(self):
  2046. """Test that reflog files get correct permissions with sharedRepository=group."""
  2047. tmp_dir = tempfile.mkdtemp()
  2048. self.addCleanup(shutil.rmtree, tmp_dir)
  2049. # Set umask to 0 to see what permissions are actually set
  2050. os.umask(0)
  2051. repo = Repo.init(tmp_dir, shared_repository="group")
  2052. self.addCleanup(repo.close)
  2053. # Create a commit to trigger reflog creation
  2054. blob = objects.Blob.from_string(b"test content")
  2055. tree = objects.Tree()
  2056. tree.add(b"test.txt", 0o100644, blob.id)
  2057. c = objects.Commit()
  2058. c.tree = tree.id
  2059. c.author = c.committer = b"Test <test@example.com>"
  2060. c.author_time = c.commit_time = int(time.time())
  2061. c.author_timezone = c.commit_timezone = 0
  2062. c.encoding = b"UTF-8"
  2063. c.message = b"Test commit"
  2064. repo.object_store.add_object(blob)
  2065. repo.object_store.add_object(tree)
  2066. repo.object_store.add_object(c)
  2067. # Update ref to trigger reflog creation
  2068. repo.refs.set_if_equals(
  2069. b"refs/heads/master", None, c.id, message=b"commit: initial commit"
  2070. )
  2071. # Check reflog file permissions
  2072. reflog_path = os.path.join(repo.controldir(), "logs", "refs", "heads", "master")
  2073. self.assertTrue(os.path.exists(reflog_path), "Reflog file should exist")
  2074. actual_mode = self._get_file_mode(reflog_path)
  2075. expected_mode = 0o664 # rw-rw-r--
  2076. self.assertEqual(
  2077. expected_mode,
  2078. actual_mode,
  2079. f"reflog file mode: expected {oct(expected_mode)}, got {oct(actual_mode)}",
  2080. )
  2081. # Check reflog directory permissions
  2082. reflog_dir = os.path.dirname(reflog_path)
  2083. actual_dir_mode = self._get_file_mode(reflog_dir)
  2084. expected_dir_mode = 0o2775 # setgid + rwxrwxr-x
  2085. self.assertEqual(
  2086. expected_dir_mode,
  2087. actual_dir_mode,
  2088. f"reflog dir mode: expected {oct(expected_dir_mode)}, got {oct(actual_dir_mode)}",
  2089. )