test_repository.py 72 KB


  1. # test_repository.py -- tests for repository.py
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the repository."""
  22. import glob
  23. import locale
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import tempfile
  29. import warnings
  30. from dulwich import errors, objects, porcelain
  31. from dulwich.config import Config
  32. from dulwich.errors import NotGitRepository
  33. from dulwich.object_store import tree_lookup_path
  34. from dulwich.repo import (
  35. InvalidUserIdentity,
  36. MemoryRepo,
  37. Repo,
  38. UnsupportedExtension,
  39. UnsupportedVersion,
  40. check_user_identity,
  41. )
  42. from dulwich.tests.utils import open_repo, setup_warning_catcher, tear_down_repo
  43. from . import TestCase, skipIf
# Object id passed to lookups that are expected to fail with KeyError
# (used by test_get_object_non_existant against the a.git fixture).
missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097"
  45. class CreateRepositoryTests(TestCase):
  46. def assertFileContentsEqual(self, expected, repo, path) -> None:
  47. f = repo.get_named_file(path)
  48. if not f:
  49. self.assertEqual(expected, None)
  50. else:
  51. with f:
  52. self.assertEqual(expected, f.read())
  53. def _check_repo_contents(self, repo, expect_bare) -> None:
  54. self.assertEqual(expect_bare, repo.bare)
  55. self.assertFileContentsEqual(b"Unnamed repository", repo, "description")
  56. self.assertFileContentsEqual(b"", repo, os.path.join("info", "exclude"))
  57. self.assertFileContentsEqual(None, repo, "nonexistent file")
  58. barestr = b"bare = " + str(expect_bare).lower().encode("ascii")
  59. with repo.get_named_file("config") as f:
  60. config_text = f.read()
  61. self.assertIn(barestr, config_text, f"{config_text!r}")
  62. expect_filemode = sys.platform != "win32"
  63. barestr = b"filemode = " + str(expect_filemode).lower().encode("ascii")
  64. with repo.get_named_file("config") as f:
  65. config_text = f.read()
  66. self.assertIn(barestr, config_text, f"{config_text!r}")
  67. if isinstance(repo, Repo):
  68. expected_mode = "0o100644" if expect_filemode else "0o100666"
  69. expected = {
  70. "HEAD": expected_mode,
  71. "config": expected_mode,
  72. "description": expected_mode,
  73. }
  74. actual = {
  75. f[len(repo._controldir) + 1 :]: oct(os.stat(f).st_mode)
  76. for f in glob.glob(os.path.join(repo._controldir, "*"))
  77. if os.path.isfile(f)
  78. }
  79. self.assertEqual(expected, actual)
  80. def test_create_memory(self) -> None:
  81. repo = MemoryRepo.init_bare([], {})
  82. self._check_repo_contents(repo, True)
  83. def test_create_disk_bare(self) -> None:
  84. tmp_dir = tempfile.mkdtemp()
  85. self.addCleanup(shutil.rmtree, tmp_dir)
  86. repo = Repo.init_bare(tmp_dir)
  87. self.assertEqual(tmp_dir, repo._controldir)
  88. self._check_repo_contents(repo, True)
  89. def test_create_disk_non_bare(self) -> None:
  90. tmp_dir = tempfile.mkdtemp()
  91. self.addCleanup(shutil.rmtree, tmp_dir)
  92. repo = Repo.init(tmp_dir)
  93. self.assertEqual(os.path.join(tmp_dir, ".git"), repo._controldir)
  94. self._check_repo_contents(repo, False)
  95. def test_create_disk_non_bare_mkdir(self) -> None:
  96. tmp_dir = tempfile.mkdtemp()
  97. target_dir = os.path.join(tmp_dir, "target")
  98. self.addCleanup(shutil.rmtree, tmp_dir)
  99. repo = Repo.init(target_dir, mkdir=True)
  100. self.assertEqual(os.path.join(target_dir, ".git"), repo._controldir)
  101. self._check_repo_contents(repo, False)
  102. def test_create_disk_bare_mkdir(self) -> None:
  103. tmp_dir = tempfile.mkdtemp()
  104. target_dir = os.path.join(tmp_dir, "target")
  105. self.addCleanup(shutil.rmtree, tmp_dir)
  106. repo = Repo.init_bare(target_dir, mkdir=True)
  107. self.assertEqual(target_dir, repo._controldir)
  108. self._check_repo_contents(repo, True)
  109. def test_create_disk_bare_pathlib(self) -> None:
  110. from pathlib import Path
  111. tmp_dir = tempfile.mkdtemp()
  112. self.addCleanup(shutil.rmtree, tmp_dir)
  113. repo_path = Path(tmp_dir)
  114. repo = Repo.init_bare(repo_path)
  115. self.assertEqual(tmp_dir, repo._controldir)
  116. self._check_repo_contents(repo, True)
  117. # Test that refpath works with pathlib
  118. ref_path = repo.refs.refpath(b"refs/heads/master")
  119. self.assertTrue(isinstance(ref_path, bytes))
  120. expected_path = os.path.join(tmp_dir.encode(), b"refs", b"heads", b"master")
  121. self.assertEqual(ref_path, expected_path)
  122. def test_create_disk_non_bare_pathlib(self) -> None:
  123. from pathlib import Path
  124. tmp_dir = tempfile.mkdtemp()
  125. self.addCleanup(shutil.rmtree, tmp_dir)
  126. repo_path = Path(tmp_dir)
  127. repo = Repo.init(repo_path)
  128. self.assertEqual(os.path.join(tmp_dir, ".git"), repo._controldir)
  129. self._check_repo_contents(repo, False)
  130. def test_open_repo_pathlib(self) -> None:
  131. from pathlib import Path
  132. tmp_dir = tempfile.mkdtemp()
  133. self.addCleanup(shutil.rmtree, tmp_dir)
  134. # First create a repo
  135. repo = Repo.init_bare(tmp_dir)
  136. repo.close()
  137. # Now open it with pathlib
  138. repo_path = Path(tmp_dir)
  139. repo2 = Repo(repo_path)
  140. self.assertEqual(tmp_dir, repo2._controldir)
  141. self.assertTrue(repo2.bare)
  142. repo2.close()
  143. def test_create_disk_bare_mkdir_pathlib(self) -> None:
  144. from pathlib import Path
  145. tmp_dir = tempfile.mkdtemp()
  146. target_path = Path(tmp_dir) / "target"
  147. self.addCleanup(shutil.rmtree, tmp_dir)
  148. repo = Repo.init_bare(target_path, mkdir=True)
  149. self.assertEqual(str(target_path), repo._controldir)
  150. self._check_repo_contents(repo, True)
  151. class MemoryRepoTests(TestCase):
  152. def test_set_description(self) -> None:
  153. r = MemoryRepo.init_bare([], {})
  154. description = b"Some description"
  155. r.set_description(description)
  156. self.assertEqual(description, r.get_description())
  157. def test_pull_into(self) -> None:
  158. r = MemoryRepo.init_bare([], {})
  159. repo = open_repo("a.git")
  160. self.addCleanup(tear_down_repo, repo)
  161. repo.fetch(r)
  162. def test_fetch_from_git_cloned_repo(self) -> None:
  163. """Test fetching from a git-cloned repo into MemoryRepo (issue #1179)."""
  164. import tempfile
  165. from dulwich.client import LocalGitClient
  166. with tempfile.TemporaryDirectory() as tmpdir:
  167. # Create initial repo using dulwich
  168. initial_path = os.path.join(tmpdir, "initial")
  169. initial_repo = Repo.init(initial_path, mkdir=True)
  170. # Create some content
  171. test_file = os.path.join(initial_path, "test.txt")
  172. with open(test_file, "w") as f:
  173. f.write("test content\n")
  174. # Stage and commit using dulwich
  175. initial_repo.stage(["test.txt"])
  176. initial_repo.do_commit(
  177. b"Initial commit\n",
  178. committer=b"Test Committer <test@example.com>",
  179. author=b"Test Author <test@example.com>",
  180. )
  181. # Clone using dulwich
  182. cloned_path = os.path.join(tmpdir, "cloned")
  183. cloned_repo = initial_repo.clone(cloned_path, mkdir=True)
  184. initial_repo.close()
  185. cloned_repo.close()
  186. # Fetch from the cloned repo into MemoryRepo
  187. memory_repo = MemoryRepo()
  188. client = LocalGitClient()
  189. # This should not raise AssertionError
  190. result = client.fetch(cloned_path, memory_repo)
  191. # Verify the fetch worked
  192. self.assertIn(b"HEAD", result.refs)
  193. self.assertIn(b"refs/heads/master", result.refs)
  194. # Verify we can read the fetched objects
  195. head_sha = result.refs[b"HEAD"]
  196. commit = memory_repo[head_sha]
  197. self.assertEqual(commit.message, b"Initial commit\n")
  198. class RepositoryRootTests(TestCase):
  199. def mkdtemp(self):
  200. return tempfile.mkdtemp()
  201. def open_repo(self, name):
  202. temp_dir = self.mkdtemp()
  203. repo = open_repo(name, temp_dir)
  204. self.addCleanup(tear_down_repo, repo)
  205. return repo
  206. def test_simple_props(self) -> None:
  207. r = self.open_repo("a.git")
  208. self.assertEqual(r.controldir(), r.path)
  209. def test_setitem(self) -> None:
  210. r = self.open_repo("a.git")
  211. r[b"refs/tags/foo"] = b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"
  212. self.assertEqual(
  213. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", r[b"refs/tags/foo"].id
  214. )
  215. def test_getitem_unicode(self) -> None:
  216. r = self.open_repo("a.git")
  217. test_keys = [
  218. (b"refs/heads/master", True),
  219. (b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", True),
  220. (b"11" * 19 + b"--", False),
  221. ]
  222. for k, contained in test_keys:
  223. self.assertEqual(k in r, contained)
  224. # Avoid deprecation warning under Py3.2+
  225. if getattr(self, "assertRaisesRegex", None):
  226. assertRaisesRegexp = self.assertRaisesRegex
  227. else:
  228. assertRaisesRegexp = self.assertRaisesRegexp
  229. for k, _ in test_keys:
  230. assertRaisesRegexp(
  231. TypeError,
  232. "'name' must be bytestring, not int",
  233. r.__getitem__,
  234. 12,
  235. )
  236. def test_delitem(self) -> None:
  237. r = self.open_repo("a.git")
  238. del r[b"refs/heads/master"]
  239. self.assertRaises(KeyError, lambda: r[b"refs/heads/master"])
  240. del r[b"HEAD"]
  241. self.assertRaises(KeyError, lambda: r[b"HEAD"])
  242. self.assertRaises(ValueError, r.__delitem__, b"notrefs/foo")
  243. def test_get_refs(self) -> None:
  244. r = self.open_repo("a.git")
  245. self.assertEqual(
  246. {
  247. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  248. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  249. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  250. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  251. },
  252. r.get_refs(),
  253. )
  254. def test_head(self) -> None:
  255. r = self.open_repo("a.git")
  256. self.assertEqual(r.head(), b"a90fa2d900a17e99b433217e988c4eb4a2e9a097")
  257. def test_get_object(self) -> None:
  258. r = self.open_repo("a.git")
  259. obj = r.get_object(r.head())
  260. self.assertEqual(obj.type_name, b"commit")
  261. def test_get_object_non_existant(self) -> None:
  262. r = self.open_repo("a.git")
  263. self.assertRaises(KeyError, r.get_object, missing_sha)
  264. def test_contains_object(self) -> None:
  265. r = self.open_repo("a.git")
  266. self.assertIn(r.head(), r)
  267. self.assertNotIn(b"z" * 40, r)
  268. def test_contains_ref(self) -> None:
  269. r = self.open_repo("a.git")
  270. self.assertIn(b"HEAD", r)
  271. def test_get_no_description(self) -> None:
  272. r = self.open_repo("a.git")
  273. self.assertIs(None, r.get_description())
  274. def test_get_description(self) -> None:
  275. r = self.open_repo("a.git")
  276. with open(os.path.join(r.path, "description"), "wb") as f:
  277. f.write(b"Some description")
  278. self.assertEqual(b"Some description", r.get_description())
  279. def test_set_description(self) -> None:
  280. r = self.open_repo("a.git")
  281. description = b"Some description"
  282. r.set_description(description)
  283. self.assertEqual(description, r.get_description())
  284. def test_get_gitattributes(self) -> None:
  285. # Test when no .gitattributes file exists
  286. r = self.open_repo("a.git")
  287. attrs = r.get_gitattributes()
  288. from dulwich.attrs import GitAttributes
  289. self.assertIsInstance(attrs, GitAttributes)
  290. self.assertEqual(len(attrs), 0)
  291. # Create .git/info/attributes file (which is read by get_gitattributes)
  292. info_dir = os.path.join(r.controldir(), "info")
  293. if not os.path.exists(info_dir):
  294. os.makedirs(info_dir)
  295. attrs_path = os.path.join(info_dir, "attributes")
  296. with open(attrs_path, "wb") as f:
  297. f.write(b"*.txt text\n")
  298. f.write(b"*.jpg -text binary\n")
  299. # Test with attributes file
  300. attrs = r.get_gitattributes()
  301. self.assertEqual(len(attrs), 2)
  302. # Test matching
  303. txt_attrs = attrs.match_path(b"file.txt")
  304. self.assertEqual(txt_attrs, {b"text": True})
  305. jpg_attrs = attrs.match_path(b"image.jpg")
  306. self.assertEqual(jpg_attrs, {b"text": False, b"binary": True})
  307. def test_contains_missing(self) -> None:
  308. r = self.open_repo("a.git")
  309. self.assertNotIn(b"bar", r)
  310. def test_get_peeled(self) -> None:
  311. # unpacked ref
  312. r = self.open_repo("a.git")
  313. tag_sha = b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a"
  314. self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
  315. self.assertEqual(r.get_peeled(b"refs/tags/mytag"), r.head())
  316. # packed ref with cached peeled value
  317. packed_tag_sha = b"b0931cadc54336e78a1d980420e3268903b57a50"
  318. parent_sha = r[r.head()].parents[0]
  319. self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
  320. self.assertEqual(r.get_peeled(b"refs/tags/mytag-packed"), parent_sha)
  321. # TODO: add more corner cases to test repo
  322. def test_get_peeled_not_tag(self) -> None:
  323. r = self.open_repo("a.git")
  324. self.assertEqual(r.get_peeled(b"HEAD"), r.head())
  325. def test_get_parents(self) -> None:
  326. r = self.open_repo("a.git")
  327. self.assertEqual(
  328. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  329. r.get_parents(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"),
  330. )
  331. r.update_shallow([b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], None)
  332. self.assertEqual([], r.get_parents(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"))
  333. def test_get_walker(self) -> None:
  334. r = self.open_repo("a.git")
  335. # include defaults to [r.head()]
  336. self.assertEqual(
  337. [e.commit.id for e in r.get_walker()],
  338. [r.head(), b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  339. )
  340. self.assertEqual(
  341. [
  342. e.commit.id
  343. for e in r.get_walker([b"2a72d929692c41d8554c07f6301757ba18a65d91"])
  344. ],
  345. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  346. )
  347. self.assertEqual(
  348. [
  349. e.commit.id
  350. for e in r.get_walker(b"2a72d929692c41d8554c07f6301757ba18a65d91")
  351. ],
  352. [b"2a72d929692c41d8554c07f6301757ba18a65d91"],
  353. )
  354. def assertFilesystemHidden(self, path) -> None:
  355. if sys.platform != "win32":
  356. return
  357. import ctypes
  358. from ctypes.wintypes import DWORD, LPCWSTR
  359. GetFileAttributesW = ctypes.WINFUNCTYPE(DWORD, LPCWSTR)(
  360. ("GetFileAttributesW", ctypes.windll.kernel32)
  361. )
  362. self.assertTrue(2 & GetFileAttributesW(path))
  363. def test_init_existing(self) -> None:
  364. tmp_dir = self.mkdtemp()
  365. self.addCleanup(shutil.rmtree, tmp_dir)
  366. t = Repo.init(tmp_dir)
  367. self.addCleanup(t.close)
  368. self.assertEqual(os.listdir(tmp_dir), [".git"])
  369. self.assertFilesystemHidden(os.path.join(tmp_dir, ".git"))
  370. def test_init_mkdir(self) -> None:
  371. tmp_dir = self.mkdtemp()
  372. self.addCleanup(shutil.rmtree, tmp_dir)
  373. repo_dir = os.path.join(tmp_dir, "a-repo")
  374. t = Repo.init(repo_dir, mkdir=True)
  375. self.addCleanup(t.close)
  376. self.assertEqual(os.listdir(repo_dir), [".git"])
  377. self.assertFilesystemHidden(os.path.join(repo_dir, ".git"))
  378. def test_init_mkdir_unicode(self) -> None:
  379. repo_name = "\xa7"
  380. try:
  381. os.fsencode(repo_name)
  382. except UnicodeEncodeError:
  383. self.skipTest("filesystem lacks unicode support")
  384. tmp_dir = self.mkdtemp()
  385. self.addCleanup(shutil.rmtree, tmp_dir)
  386. repo_dir = os.path.join(tmp_dir, repo_name)
  387. t = Repo.init(repo_dir, mkdir=True)
  388. self.addCleanup(t.close)
  389. self.assertEqual(os.listdir(repo_dir), [".git"])
  390. self.assertFilesystemHidden(os.path.join(repo_dir, ".git"))
  391. def test_init_format(self) -> None:
  392. tmp_dir = self.mkdtemp()
  393. self.addCleanup(shutil.rmtree, tmp_dir)
  394. # Test format 0
  395. t0 = Repo.init(tmp_dir + "0", mkdir=True, format=0)
  396. self.addCleanup(t0.close)
  397. self.assertEqual(t0.get_config().get("core", "repositoryformatversion"), b"0")
  398. # Test format 1
  399. t1 = Repo.init(tmp_dir + "1", mkdir=True, format=1)
  400. self.addCleanup(t1.close)
  401. self.assertEqual(t1.get_config().get("core", "repositoryformatversion"), b"1")
  402. # Test default format
  403. td = Repo.init(tmp_dir + "d", mkdir=True)
  404. self.addCleanup(td.close)
  405. self.assertEqual(td.get_config().get("core", "repositoryformatversion"), b"0")
  406. # Test invalid format
  407. with self.assertRaises(ValueError):
  408. Repo.init(tmp_dir + "bad", mkdir=True, format=99)
  409. def test_init_bare_format(self) -> None:
  410. tmp_dir = self.mkdtemp()
  411. self.addCleanup(shutil.rmtree, tmp_dir)
  412. # Test format 1 for bare repo
  413. t = Repo.init_bare(tmp_dir + "bare", mkdir=True, format=1)
  414. self.addCleanup(t.close)
  415. self.assertEqual(t.get_config().get("core", "repositoryformatversion"), b"1")
  416. # Test invalid format for bare repo
  417. with self.assertRaises(ValueError):
  418. Repo.init_bare(tmp_dir + "badbr", mkdir=True, format=2)
  419. @skipIf(sys.platform == "win32", "fails on Windows")
  420. def test_fetch(self) -> None:
  421. r = self.open_repo("a.git")
  422. tmp_dir = self.mkdtemp()
  423. self.addCleanup(shutil.rmtree, tmp_dir)
  424. t = Repo.init(tmp_dir)
  425. self.addCleanup(t.close)
  426. r.fetch(t)
  427. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  428. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  429. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  430. self.assertIn(b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", t)
  431. self.assertIn(b"b0931cadc54336e78a1d980420e3268903b57a50", t)
  432. @skipIf(sys.platform == "win32", "fails on Windows")
  433. def test_fetch_ignores_missing_refs(self) -> None:
  434. r = self.open_repo("a.git")
  435. missing = b"1234566789123456789123567891234657373833"
  436. r.refs[b"refs/heads/blah"] = missing
  437. tmp_dir = self.mkdtemp()
  438. self.addCleanup(shutil.rmtree, tmp_dir)
  439. t = Repo.init(tmp_dir)
  440. self.addCleanup(t.close)
  441. r.fetch(t)
  442. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  443. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  444. self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
  445. self.assertIn(b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", t)
  446. self.assertIn(b"b0931cadc54336e78a1d980420e3268903b57a50", t)
  447. self.assertNotIn(missing, t)
  448. def test_clone(self) -> None:
  449. r = self.open_repo("a.git")
  450. tmp_dir = self.mkdtemp()
  451. self.addCleanup(shutil.rmtree, tmp_dir)
  452. with r.clone(tmp_dir, mkdir=False) as t:
  453. self.assertEqual(
  454. {
  455. b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  456. b"refs/remotes/origin/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  457. b"refs/remotes/origin/HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  458. b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  459. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  460. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  461. },
  462. t.refs.as_dict(),
  463. )
  464. shas = [e.commit.id for e in r.get_walker()]
  465. self.assertEqual(
  466. shas, [t.head(), b"2a72d929692c41d8554c07f6301757ba18a65d91"]
  467. )
  468. c = t.get_config()
  469. encoded_path = r.path
  470. if not isinstance(encoded_path, bytes):
  471. encoded_path = os.fsencode(encoded_path)
  472. self.assertEqual(encoded_path, c.get((b"remote", b"origin"), b"url"))
  473. self.assertEqual(
  474. b"+refs/heads/*:refs/remotes/origin/*",
  475. c.get((b"remote", b"origin"), b"fetch"),
  476. )
  477. def test_clone_no_head(self) -> None:
  478. temp_dir = self.mkdtemp()
  479. self.addCleanup(shutil.rmtree, temp_dir)
  480. repo_dir = os.path.join(os.path.dirname(__file__), "..", "testdata", "repos")
  481. dest_dir = os.path.join(temp_dir, "a.git")
  482. shutil.copytree(os.path.join(repo_dir, "a.git"), dest_dir, symlinks=True)
  483. r = Repo(dest_dir)
  484. self.addCleanup(r.close)
  485. del r.refs[b"refs/heads/master"]
  486. del r.refs[b"HEAD"]
  487. t = r.clone(os.path.join(temp_dir, "b.git"), mkdir=True)
  488. self.addCleanup(t.close)
  489. self.assertEqual(
  490. {
  491. b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  492. b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50",
  493. },
  494. t.refs.as_dict(),
  495. )
  496. def test_clone_empty(self) -> None:
  497. """Test clone() doesn't crash if HEAD points to a non-existing ref.
  498. This simulates cloning server-side bare repository either when it is
  499. still empty or if user renames master branch and pushes private repo
  500. to the server.
  501. Non-bare repo HEAD always points to an existing ref.
  502. """
  503. r = self.open_repo("empty.git")
  504. tmp_dir = self.mkdtemp()
  505. self.addCleanup(shutil.rmtree, tmp_dir)
  506. r.clone(tmp_dir, mkdir=False, bare=True)
  507. def test_reset_index_symlink_enabled(self) -> None:
  508. if sys.platform == "win32":
  509. self.skipTest("symlinks are not supported on Windows")
  510. tmp_dir = self.mkdtemp()
  511. self.addCleanup(shutil.rmtree, tmp_dir)
  512. o = Repo.init(os.path.join(tmp_dir, "s"), mkdir=True)
  513. os.symlink("foo", os.path.join(tmp_dir, "s", "bar"))
  514. o.stage("bar")
  515. o.do_commit(b"add symlink")
  516. t = o.clone(os.path.join(tmp_dir, "t"), symlinks=True)
  517. o.close()
  518. bar_path = os.path.join(tmp_dir, "t", "bar")
  519. if sys.platform == "win32":
  520. with open(bar_path) as f:
  521. self.assertEqual("foo", f.read())
  522. else:
  523. self.assertEqual("foo", os.readlink(bar_path))
  524. t.close()
  525. def test_reset_index_symlink_disabled(self) -> None:
  526. tmp_dir = self.mkdtemp()
  527. self.addCleanup(shutil.rmtree, tmp_dir)
  528. o = Repo.init(os.path.join(tmp_dir, "s"), mkdir=True)
  529. o.close()
  530. os.symlink("foo", os.path.join(tmp_dir, "s", "bar"))
  531. o.stage("bar")
  532. o.do_commit(b"add symlink")
  533. t = o.clone(os.path.join(tmp_dir, "t"), symlinks=False)
  534. with open(os.path.join(tmp_dir, "t", "bar")) as f:
  535. self.assertEqual("foo", f.read())
  536. t.close()
  537. def test_clone_bare(self) -> None:
  538. r = self.open_repo("a.git")
  539. tmp_dir = self.mkdtemp()
  540. self.addCleanup(shutil.rmtree, tmp_dir)
  541. t = r.clone(tmp_dir, mkdir=False)
  542. t.close()
  543. def test_clone_checkout_and_bare(self) -> None:
  544. r = self.open_repo("a.git")
  545. tmp_dir = self.mkdtemp()
  546. self.addCleanup(shutil.rmtree, tmp_dir)
  547. self.assertRaises(
  548. ValueError, r.clone, tmp_dir, mkdir=False, checkout=True, bare=True
  549. )
  550. def test_clone_branch(self) -> None:
  551. r = self.open_repo("a.git")
  552. r.refs[b"refs/heads/mybranch"] = b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a"
  553. tmp_dir = self.mkdtemp()
  554. self.addCleanup(shutil.rmtree, tmp_dir)
  555. with r.clone(tmp_dir, mkdir=False, branch=b"mybranch") as t:
  556. # HEAD should point to specified branch and not origin HEAD
  557. chain, sha = t.refs.follow(b"HEAD")
  558. self.assertEqual(chain[-1], b"refs/heads/mybranch")
  559. self.assertEqual(sha, b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a")
  560. self.assertEqual(
  561. t.refs[b"refs/remotes/origin/HEAD"],
  562. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  563. )
  564. def test_clone_tag(self) -> None:
  565. r = self.open_repo("a.git")
  566. tmp_dir = self.mkdtemp()
  567. self.addCleanup(shutil.rmtree, tmp_dir)
  568. with r.clone(tmp_dir, mkdir=False, branch=b"mytag") as t:
  569. # HEAD should be detached (and not a symbolic ref) at tag
  570. self.assertEqual(
  571. t.refs.read_ref(b"HEAD"),
  572. b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a",
  573. )
  574. self.assertEqual(
  575. t.refs[b"refs/remotes/origin/HEAD"],
  576. b"a90fa2d900a17e99b433217e988c4eb4a2e9a097",
  577. )
  578. def test_clone_invalid_branch(self) -> None:
  579. r = self.open_repo("a.git")
  580. tmp_dir = self.mkdtemp()
  581. self.addCleanup(shutil.rmtree, tmp_dir)
  582. self.assertRaises(
  583. ValueError,
  584. r.clone,
  585. tmp_dir,
  586. mkdir=False,
  587. branch=b"mybranch",
  588. )
  589. def test_merge_history(self) -> None:
  590. r = self.open_repo("simple_merge.git")
  591. shas = [e.commit.id for e in r.get_walker()]
  592. self.assertEqual(
  593. shas,
  594. [
  595. b"5dac377bdded4c9aeb8dff595f0faeebcc8498cc",
  596. b"ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd",
  597. b"4cffe90e0a41ad3f5190079d7c8f036bde29cbe6",
  598. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  599. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  600. ],
  601. )
  602. def test_out_of_order_merge(self) -> None:
  603. """Test that revision history is ordered by date, not parent order."""
  604. r = self.open_repo("ooo_merge.git")
  605. shas = [e.commit.id for e in r.get_walker()]
  606. self.assertEqual(
  607. shas,
  608. [
  609. b"7601d7f6231db6a57f7bbb79ee52e4d462fd44d1",
  610. b"f507291b64138b875c28e03469025b1ea20bc614",
  611. b"fb5b0425c7ce46959bec94d54b9a157645e114f5",
  612. b"f9e39b120c68182a4ba35349f832d0e4e61f485c",
  613. ],
  614. )
  615. def test_get_tags_empty(self) -> None:
  616. r = self.open_repo("ooo_merge.git")
  617. self.assertEqual({}, r.refs.as_dict(b"refs/tags"))
  618. def test_get_config(self) -> None:
  619. r = self.open_repo("ooo_merge.git")
  620. self.assertIsInstance(r.get_config(), Config)
  621. def test_get_config_stack(self) -> None:
  622. r = self.open_repo("ooo_merge.git")
  623. self.assertIsInstance(r.get_config_stack(), Config)
  624. def test_common_revisions(self) -> None:
  625. """This test demonstrates that ``find_common_revisions()`` actually
  626. returns common heads, not revisions; dulwich already uses
  627. ``find_common_revisions()`` in such a manner (see
  628. ``Repo.find_objects()``).
  629. """
  630. expected_shas = {b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e"}
  631. # Source for objects.
  632. r_base = self.open_repo("simple_merge.git")
  633. # Re-create each-side of the merge in simple_merge.git.
  634. #
  635. # Since the trees and blobs are missing, the repository created is
  636. # corrupted, but we're only checking for commits for the purpose of
  637. # this test, so it's immaterial.
  638. r1_dir = self.mkdtemp()
  639. self.addCleanup(shutil.rmtree, r1_dir)
  640. r1_commits = [
  641. b"ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd", # HEAD
  642. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  643. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  644. ]
  645. r2_dir = self.mkdtemp()
  646. self.addCleanup(shutil.rmtree, r2_dir)
  647. r2_commits = [
  648. b"4cffe90e0a41ad3f5190079d7c8f036bde29cbe6", # HEAD
  649. b"60dacdc733de308bb77bb76ce0fb0f9b44c9769e",
  650. b"0d89f20333fbb1d2f3a94da77f4981373d8f4310",
  651. ]
  652. r1 = Repo.init_bare(r1_dir)
  653. for c in r1_commits:
  654. r1.object_store.add_object(r_base.get_object(c))
  655. r1.refs[b"HEAD"] = r1_commits[0]
  656. r2 = Repo.init_bare(r2_dir)
  657. for c in r2_commits:
  658. r2.object_store.add_object(r_base.get_object(c))
  659. r2.refs[b"HEAD"] = r2_commits[0]
  660. # Finally, the 'real' testing!
  661. shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
  662. self.assertEqual(set(shas), expected_shas)
  663. shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
  664. self.assertEqual(set(shas), expected_shas)
  665. def test_shell_hook_pre_commit(self) -> None:
  666. if os.name != "posix":
  667. self.skipTest("shell hook tests requires POSIX shell")
  668. pre_commit_fail = """#!/bin/sh
  669. exit 1
  670. """
  671. pre_commit_success = """#!/bin/sh
  672. exit 0
  673. """
  674. repo_dir = os.path.join(self.mkdtemp())
  675. self.addCleanup(shutil.rmtree, repo_dir)
  676. r = Repo.init(repo_dir)
  677. self.addCleanup(r.close)
  678. pre_commit = os.path.join(r.controldir(), "hooks", "pre-commit")
  679. with open(pre_commit, "w") as f:
  680. f.write(pre_commit_fail)
  681. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  682. self.assertRaises(
  683. errors.CommitError,
  684. r.do_commit,
  685. b"failed commit",
  686. committer=b"Test Committer <test@nodomain.com>",
  687. author=b"Test Author <test@nodomain.com>",
  688. commit_timestamp=12345,
  689. commit_timezone=0,
  690. author_timestamp=12345,
  691. author_timezone=0,
  692. )
  693. with open(pre_commit, "w") as f:
  694. f.write(pre_commit_success)
  695. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  696. commit_sha = r.do_commit(
  697. b"empty commit",
  698. committer=b"Test Committer <test@nodomain.com>",
  699. author=b"Test Author <test@nodomain.com>",
  700. commit_timestamp=12395,
  701. commit_timezone=0,
  702. author_timestamp=12395,
  703. author_timezone=0,
  704. )
  705. self.assertEqual([], r[commit_sha].parents)
  706. def test_shell_hook_commit_msg(self) -> None:
  707. if os.name != "posix":
  708. self.skipTest("shell hook tests requires POSIX shell")
  709. commit_msg_fail = """#!/bin/sh
  710. exit 1
  711. """
  712. commit_msg_success = """#!/bin/sh
  713. exit 0
  714. """
  715. repo_dir = self.mkdtemp()
  716. self.addCleanup(shutil.rmtree, repo_dir)
  717. r = Repo.init(repo_dir)
  718. self.addCleanup(r.close)
  719. commit_msg = os.path.join(r.controldir(), "hooks", "commit-msg")
  720. with open(commit_msg, "w") as f:
  721. f.write(commit_msg_fail)
  722. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  723. self.assertRaises(
  724. errors.CommitError,
  725. r.do_commit,
  726. b"failed commit",
  727. committer=b"Test Committer <test@nodomain.com>",
  728. author=b"Test Author <test@nodomain.com>",
  729. commit_timestamp=12345,
  730. commit_timezone=0,
  731. author_timestamp=12345,
  732. author_timezone=0,
  733. )
  734. with open(commit_msg, "w") as f:
  735. f.write(commit_msg_success)
  736. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  737. commit_sha = r.do_commit(
  738. b"empty commit",
  739. committer=b"Test Committer <test@nodomain.com>",
  740. author=b"Test Author <test@nodomain.com>",
  741. commit_timestamp=12395,
  742. commit_timezone=0,
  743. author_timestamp=12395,
  744. author_timezone=0,
  745. )
  746. self.assertEqual([], r[commit_sha].parents)
  747. def test_shell_hook_pre_commit_add_files(self) -> None:
  748. if os.name != "posix":
  749. self.skipTest("shell hook tests requires POSIX shell")
  750. pre_commit_contents = """#!{executable}
  751. import sys
  752. sys.path.extend({path!r})
  753. from dulwich.repo import Repo
  754. with open('foo', 'w') as f:
  755. f.write('newfile')
  756. r = Repo('.')
  757. r.stage(['foo'])
  758. """.format(
  759. executable=sys.executable,
  760. path=[os.path.join(os.path.dirname(__file__), "..", ".."), *sys.path],
  761. )
  762. repo_dir = os.path.join(self.mkdtemp())
  763. self.addCleanup(shutil.rmtree, repo_dir)
  764. r = Repo.init(repo_dir)
  765. self.addCleanup(r.close)
  766. with open(os.path.join(repo_dir, "blah"), "w") as f:
  767. f.write("blah")
  768. r.stage(["blah"])
  769. pre_commit = os.path.join(r.controldir(), "hooks", "pre-commit")
  770. with open(pre_commit, "w") as f:
  771. f.write(pre_commit_contents)
  772. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  773. commit_sha = r.do_commit(
  774. b"new commit",
  775. committer=b"Test Committer <test@nodomain.com>",
  776. author=b"Test Author <test@nodomain.com>",
  777. commit_timestamp=12395,
  778. commit_timezone=0,
  779. author_timestamp=12395,
  780. author_timezone=0,
  781. )
  782. self.assertEqual([], r[commit_sha].parents)
  783. tree = r[r[commit_sha].tree]
  784. self.assertEqual({b"blah", b"foo"}, set(tree))
  785. def test_shell_hook_post_commit(self) -> None:
  786. if os.name != "posix":
  787. self.skipTest("shell hook tests requires POSIX shell")
  788. repo_dir = self.mkdtemp()
  789. self.addCleanup(shutil.rmtree, repo_dir)
  790. r = Repo.init(repo_dir)
  791. self.addCleanup(r.close)
  792. (fd, path) = tempfile.mkstemp(dir=repo_dir)
  793. os.close(fd)
  794. post_commit_msg = (
  795. """#!/bin/sh
  796. rm """
  797. + path
  798. + """
  799. """
  800. )
  801. root_sha = r.do_commit(
  802. b"empty commit",
  803. committer=b"Test Committer <test@nodomain.com>",
  804. author=b"Test Author <test@nodomain.com>",
  805. commit_timestamp=12345,
  806. commit_timezone=0,
  807. author_timestamp=12345,
  808. author_timezone=0,
  809. )
  810. self.assertEqual([], r[root_sha].parents)
  811. post_commit = os.path.join(r.controldir(), "hooks", "post-commit")
  812. with open(post_commit, "wb") as f:
  813. f.write(post_commit_msg.encode(locale.getpreferredencoding()))
  814. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  815. commit_sha = r.do_commit(
  816. b"empty commit",
  817. committer=b"Test Committer <test@nodomain.com>",
  818. author=b"Test Author <test@nodomain.com>",
  819. commit_timestamp=12345,
  820. commit_timezone=0,
  821. author_timestamp=12345,
  822. author_timezone=0,
  823. )
  824. self.assertEqual([root_sha], r[commit_sha].parents)
  825. self.assertFalse(os.path.exists(path))
  826. post_commit_msg_fail = """#!/bin/sh
  827. exit 1
  828. """
  829. with open(post_commit, "w") as f:
  830. f.write(post_commit_msg_fail)
  831. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  832. warnings.simplefilter("always", UserWarning)
  833. self.addCleanup(warnings.resetwarnings)
  834. warnings_list, restore_warnings = setup_warning_catcher()
  835. self.addCleanup(restore_warnings)
  836. commit_sha2 = r.do_commit(
  837. b"empty commit",
  838. committer=b"Test Committer <test@nodomain.com>",
  839. author=b"Test Author <test@nodomain.com>",
  840. commit_timestamp=12345,
  841. commit_timezone=0,
  842. author_timestamp=12345,
  843. author_timezone=0,
  844. )
  845. expected_warning = UserWarning(
  846. "post-commit hook failed: Hook post-commit exited with non-zero status 1",
  847. )
  848. for w in warnings_list:
  849. if type(w) is type(expected_warning) and w.args == expected_warning.args:
  850. break
  851. else:
  852. raise AssertionError(
  853. f"Expected warning {expected_warning!r} not in {warnings_list!r}"
  854. )
  855. self.assertEqual([commit_sha], r[commit_sha2].parents)
  856. def test_as_dict(self) -> None:
  857. def check(repo) -> None:
  858. self.assertEqual(
  859. repo.refs.subkeys(b"refs/tags"),
  860. repo.refs.subkeys(b"refs/tags/"),
  861. )
  862. self.assertEqual(
  863. repo.refs.as_dict(b"refs/tags"),
  864. repo.refs.as_dict(b"refs/tags/"),
  865. )
  866. self.assertEqual(
  867. repo.refs.as_dict(b"refs/heads"),
  868. repo.refs.as_dict(b"refs/heads/"),
  869. )
  870. bare = self.open_repo("a.git")
  871. tmp_dir = self.mkdtemp()
  872. self.addCleanup(shutil.rmtree, tmp_dir)
  873. with bare.clone(tmp_dir, mkdir=False) as nonbare:
  874. check(nonbare)
  875. check(bare)
  876. def test_working_tree(self) -> None:
  877. temp_dir = tempfile.mkdtemp()
  878. self.addCleanup(shutil.rmtree, temp_dir)
  879. worktree_temp_dir = tempfile.mkdtemp()
  880. self.addCleanup(shutil.rmtree, worktree_temp_dir)
  881. r = Repo.init(temp_dir)
  882. self.addCleanup(r.close)
  883. root_sha = r.do_commit(
  884. b"empty commit",
  885. committer=b"Test Committer <test@nodomain.com>",
  886. author=b"Test Author <test@nodomain.com>",
  887. commit_timestamp=12345,
  888. commit_timezone=0,
  889. author_timestamp=12345,
  890. author_timezone=0,
  891. )
  892. r.refs[b"refs/heads/master"] = root_sha
  893. w = Repo._init_new_working_directory(worktree_temp_dir, r)
  894. self.addCleanup(w.close)
  895. new_sha = w.do_commit(
  896. b"new commit",
  897. committer=b"Test Committer <test@nodomain.com>",
  898. author=b"Test Author <test@nodomain.com>",
  899. commit_timestamp=12345,
  900. commit_timezone=0,
  901. author_timestamp=12345,
  902. author_timezone=0,
  903. )
  904. w.refs[b"HEAD"] = new_sha
  905. self.assertEqual(
  906. os.path.abspath(r.controldir()), os.path.abspath(w.commondir())
  907. )
  908. self.assertEqual(r.refs.keys(), w.refs.keys())
  909. self.assertNotEqual(r.head(), w.head())
  910. class BuildRepoRootTests(TestCase):
  911. """Tests that build on-disk repos from scratch.
  912. Repos live in a temp dir and are torn down after each test. They start with
  913. a single commit in master having single file named 'a'.
  914. """
  915. def get_repo_dir(self):
  916. return os.path.join(tempfile.mkdtemp(), "test")
  917. def setUp(self) -> None:
  918. super().setUp()
  919. self._repo_dir = self.get_repo_dir()
  920. os.makedirs(self._repo_dir)
  921. r = self._repo = Repo.init(self._repo_dir)
  922. self.addCleanup(tear_down_repo, r)
  923. self.assertFalse(r.bare)
  924. self.assertEqual(b"ref: refs/heads/master", r.refs.read_ref(b"HEAD"))
  925. self.assertRaises(KeyError, lambda: r.refs[b"refs/heads/master"])
  926. with open(os.path.join(r.path, "a"), "wb") as f:
  927. f.write(b"file contents")
  928. r.stage(["a"])
  929. commit_sha = r.do_commit(
  930. b"msg",
  931. committer=b"Test Committer <test@nodomain.com>",
  932. author=b"Test Author <test@nodomain.com>",
  933. commit_timestamp=12345,
  934. commit_timezone=0,
  935. author_timestamp=12345,
  936. author_timezone=0,
  937. )
  938. self.assertEqual([], r[commit_sha].parents)
  939. self._root_commit = commit_sha
  940. def test_get_shallow(self) -> None:
  941. self.assertEqual(set(), self._repo.get_shallow())
  942. with open(os.path.join(self._repo.path, ".git", "shallow"), "wb") as f:
  943. f.write(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097\n")
  944. self.assertEqual(
  945. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  946. self._repo.get_shallow(),
  947. )
  948. def test_update_shallow(self) -> None:
  949. self._repo.update_shallow(None, None) # no op
  950. self.assertEqual(set(), self._repo.get_shallow())
  951. self._repo.update_shallow([b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], None)
  952. self.assertEqual(
  953. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  954. self._repo.get_shallow(),
  955. )
  956. self._repo.update_shallow(
  957. [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
  958. [b"f9e39b120c68182a4ba35349f832d0e4e61f485c"],
  959. )
  960. self.assertEqual(
  961. {b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"},
  962. self._repo.get_shallow(),
  963. )
  964. self._repo.update_shallow(None, [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"])
  965. self.assertEqual(set(), self._repo.get_shallow())
  966. self.assertEqual(
  967. False,
  968. os.path.exists(os.path.join(self._repo.controldir(), "shallow")),
  969. )
  970. def test_build_repo(self) -> None:
  971. r = self._repo
  972. self.assertEqual(b"ref: refs/heads/master", r.refs.read_ref(b"HEAD"))
  973. self.assertEqual(self._root_commit, r.refs[b"refs/heads/master"])
  974. expected_blob = objects.Blob.from_string(b"file contents")
  975. self.assertEqual(expected_blob.data, r[expected_blob.id].data)
  976. actual_commit = r[self._root_commit]
  977. self.assertEqual(b"msg", actual_commit.message)
  978. def test_commit_modified(self) -> None:
  979. r = self._repo
  980. with open(os.path.join(r.path, "a"), "wb") as f:
  981. f.write(b"new contents")
  982. r.stage(["a"])
  983. commit_sha = r.do_commit(
  984. b"modified a",
  985. committer=b"Test Committer <test@nodomain.com>",
  986. author=b"Test Author <test@nodomain.com>",
  987. commit_timestamp=12395,
  988. commit_timezone=0,
  989. author_timestamp=12395,
  990. author_timezone=0,
  991. )
  992. self.assertEqual([self._root_commit], r[commit_sha].parents)
  993. a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"a")
  994. self.assertEqual(stat.S_IFREG | 0o644, a_mode)
  995. self.assertEqual(b"new contents", r[a_id].data)
  996. @skipIf(not getattr(os, "symlink", None), "Requires symlink support")
  997. def test_commit_symlink(self) -> None:
  998. r = self._repo
  999. os.symlink("a", os.path.join(r.path, "b"))
  1000. r.stage(["a", "b"])
  1001. commit_sha = r.do_commit(
  1002. b"Symlink b",
  1003. committer=b"Test Committer <test@nodomain.com>",
  1004. author=b"Test Author <test@nodomain.com>",
  1005. commit_timestamp=12395,
  1006. commit_timezone=0,
  1007. author_timestamp=12395,
  1008. author_timezone=0,
  1009. )
  1010. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1011. b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b"b")
  1012. self.assertTrue(stat.S_ISLNK(b_mode))
  1013. self.assertEqual(b"a", r[b_id].data)
  1014. def test_commit_merge_heads_file(self) -> None:
  1015. tmp_dir = tempfile.mkdtemp()
  1016. self.addCleanup(shutil.rmtree, tmp_dir)
  1017. r = Repo.init(tmp_dir)
  1018. with open(os.path.join(r.path, "a"), "w") as f:
  1019. f.write("initial text")
  1020. c1 = r.do_commit(
  1021. b"initial commit",
  1022. committer=b"Test Committer <test@nodomain.com>",
  1023. author=b"Test Author <test@nodomain.com>",
  1024. commit_timestamp=12395,
  1025. commit_timezone=0,
  1026. author_timestamp=12395,
  1027. author_timezone=0,
  1028. )
  1029. with open(os.path.join(r.path, "a"), "w") as f:
  1030. f.write("merged text")
  1031. with open(os.path.join(r.path, ".git", "MERGE_HEAD"), "w") as f:
  1032. f.write("c27a2d21dd136312d7fa9e8baabb82561a1727d0\n")
  1033. r.stage(["a"])
  1034. commit_sha = r.do_commit(
  1035. b"deleted a",
  1036. committer=b"Test Committer <test@nodomain.com>",
  1037. author=b"Test Author <test@nodomain.com>",
  1038. commit_timestamp=12395,
  1039. commit_timezone=0,
  1040. author_timestamp=12395,
  1041. author_timezone=0,
  1042. )
  1043. self.assertEqual(
  1044. [c1, b"c27a2d21dd136312d7fa9e8baabb82561a1727d0"],
  1045. r[commit_sha].parents,
  1046. )
  1047. def test_commit_deleted(self) -> None:
  1048. r = self._repo
  1049. os.remove(os.path.join(r.path, "a"))
  1050. r.stage(["a"])
  1051. commit_sha = r.do_commit(
  1052. b"deleted a",
  1053. committer=b"Test Committer <test@nodomain.com>",
  1054. author=b"Test Author <test@nodomain.com>",
  1055. commit_timestamp=12395,
  1056. commit_timezone=0,
  1057. author_timestamp=12395,
  1058. author_timezone=0,
  1059. )
  1060. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1061. self.assertEqual([], list(r.open_index()))
  1062. tree = r[r[commit_sha].tree]
  1063. self.assertEqual([], list(tree.iteritems()))
  1064. def test_commit_follows(self) -> None:
  1065. r = self._repo
  1066. r.refs.set_symbolic_ref(b"HEAD", b"refs/heads/bla")
  1067. commit_sha = r.do_commit(
  1068. b"commit with strange character",
  1069. committer=b"Test Committer <test@nodomain.com>",
  1070. author=b"Test Author <test@nodomain.com>",
  1071. commit_timestamp=12395,
  1072. commit_timezone=0,
  1073. author_timestamp=12395,
  1074. author_timezone=0,
  1075. ref=b"HEAD",
  1076. )
  1077. self.assertEqual(commit_sha, r[b"refs/heads/bla"].id)
  1078. def test_commit_encoding(self) -> None:
  1079. r = self._repo
  1080. commit_sha = r.do_commit(
  1081. b"commit with strange character \xee",
  1082. committer=b"Test Committer <test@nodomain.com>",
  1083. author=b"Test Author <test@nodomain.com>",
  1084. commit_timestamp=12395,
  1085. commit_timezone=0,
  1086. author_timestamp=12395,
  1087. author_timezone=0,
  1088. encoding=b"iso8859-1",
  1089. )
  1090. self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
  1091. def test_compression_level(self) -> None:
  1092. r = self._repo
  1093. c = r.get_config()
  1094. c.set(("core",), "compression", "3")
  1095. c.set(("core",), "looseCompression", "4")
  1096. c.write_to_path()
  1097. r = Repo(self._repo_dir)
  1098. self.assertEqual(r.object_store.loose_compression_level, 4)
  1099. def test_repositoryformatversion_unsupported(self) -> None:
  1100. r = self._repo
  1101. c = r.get_config()
  1102. c.set(("core",), "repositoryformatversion", "2")
  1103. c.write_to_path()
  1104. self.assertRaises(UnsupportedVersion, Repo, self._repo_dir)
  1105. def test_repositoryformatversion_1(self) -> None:
  1106. r = self._repo
  1107. c = r.get_config()
  1108. c.set(("core",), "repositoryformatversion", "1")
  1109. c.write_to_path()
  1110. Repo(self._repo_dir)
  1111. def test_worktreeconfig_extension(self) -> None:
  1112. r = self._repo
  1113. c = r.get_config()
  1114. c.set(("core",), "repositoryformatversion", "1")
  1115. c.set(("extensions",), "worktreeconfig", True)
  1116. c.write_to_path()
  1117. c = r.get_worktree_config()
  1118. c.set(("user",), "repositoryformatversion", "1")
  1119. c.set((b"user",), b"name", b"Jelmer")
  1120. c.write_to_path()
  1121. cs = r.get_config_stack()
  1122. self.assertEqual(cs.get(("user",), "name"), b"Jelmer")
  1123. def test_worktreeconfig_extension_case(self) -> None:
  1124. """Test that worktree code does not error for alternate case format."""
  1125. r = self._repo
  1126. c = r.get_config()
  1127. c.set(("core",), "repositoryformatversion", "1")
  1128. # Capitalize "Config"
  1129. c.set(("extensions",), "worktreeConfig", True)
  1130. c.write_to_path()
  1131. c = r.get_worktree_config()
  1132. c.set(("user",), "repositoryformatversion", "1")
  1133. c.set((b"user",), b"name", b"Jelmer")
  1134. c.write_to_path()
  1135. # The following line errored before
  1136. # https://github.com/jelmer/dulwich/issues/1285 was addressed
  1137. Repo(self._repo_dir)
  1138. def test_repositoryformatversion_1_extension(self) -> None:
  1139. r = self._repo
  1140. c = r.get_config()
  1141. c.set(("core",), "repositoryformatversion", "1")
  1142. c.set(("extensions",), "unknownextension", True)
  1143. c.write_to_path()
  1144. self.assertRaises(UnsupportedExtension, Repo, self._repo_dir)
  1145. def test_commit_encoding_from_config(self) -> None:
  1146. r = self._repo
  1147. c = r.get_config()
  1148. c.set(("i18n",), "commitEncoding", "iso8859-1")
  1149. c.write_to_path()
  1150. commit_sha = r.do_commit(
  1151. b"commit with strange character \xee",
  1152. committer=b"Test Committer <test@nodomain.com>",
  1153. author=b"Test Author <test@nodomain.com>",
  1154. commit_timestamp=12395,
  1155. commit_timezone=0,
  1156. author_timestamp=12395,
  1157. author_timezone=0,
  1158. )
  1159. self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
  1160. def test_commit_config_identity(self) -> None:
  1161. # commit falls back to the users' identity if it wasn't specified
  1162. r = self._repo
  1163. c = r.get_config()
  1164. c.set((b"user",), b"name", b"Jelmer")
  1165. c.set((b"user",), b"email", b"jelmer@apache.org")
  1166. c.write_to_path()
  1167. commit_sha = r.do_commit(b"message")
  1168. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1169. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1170. def test_commit_config_identity_strips_than(self) -> None:
  1171. # commit falls back to the users' identity if it wasn't specified,
  1172. # and strips superfluous <>
  1173. r = self._repo
  1174. c = r.get_config()
  1175. c.set((b"user",), b"name", b"Jelmer")
  1176. c.set((b"user",), b"email", b"<jelmer@apache.org>")
  1177. c.write_to_path()
  1178. commit_sha = r.do_commit(b"message")
  1179. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1180. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1181. def test_commit_config_identity_in_memoryrepo(self) -> None:
  1182. # commit falls back to the users' identity if it wasn't specified
  1183. r = MemoryRepo.init_bare([], {})
  1184. c = r.get_config()
  1185. c.set((b"user",), b"name", b"Jelmer")
  1186. c.set((b"user",), b"email", b"jelmer@apache.org")
  1187. commit_sha = r.do_commit(b"message", tree=objects.Tree().id)
  1188. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1189. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].committer)
  1190. def test_commit_config_identity_from_env(self) -> None:
  1191. # commit falls back to the users' identity if it wasn't specified
  1192. self.overrideEnv("GIT_COMMITTER_NAME", "joe")
  1193. self.overrideEnv("GIT_COMMITTER_EMAIL", "joe@example.com")
  1194. r = self._repo
  1195. c = r.get_config()
  1196. c.set((b"user",), b"name", b"Jelmer")
  1197. c.set((b"user",), b"email", b"jelmer@apache.org")
  1198. c.write_to_path()
  1199. commit_sha = r.do_commit(b"message")
  1200. self.assertEqual(b"Jelmer <jelmer@apache.org>", r[commit_sha].author)
  1201. self.assertEqual(b"joe <joe@example.com>", r[commit_sha].committer)
  1202. def test_commit_fail_ref(self) -> None:
  1203. r = self._repo
  1204. def set_if_equals(name, old_ref, new_ref, **kwargs) -> bool:
  1205. return False
  1206. r.refs.set_if_equals = set_if_equals
  1207. def add_if_new(name, new_ref, **kwargs) -> None:
  1208. self.fail("Unexpected call to add_if_new")
  1209. r.refs.add_if_new = add_if_new
  1210. old_shas = set(r.object_store)
  1211. self.assertRaises(
  1212. errors.CommitError,
  1213. r.do_commit,
  1214. b"failed commit",
  1215. committer=b"Test Committer <test@nodomain.com>",
  1216. author=b"Test Author <test@nodomain.com>",
  1217. commit_timestamp=12345,
  1218. commit_timezone=0,
  1219. author_timestamp=12345,
  1220. author_timezone=0,
  1221. )
  1222. new_shas = set(r.object_store) - old_shas
  1223. self.assertEqual(1, len(new_shas))
  1224. # Check that the new commit (now garbage) was added.
  1225. new_commit = r[new_shas.pop()]
  1226. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  1227. self.assertEqual(b"failed commit", new_commit.message)
  1228. def test_commit_branch(self) -> None:
  1229. r = self._repo
  1230. commit_sha = r.do_commit(
  1231. b"commit to branch",
  1232. committer=b"Test Committer <test@nodomain.com>",
  1233. author=b"Test Author <test@nodomain.com>",
  1234. commit_timestamp=12395,
  1235. commit_timezone=0,
  1236. author_timestamp=12395,
  1237. author_timezone=0,
  1238. ref=b"refs/heads/new_branch",
  1239. )
  1240. self.assertEqual(self._root_commit, r[b"HEAD"].id)
  1241. self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
  1242. self.assertEqual([], r[commit_sha].parents)
  1243. self.assertIn(b"refs/heads/new_branch", r)
  1244. new_branch_head = commit_sha
  1245. commit_sha = r.do_commit(
  1246. b"commit to branch 2",
  1247. committer=b"Test Committer <test@nodomain.com>",
  1248. author=b"Test Author <test@nodomain.com>",
  1249. commit_timestamp=12395,
  1250. commit_timezone=0,
  1251. author_timestamp=12395,
  1252. author_timezone=0,
  1253. ref=b"refs/heads/new_branch",
  1254. )
  1255. self.assertEqual(self._root_commit, r[b"HEAD"].id)
  1256. self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
  1257. self.assertEqual([new_branch_head], r[commit_sha].parents)
  1258. def test_commit_merge_heads(self) -> None:
  1259. r = self._repo
  1260. merge_1 = r.do_commit(
  1261. b"commit to branch 2",
  1262. committer=b"Test Committer <test@nodomain.com>",
  1263. author=b"Test Author <test@nodomain.com>",
  1264. commit_timestamp=12395,
  1265. commit_timezone=0,
  1266. author_timestamp=12395,
  1267. author_timezone=0,
  1268. ref=b"refs/heads/new_branch",
  1269. )
  1270. commit_sha = r.do_commit(
  1271. b"commit with merge",
  1272. committer=b"Test Committer <test@nodomain.com>",
  1273. author=b"Test Author <test@nodomain.com>",
  1274. commit_timestamp=12395,
  1275. commit_timezone=0,
  1276. author_timestamp=12395,
  1277. author_timezone=0,
  1278. merge_heads=[merge_1],
  1279. )
  1280. self.assertEqual([self._root_commit, merge_1], r[commit_sha].parents)
  1281. def test_commit_dangling_commit(self) -> None:
  1282. r = self._repo
  1283. old_shas = set(r.object_store)
  1284. old_refs = r.get_refs()
  1285. commit_sha = r.do_commit(
  1286. b"commit with no ref",
  1287. committer=b"Test Committer <test@nodomain.com>",
  1288. author=b"Test Author <test@nodomain.com>",
  1289. commit_timestamp=12395,
  1290. commit_timezone=0,
  1291. author_timestamp=12395,
  1292. author_timezone=0,
  1293. ref=None,
  1294. )
  1295. new_shas = set(r.object_store) - old_shas
  1296. # New sha is added, but no new refs
  1297. self.assertEqual(1, len(new_shas))
  1298. new_commit = r[new_shas.pop()]
  1299. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  1300. self.assertEqual([], r[commit_sha].parents)
  1301. self.assertEqual(old_refs, r.get_refs())
  1302. def test_commit_dangling_commit_with_parents(self) -> None:
  1303. r = self._repo
  1304. old_shas = set(r.object_store)
  1305. old_refs = r.get_refs()
  1306. commit_sha = r.do_commit(
  1307. b"commit with no ref",
  1308. committer=b"Test Committer <test@nodomain.com>",
  1309. author=b"Test Author <test@nodomain.com>",
  1310. commit_timestamp=12395,
  1311. commit_timezone=0,
  1312. author_timestamp=12395,
  1313. author_timezone=0,
  1314. ref=None,
  1315. merge_heads=[self._root_commit],
  1316. )
  1317. new_shas = set(r.object_store) - old_shas
  1318. # New sha is added, but no new refs
  1319. self.assertEqual(1, len(new_shas))
  1320. new_commit = r[new_shas.pop()]
  1321. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  1322. self.assertEqual([self._root_commit], r[commit_sha].parents)
  1323. self.assertEqual(old_refs, r.get_refs())
  1324. def test_stage_absolute(self) -> None:
  1325. r = self._repo
  1326. os.remove(os.path.join(r.path, "a"))
  1327. self.assertRaises(ValueError, r.stage, [os.path.join(r.path, "a")])
  1328. def test_stage_deleted(self) -> None:
  1329. r = self._repo
  1330. os.remove(os.path.join(r.path, "a"))
  1331. r.stage(["a"])
  1332. r.stage(["a"]) # double-stage a deleted path
  1333. self.assertEqual([], list(r.open_index()))
  1334. def test_stage_directory(self) -> None:
  1335. r = self._repo
  1336. os.mkdir(os.path.join(r.path, "c"))
  1337. r.stage(["c"])
  1338. self.assertEqual([b"a"], list(r.open_index()))
  1339. def test_stage_submodule(self) -> None:
  1340. r = self._repo
  1341. s = Repo.init(os.path.join(r.path, "sub"), mkdir=True)
  1342. s.do_commit(b"message")
  1343. r.stage(["sub"])
  1344. self.assertEqual([b"a", b"sub"], list(r.open_index()))
  1345. def test_unstage_midify_file_with_dir(self) -> None:
  1346. os.mkdir(os.path.join(self._repo.path, "new_dir"))
  1347. full_path = os.path.join(self._repo.path, "new_dir", "foo")
  1348. with open(full_path, "w") as f:
  1349. f.write("hello")
  1350. porcelain.add(self._repo, paths=[full_path])
  1351. porcelain.commit(
  1352. self._repo,
  1353. message=b"unitest",
  1354. committer=b"Jane <jane@example.com>",
  1355. author=b"John <john@example.com>",
  1356. )
  1357. with open(full_path, "a") as f:
  1358. f.write("something new")
  1359. self._repo.unstage(["new_dir/foo"])
  1360. status = list(porcelain.status(self._repo))
  1361. self.assertEqual(
  1362. [{"add": [], "delete": [], "modify": []}, [b"new_dir/foo"], []], status
  1363. )
  1364. def test_unstage_while_no_commit(self) -> None:
  1365. file = "foo"
  1366. full_path = os.path.join(self._repo.path, file)
  1367. with open(full_path, "w") as f:
  1368. f.write("hello")
  1369. porcelain.add(self._repo, paths=[full_path])
  1370. self._repo.unstage([file])
  1371. status = list(porcelain.status(self._repo))
  1372. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  1373. def test_unstage_add_file(self) -> None:
  1374. file = "foo"
  1375. full_path = os.path.join(self._repo.path, file)
  1376. porcelain.commit(
  1377. self._repo,
  1378. message=b"unitest",
  1379. committer=b"Jane <jane@example.com>",
  1380. author=b"John <john@example.com>",
  1381. )
  1382. with open(full_path, "w") as f:
  1383. f.write("hello")
  1384. porcelain.add(self._repo, paths=[full_path])
  1385. self._repo.unstage([file])
  1386. status = list(porcelain.status(self._repo))
  1387. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["foo"]], status)
  1388. def test_unstage_modify_file(self) -> None:
  1389. file = "foo"
  1390. full_path = os.path.join(self._repo.path, file)
  1391. with open(full_path, "w") as f:
  1392. f.write("hello")
  1393. porcelain.add(self._repo, paths=[full_path])
  1394. porcelain.commit(
  1395. self._repo,
  1396. message=b"unitest",
  1397. committer=b"Jane <jane@example.com>",
  1398. author=b"John <john@example.com>",
  1399. )
  1400. with open(full_path, "a") as f:
  1401. f.write("broken")
  1402. porcelain.add(self._repo, paths=[full_path])
  1403. self._repo.unstage([file])
  1404. status = list(porcelain.status(self._repo))
  1405. self.assertEqual(
  1406. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  1407. )
  1408. def test_unstage_remove_file(self) -> None:
  1409. file = "foo"
  1410. full_path = os.path.join(self._repo.path, file)
  1411. with open(full_path, "w") as f:
  1412. f.write("hello")
  1413. porcelain.add(self._repo, paths=[full_path])
  1414. porcelain.commit(
  1415. self._repo,
  1416. message=b"unitest",
  1417. committer=b"Jane <jane@example.com>",
  1418. author=b"John <john@example.com>",
  1419. )
  1420. os.remove(full_path)
  1421. self._repo.unstage([file])
  1422. status = list(porcelain.status(self._repo))
  1423. self.assertEqual(
  1424. [{"add": [], "delete": [], "modify": []}, [b"foo"], []], status
  1425. )
  1426. def test_reset_index(self) -> None:
  1427. r = self._repo
  1428. with open(os.path.join(r.path, "a"), "wb") as f:
  1429. f.write(b"changed")
  1430. with open(os.path.join(r.path, "b"), "wb") as f:
  1431. f.write(b"added")
  1432. r.stage(["a", "b"])
  1433. status = list(porcelain.status(self._repo))
  1434. self.assertEqual(
  1435. [{"add": [b"b"], "delete": [], "modify": [b"a"]}, [], []], status
  1436. )
  1437. r.reset_index()
  1438. status = list(porcelain.status(self._repo))
  1439. self.assertEqual([{"add": [], "delete": [], "modify": []}, [], ["b"]], status)
  1440. @skipIf(
  1441. sys.platform in ("win32", "darwin"),
  1442. "tries to implicitly decode as utf8",
  1443. )
  1444. def test_commit_no_encode_decode(self) -> None:
  1445. r = self._repo
  1446. repo_path_bytes = os.fsencode(r.path)
  1447. encodings = ("utf8", "latin1")
  1448. names = ["À".encode(encoding) for encoding in encodings]
  1449. for name, encoding in zip(names, encodings):
  1450. full_path = os.path.join(repo_path_bytes, name)
  1451. with open(full_path, "wb") as f:
  1452. f.write(encoding.encode("ascii"))
  1453. # These files are break tear_down_repo, so cleanup these files
  1454. # ourselves.
  1455. self.addCleanup(os.remove, full_path)
  1456. r.stage(names)
  1457. commit_sha = r.do_commit(
  1458. b"Files with different encodings",
  1459. committer=b"Test Committer <test@nodomain.com>",
  1460. author=b"Test Author <test@nodomain.com>",
  1461. commit_timestamp=12395,
  1462. commit_timezone=0,
  1463. author_timestamp=12395,
  1464. author_timezone=0,
  1465. ref=None,
  1466. merge_heads=[self._root_commit],
  1467. )
  1468. for name, encoding in zip(names, encodings):
  1469. mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name)
  1470. self.assertEqual(stat.S_IFREG | 0o644, mode)
  1471. self.assertEqual(encoding.encode("ascii"), r[id].data)
  1472. def test_discover_intended(self) -> None:
  1473. path = os.path.join(self._repo_dir, "b/c")
  1474. r = Repo.discover(path)
  1475. self.assertEqual(r.head(), self._repo.head())
  1476. def test_discover_isrepo(self) -> None:
  1477. r = Repo.discover(self._repo_dir)
  1478. self.assertEqual(r.head(), self._repo.head())
  1479. def test_discover_notrepo(self) -> None:
  1480. with self.assertRaises(NotGitRepository):
  1481. Repo.discover("/")
  1482. class CheckUserIdentityTests(TestCase):
  1483. def test_valid(self) -> None:
  1484. check_user_identity(b"Me <me@example.com>")
  1485. def test_invalid(self) -> None:
  1486. self.assertRaises(InvalidUserIdentity, check_user_identity, b"No Email")
  1487. self.assertRaises(
  1488. InvalidUserIdentity, check_user_identity, b"Fullname <missing"
  1489. )
  1490. self.assertRaises(
  1491. InvalidUserIdentity, check_user_identity, b"Fullname missing>"
  1492. )
  1493. self.assertRaises(
  1494. InvalidUserIdentity, check_user_identity, b"Fullname >order<>"
  1495. )
  1496. self.assertRaises(
  1497. InvalidUserIdentity, check_user_identity, b"Contains\0null byte <>"
  1498. )
  1499. self.assertRaises(
  1500. InvalidUserIdentity, check_user_identity, b"Contains\nnewline byte <>"
  1501. )
  1502. class RepoConfigIncludeIfTests(TestCase):
  1503. """Test includeIf functionality in repository config loading."""
  1504. def test_repo_config_includeif_gitdir(self) -> None:
  1505. """Test that includeIf gitdir conditions work when loading repo config."""
  1506. import tempfile
  1507. from dulwich.repo import Repo
  1508. with tempfile.TemporaryDirectory() as tmpdir:
  1509. # Create a repository
  1510. repo_path = os.path.join(tmpdir, "myrepo")
  1511. r = Repo.init(repo_path, mkdir=True)
  1512. # Use realpath to resolve any symlinks (important on macOS)
  1513. repo_path = os.path.realpath(repo_path)
  1514. # Create an included config file
  1515. included_path = os.path.join(tmpdir, "work.config")
  1516. with open(included_path, "wb") as f:
  1517. f.write(b"[user]\n email = work@example.com\n")
  1518. # Add includeIf to the repo config
  1519. config_path = os.path.join(repo_path, ".git", "config")
  1520. with open(config_path, "ab") as f:
  1521. f.write(f'\n[includeIf "gitdir:{repo_path}/.git/"]\n'.encode())
  1522. escaped_path = included_path.replace("\\", "\\\\")
  1523. f.write(f" path = {escaped_path}\n".encode())
  1524. # Close and reopen to reload config
  1525. r.close()
  1526. r = Repo(repo_path)
  1527. # Check if include was processed
  1528. config = r.get_config()
  1529. self.assertEqual(b"work@example.com", config.get((b"user",), b"email"))
  1530. r.close()
  1531. def test_repo_config_includeif_gitdir_pattern(self) -> None:
  1532. """Test includeIf gitdir pattern matching in repository config."""
  1533. import tempfile
  1534. from dulwich.repo import Repo
  1535. with tempfile.TemporaryDirectory() as tmpdir:
  1536. # Create a repository under "work" directory
  1537. work_dir = os.path.join(tmpdir, "work", "project1")
  1538. os.makedirs(os.path.dirname(work_dir), exist_ok=True)
  1539. r = Repo.init(work_dir, mkdir=True)
  1540. # Create an included config file
  1541. included_path = os.path.join(tmpdir, "work.config")
  1542. with open(included_path, "wb") as f:
  1543. f.write(b"[user]\n email = work@company.com\n")
  1544. # Add includeIf with pattern to the repo config
  1545. config_path = os.path.join(work_dir, ".git", "config")
  1546. with open(config_path, "ab") as f:
  1547. # Use a pattern that will match paths containing /work/
  1548. f.write(b'\n[includeIf "gitdir:**/work/**"]\n')
  1549. escaped_path = included_path.replace("\\", "\\\\")
  1550. f.write(f" path = {escaped_path}\n".encode())
  1551. # Close and reopen to reload config
  1552. r.close()
  1553. r = Repo(work_dir)
  1554. # Check if include was processed
  1555. config = r.get_config()
  1556. self.assertEqual(b"work@company.com", config.get((b"user",), b"email"))
  1557. r.close()
  1558. def test_repo_config_includeif_no_match(self) -> None:
  1559. """Test that includeIf doesn't include when condition doesn't match."""
  1560. import tempfile
  1561. from dulwich.repo import Repo
  1562. with tempfile.TemporaryDirectory() as tmpdir:
  1563. # Create a repository
  1564. repo_path = os.path.join(tmpdir, "personal", "project")
  1565. os.makedirs(os.path.dirname(repo_path), exist_ok=True)
  1566. r = Repo.init(repo_path, mkdir=True)
  1567. # Create an included config file
  1568. included_path = os.path.join(tmpdir, "work.config")
  1569. with open(included_path, "wb") as f:
  1570. f.write(b"[user]\n email = work@company.com\n")
  1571. # Add includeIf that won't match
  1572. config_path = os.path.join(repo_path, ".git", "config")
  1573. with open(config_path, "ab") as f:
  1574. f.write(b'\n[includeIf "gitdir:**/work/**"]\n')
  1575. escaped_path = included_path.replace("\\", "\\\\")
  1576. f.write(f" path = {escaped_path}\n".encode())
  1577. # Close and reopen to reload config
  1578. r.close()
  1579. r = Repo(repo_path)
  1580. # Check that include was NOT processed
  1581. config = r.get_config()
  1582. with self.assertRaises(KeyError):
  1583. config.get((b"user",), b"email")
  1584. r.close()
  1585. def test_bare_repo_config_includeif(self) -> None:
  1586. """Test includeIf in bare repository."""
  1587. import tempfile
  1588. from dulwich.repo import Repo
  1589. with tempfile.TemporaryDirectory() as tmpdir:
  1590. # Create a bare repository
  1591. repo_path = os.path.join(tmpdir, "bare.git")
  1592. r = Repo.init_bare(repo_path, mkdir=True)
  1593. # Use realpath to resolve any symlinks (important on macOS)
  1594. repo_path = os.path.realpath(repo_path)
  1595. # Create an included config file
  1596. included_path = os.path.join(tmpdir, "server.config")
  1597. with open(included_path, "wb") as f:
  1598. f.write(b"[receive]\n denyNonFastForwards = true\n")
  1599. # Add includeIf to the repo config
  1600. config_path = os.path.join(repo_path, "config")
  1601. with open(config_path, "ab") as f:
  1602. f.write(f'\n[includeIf "gitdir:{repo_path}/"]\n'.encode())
  1603. escaped_path = included_path.replace("\\", "\\\\")
  1604. f.write(f" path = {escaped_path}\n".encode())
  1605. # Close and reopen to reload config
  1606. r.close()
  1607. r = Repo(repo_path)
  1608. # Check if include was processed
  1609. config = r.get_config()
  1610. self.assertEqual(b"true", config.get((b"receive",), b"denyNonFastForwards"))
  1611. r.close()
  1612. def test_repo_config_includeif_hasconfig(self) -> None:
  1613. """Test includeIf hasconfig conditions in repository config."""
  1614. import tempfile
  1615. from dulwich.repo import Repo
  1616. with tempfile.TemporaryDirectory() as tmpdir:
  1617. # Create a repository
  1618. repo_path = os.path.join(tmpdir, "myrepo")
  1619. r = Repo.init(repo_path, mkdir=True)
  1620. # Create an included config file
  1621. included_path = os.path.join(tmpdir, "work.config")
  1622. with open(included_path, "wb") as f:
  1623. f.write(b"[user]\n name = WorkUser\n")
  1624. # Add a remote and includeIf hasconfig to the repo config
  1625. config_path = os.path.join(repo_path, ".git", "config")
  1626. with open(config_path, "ab") as f:
  1627. f.write(b'\n[remote "origin"]\n')
  1628. f.write(b" url = ssh://org-work@github.com/company/project\n")
  1629. f.write(
  1630. b'[includeIf "hasconfig:remote.*.url:ssh://org-*@github.com/**"]\n'
  1631. )
  1632. escaped_path = included_path.replace("\\", "\\\\")
  1633. f.write(f" path = {escaped_path}\n".encode())
  1634. # Close and reopen to reload config
  1635. r.close()
  1636. r = Repo(repo_path)
  1637. # Check if include was processed
  1638. config = r.get_config()
  1639. self.assertEqual(b"WorkUser", config.get((b"user",), b"name"))
  1640. r.close()
  1641. def test_repo_config_includeif_onbranch(self) -> None:
  1642. """Test includeIf onbranch conditions in repository config."""
  1643. import tempfile
  1644. from dulwich.repo import Repo
  1645. with tempfile.TemporaryDirectory() as tmpdir:
  1646. # Create a repository
  1647. repo_path = os.path.join(tmpdir, "myrepo")
  1648. r = Repo.init(repo_path, mkdir=True)
  1649. # Create HEAD pointing to main branch
  1650. refs_heads_dir = os.path.join(repo_path, ".git", "refs", "heads")
  1651. os.makedirs(refs_heads_dir, exist_ok=True)
  1652. main_ref_path = os.path.join(refs_heads_dir, "main")
  1653. with open(main_ref_path, "wb") as f:
  1654. f.write(b"0123456789012345678901234567890123456789\n")
  1655. head_path = os.path.join(repo_path, ".git", "HEAD")
  1656. with open(head_path, "wb") as f:
  1657. f.write(b"ref: refs/heads/main\n")
  1658. # Create an included config file
  1659. included_path = os.path.join(tmpdir, "main.config")
  1660. with open(included_path, "wb") as f:
  1661. f.write(b"[core]\n autocrlf = true\n")
  1662. # Add includeIf onbranch to the repo config
  1663. config_path = os.path.join(repo_path, ".git", "config")
  1664. with open(config_path, "ab") as f:
  1665. f.write(b'\n[includeIf "onbranch:main"]\n')
  1666. escaped_path = included_path.replace("\\", "\\\\")
  1667. f.write(f" path = {escaped_path}\n".encode())
  1668. # Close and reopen to reload config
  1669. r.close()
  1670. r = Repo(repo_path)
  1671. # Check if include was processed
  1672. config = r.get_config()
  1673. self.assertEqual(b"true", config.get((b"core",), b"autocrlf"))
  1674. r.close()