test_refs.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452
  1. # test_refs.py -- tests for refs.py
  2. # Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.refs."""
  22. import os
  23. import sys
  24. import tempfile
  25. from io import BytesIO
  26. from typing import ClassVar
  27. from dulwich import errors
  28. from dulwich.file import GitFile
  29. from dulwich.objects import ZERO_SHA
  30. from dulwich.refs import (
  31. DictRefsContainer,
  32. InfoRefsContainer,
  33. NamespacedRefsContainer,
  34. SymrefLoop,
  35. _split_ref_line,
  36. check_ref_format,
  37. is_per_worktree_ref,
  38. parse_remote_ref,
  39. parse_symref_value,
  40. read_packed_refs,
  41. read_packed_refs_with_peeled,
  42. shorten_ref_name,
  43. split_peeled_refs,
  44. strip_peeled_refs,
  45. write_packed_refs,
  46. )
  47. from dulwich.repo import Repo
  48. from dulwich.tests.utils import open_repo, tear_down_repo
  49. from dulwich.worktree import add_worktree
  50. from . import SkipTest, TestCase
  51. class CheckRefFormatTests(TestCase):
  52. """Tests for the check_ref_format function.
  53. These are the same tests as in the git test suite.
  54. """
  55. def test_valid(self) -> None:
  56. self.assertTrue(check_ref_format(b"heads/foo"))
  57. self.assertTrue(check_ref_format(b"foo/bar/baz"))
  58. self.assertTrue(check_ref_format(b"refs///heads/foo"))
  59. self.assertTrue(check_ref_format(b"foo./bar"))
  60. self.assertTrue(check_ref_format(b"heads/foo@bar"))
  61. self.assertTrue(check_ref_format(b"heads/fix.lock.error"))
  62. def test_invalid(self) -> None:
  63. self.assertFalse(check_ref_format(b"foo"))
  64. self.assertFalse(check_ref_format(b"heads/foo/"))
  65. self.assertFalse(check_ref_format(b"./foo"))
  66. self.assertFalse(check_ref_format(b".refs/foo"))
  67. self.assertFalse(check_ref_format(b"heads/foo..bar"))
  68. self.assertFalse(check_ref_format(b"heads/foo?bar"))
  69. self.assertFalse(check_ref_format(b"heads/foo.lock"))
  70. self.assertFalse(check_ref_format(b"heads/v@{ation"))
  71. self.assertFalse(check_ref_format(b"heads/foo\bar"))
  72. ONES = b"1" * 40
  73. TWOS = b"2" * 40
  74. THREES = b"3" * 40
  75. FOURS = b"4" * 40
  76. class PackedRefsFileTests(TestCase):
  77. def test_split_ref_line_errors(self) -> None:
  78. self.assertRaises(errors.PackedRefsException, _split_ref_line, b"singlefield")
  79. self.assertRaises(errors.PackedRefsException, _split_ref_line, b"badsha name")
  80. self.assertRaises(
  81. errors.PackedRefsException,
  82. _split_ref_line,
  83. ONES + b" bad/../refname",
  84. )
  85. def test_read_without_peeled(self) -> None:
  86. f = BytesIO(b"\n".join([b"# comment", ONES + b" ref/1", TWOS + b" ref/2"]))
  87. self.assertEqual(
  88. [(ONES, b"ref/1"), (TWOS, b"ref/2")], list(read_packed_refs(f))
  89. )
  90. def test_read_without_peeled_errors(self) -> None:
  91. f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS]))
  92. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  93. def test_read_with_peeled(self) -> None:
  94. f = BytesIO(
  95. b"\n".join(
  96. [
  97. ONES + b" ref/1",
  98. TWOS + b" ref/2",
  99. b"^" + THREES,
  100. FOURS + b" ref/4",
  101. ]
  102. )
  103. )
  104. self.assertEqual(
  105. [
  106. (ONES, b"ref/1", None),
  107. (TWOS, b"ref/2", THREES),
  108. (FOURS, b"ref/4", None),
  109. ],
  110. list(read_packed_refs_with_peeled(f)),
  111. )
  112. def test_read_with_peeled_errors(self) -> None:
  113. f = BytesIO(b"\n".join([b"^" + TWOS, ONES + b" ref/1"]))
  114. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  115. f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS, b"^" + THREES]))
  116. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  117. def test_write_with_peeled(self) -> None:
  118. f = BytesIO()
  119. write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS}, {b"ref/1": THREES})
  120. self.assertEqual(
  121. b"\n".join(
  122. [
  123. b"# pack-refs with: peeled",
  124. ONES + b" ref/1",
  125. b"^" + THREES,
  126. TWOS + b" ref/2",
  127. ]
  128. )
  129. + b"\n",
  130. f.getvalue(),
  131. )
  132. def test_write_without_peeled(self) -> None:
  133. f = BytesIO()
  134. write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS})
  135. self.assertEqual(
  136. b"\n".join([ONES + b" ref/1", TWOS + b" ref/2"]) + b"\n",
  137. f.getvalue(),
  138. )
  139. # Dict of refs that we expect all RefsContainerTests subclasses to define.
  140. _TEST_REFS = {
  141. b"HEAD": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  142. b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  143. b"refs/heads/master": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  144. b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  145. b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
  146. b"refs/tags/refs-0.2": b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  147. b"refs/heads/loop": b"ref: refs/heads/loop",
  148. }
  149. class RefsContainerTests:
  150. def test_keys(self) -> None:
  151. actual_keys = set(self._refs.keys())
  152. self.assertEqual(set(self._refs.allkeys()), actual_keys)
  153. self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
  154. actual_keys = self._refs.keys(b"refs/heads")
  155. actual_keys.discard(b"loop")
  156. self.assertEqual(
  157. [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
  158. sorted(actual_keys),
  159. )
  160. self.assertEqual(
  161. [b"refs-0.1", b"refs-0.2"], sorted(self._refs.keys(b"refs/tags"))
  162. )
  163. def test_iter(self) -> None:
  164. actual_keys = set(self._refs.keys())
  165. self.assertEqual(set(self._refs), actual_keys)
  166. self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
  167. def test_as_dict(self) -> None:
  168. # refs/heads/loop does not show up even if it exists
  169. expected_refs = dict(_TEST_REFS)
  170. del expected_refs[b"refs/heads/loop"]
  171. self.assertEqual(expected_refs, self._refs.as_dict())
  172. def test_get_symrefs(self) -> None:
  173. self._refs.set_symbolic_ref(b"refs/heads/src", b"refs/heads/dst")
  174. symrefs = self._refs.get_symrefs()
  175. if b"HEAD" in symrefs:
  176. symrefs.pop(b"HEAD")
  177. self.assertEqual(
  178. {
  179. b"refs/heads/src": b"refs/heads/dst",
  180. b"refs/heads/loop": b"refs/heads/loop",
  181. },
  182. symrefs,
  183. )
  184. def test_setitem(self) -> None:
  185. self._refs[b"refs/some/ref"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  186. self.assertEqual(
  187. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  188. self._refs[b"refs/some/ref"],
  189. )
  190. # should accept symref
  191. self._refs[b"refs/heads/symbolic"] = b"ref: refs/heads/master"
  192. self.assertEqual(
  193. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  194. self._refs[b"refs/heads/symbolic"],
  195. )
  196. # should not accept bad ref names
  197. self.assertRaises(
  198. errors.RefFormatError,
  199. self._refs.__setitem__,
  200. b"notrefs/foo",
  201. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  202. )
  203. # should not accept short sha
  204. self.assertRaises(
  205. ValueError,
  206. self._refs.__setitem__,
  207. b"refs/some/ref",
  208. b"42d06bd",
  209. )
  210. def test_set_if_equals(self) -> None:
  211. nines = b"9" * 40
  212. self.assertFalse(self._refs.set_if_equals(b"HEAD", b"c0ffee", nines))
  213. self.assertEqual(
  214. b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
  215. )
  216. self.assertTrue(
  217. self._refs.set_if_equals(
  218. b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec", nines
  219. )
  220. )
  221. self.assertEqual(nines, self._refs[b"HEAD"])
  222. # Setting the ref again is a no-op, but will return True.
  223. self.assertTrue(self._refs.set_if_equals(b"HEAD", nines, nines))
  224. self.assertEqual(nines, self._refs[b"HEAD"])
  225. self.assertTrue(self._refs.set_if_equals(b"refs/heads/master", None, nines))
  226. self.assertEqual(nines, self._refs[b"refs/heads/master"])
  227. self.assertTrue(
  228. self._refs.set_if_equals(b"refs/heads/nonexistent", ZERO_SHA, nines)
  229. )
  230. self.assertEqual(nines, self._refs[b"refs/heads/nonexistent"])
  231. def test_add_if_new(self) -> None:
  232. nines = b"9" * 40
  233. self.assertFalse(self._refs.add_if_new(b"refs/heads/master", nines))
  234. self.assertEqual(
  235. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  236. self._refs[b"refs/heads/master"],
  237. )
  238. self.assertTrue(self._refs.add_if_new(b"refs/some/ref", nines))
  239. self.assertEqual(nines, self._refs[b"refs/some/ref"])
  240. def test_set_symbolic_ref(self) -> None:
  241. self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
  242. self.assertEqual(
  243. b"ref: refs/heads/master",
  244. self._refs.read_loose_ref(b"refs/heads/symbolic"),
  245. )
  246. self.assertEqual(
  247. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  248. self._refs[b"refs/heads/symbolic"],
  249. )
  250. def test_set_symbolic_ref_overwrite(self) -> None:
  251. nines = b"9" * 40
  252. self.assertNotIn(b"refs/heads/symbolic", self._refs)
  253. self._refs[b"refs/heads/symbolic"] = nines
  254. self.assertEqual(nines, self._refs.read_loose_ref(b"refs/heads/symbolic"))
  255. self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
  256. self.assertEqual(
  257. b"ref: refs/heads/master",
  258. self._refs.read_loose_ref(b"refs/heads/symbolic"),
  259. )
  260. self.assertEqual(
  261. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  262. self._refs[b"refs/heads/symbolic"],
  263. )
  264. def test_check_refname(self) -> None:
  265. self._refs._check_refname(b"HEAD")
  266. self._refs._check_refname(b"refs/stash")
  267. self._refs._check_refname(b"refs/heads/foo")
  268. self.assertRaises(errors.RefFormatError, self._refs._check_refname, b"refs")
  269. self.assertRaises(
  270. errors.RefFormatError, self._refs._check_refname, b"notrefs/foo"
  271. )
  272. def test_contains(self) -> None:
  273. self.assertIn(b"refs/heads/master", self._refs)
  274. self.assertNotIn(b"refs/heads/bar", self._refs)
  275. def test_delitem(self) -> None:
  276. self.assertEqual(
  277. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  278. self._refs[b"refs/heads/master"],
  279. )
  280. del self._refs[b"refs/heads/master"]
  281. self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
  282. def test_remove_if_equals(self) -> None:
  283. self.assertFalse(self._refs.remove_if_equals(b"HEAD", b"c0ffee"))
  284. self.assertEqual(
  285. b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
  286. )
  287. self.assertTrue(
  288. self._refs.remove_if_equals(
  289. b"refs/tags/refs-0.2",
  290. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  291. )
  292. )
  293. self.assertTrue(self._refs.remove_if_equals(b"refs/tags/refs-0.2", ZERO_SHA))
  294. self.assertNotIn(b"refs/tags/refs-0.2", self._refs)
  295. def test_import_refs_name(self) -> None:
  296. self._refs[b"refs/remotes/origin/other"] = (
  297. b"48d01bd4b77fed026b154d16493e5deab78f02ec"
  298. )
  299. self._refs.import_refs(
  300. b"refs/remotes/origin",
  301. {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
  302. )
  303. self.assertEqual(
  304. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  305. self._refs[b"refs/remotes/origin/master"],
  306. )
  307. self.assertEqual(
  308. b"48d01bd4b77fed026b154d16493e5deab78f02ec",
  309. self._refs[b"refs/remotes/origin/other"],
  310. )
  311. def test_import_refs_name_prune(self) -> None:
  312. self._refs[b"refs/remotes/origin/other"] = (
  313. b"48d01bd4b77fed026b154d16493e5deab78f02ec"
  314. )
  315. self._refs.import_refs(
  316. b"refs/remotes/origin",
  317. {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
  318. prune=True,
  319. )
  320. self.assertEqual(
  321. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  322. self._refs[b"refs/remotes/origin/master"],
  323. )
  324. self.assertNotIn(b"refs/remotes/origin/other", self._refs)
  325. class DictRefsContainerTests(RefsContainerTests, TestCase):
  326. def setUp(self) -> None:
  327. TestCase.setUp(self)
  328. self._refs = DictRefsContainer(dict(_TEST_REFS))
  329. def test_invalid_refname(self) -> None:
  330. # FIXME: Move this test into RefsContainerTests, but requires
  331. # some way of injecting invalid refs.
  332. self._refs._refs[b"refs/stash"] = b"00" * 20
  333. expected_refs = dict(_TEST_REFS)
  334. del expected_refs[b"refs/heads/loop"]
  335. expected_refs[b"refs/stash"] = b"00" * 20
  336. self.assertEqual(expected_refs, self._refs.as_dict())
  337. def test_set_if_equals_with_symbolic_ref(self) -> None:
  338. # Test that set_if_equals only updates the requested ref,
  339. # not all refs in a symbolic reference chain
  340. # The bug in the original implementation was that when follow()
  341. # was called on a ref, it would return all refs in the chain,
  342. # and set_if_equals would update ALL of them instead of just the
  343. # requested ref.
  344. # Set up refs
  345. master_sha = b"1" * 40
  346. feature_sha = b"2" * 40
  347. new_sha = b"3" * 40
  348. self._refs[b"refs/heads/master"] = master_sha
  349. self._refs[b"refs/heads/feature"] = feature_sha
  350. # Create a second symbolic ref pointing to feature
  351. self._refs.set_symbolic_ref(b"refs/heads/other", b"refs/heads/feature")
  352. # Update refs/heads/other through set_if_equals
  353. # With the bug, this would update BOTH refs/heads/other AND refs/heads/feature
  354. # Without the bug, only refs/heads/other should be updated
  355. # Note: old_ref needs to be the actual stored value (the symref)
  356. self.assertTrue(
  357. self._refs.set_if_equals(
  358. b"refs/heads/other", b"ref: refs/heads/feature", new_sha
  359. )
  360. )
  361. # refs/heads/other should now directly point to new_sha
  362. self.assertEqual(self._refs.read_ref(b"refs/heads/other"), new_sha)
  363. # refs/heads/feature should remain unchanged
  364. # With the bug, refs/heads/feature would also be incorrectly updated to new_sha
  365. self.assertEqual(self._refs[b"refs/heads/feature"], feature_sha)
  366. self.assertEqual(self._refs[b"refs/heads/master"], master_sha)
  367. class DiskRefsContainerTests(RefsContainerTests, TestCase):
  368. def setUp(self) -> None:
  369. TestCase.setUp(self)
  370. self._repo = open_repo("refs.git")
  371. self.addCleanup(tear_down_repo, self._repo)
  372. self._refs = self._repo.refs
  373. def test_get_packed_refs(self) -> None:
  374. self.assertEqual(
  375. {
  376. b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  377. b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
  378. },
  379. self._refs.get_packed_refs(),
  380. )
  381. def test_get_peeled_not_packed(self) -> None:
  382. # not packed
  383. self.assertEqual(None, self._refs.get_peeled(b"refs/tags/refs-0.2"))
  384. self.assertEqual(
  385. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  386. self._refs[b"refs/tags/refs-0.2"],
  387. )
  388. # packed, known not peelable
  389. self.assertEqual(
  390. self._refs[b"refs/heads/packed"],
  391. self._refs.get_peeled(b"refs/heads/packed"),
  392. )
  393. # packed, peeled
  394. self.assertEqual(
  395. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  396. self._refs.get_peeled(b"refs/tags/refs-0.1"),
  397. )
  398. def test_setitem(self) -> None:
  399. RefsContainerTests.test_setitem(self)
  400. path = os.path.join(self._refs.path, b"refs", b"some", b"ref")
  401. with open(path, "rb") as f:
  402. self.assertEqual(b"42d06bd4b77fed026b154d16493e5deab78f02ec", f.read()[:40])
  403. self.assertRaises(
  404. OSError,
  405. self._refs.__setitem__,
  406. b"refs/some/ref/sub",
  407. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  408. )
  409. def test_delete_refs_container(self) -> None:
  410. # We shouldn't delete the refs directory
  411. self._refs[b"refs/heads/blah"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  412. for ref in self._refs.allkeys():
  413. del self._refs[ref]
  414. self.assertTrue(os.path.exists(os.path.join(self._refs.path, b"refs")))
  415. def test_setitem_packed(self) -> None:
  416. with open(os.path.join(self._refs.path, b"packed-refs"), "w") as f:
  417. f.write("# pack-refs with: peeled fully-peeled sorted \n")
  418. f.write("42d06bd4b77fed026b154d16493e5deab78f02ec refs/heads/packed\n")
  419. # It's allowed to set a new ref on a packed ref, the new ref will be
  420. # placed outside on refs/
  421. self._refs[b"refs/heads/packed"] = b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8"
  422. packed_ref_path = os.path.join(self._refs.path, b"refs", b"heads", b"packed")
  423. with open(packed_ref_path, "rb") as f:
  424. self.assertEqual(b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8", f.read()[:40])
  425. self.assertRaises(
  426. OSError,
  427. self._refs.__setitem__,
  428. b"refs/heads/packed/sub",
  429. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  430. )
  431. # this shouldn't overwrite the packed refs
  432. self.assertEqual(
  433. {b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
  434. self._refs.get_packed_refs(),
  435. )
  436. def test_add_packed_refs(self) -> None:
  437. # first, create a non-packed ref
  438. self._refs[b"refs/heads/packed"] = b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8"
  439. packed_ref_path = os.path.join(self._refs.path, b"refs", b"heads", b"packed")
  440. self.assertTrue(os.path.exists(packed_ref_path))
  441. # now overwrite that with a packed ref
  442. packed_refs_file_path = os.path.join(self._refs.path, b"packed-refs")
  443. self._refs.add_packed_refs(
  444. {
  445. b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  446. }
  447. )
  448. # that should kill the file
  449. self.assertFalse(os.path.exists(packed_ref_path))
  450. # now delete the packed ref
  451. self._refs.add_packed_refs(
  452. {
  453. b"refs/heads/packed": None,
  454. }
  455. )
  456. # and it's gone!
  457. self.assertFalse(os.path.exists(packed_ref_path))
  458. self.assertRaises(
  459. KeyError,
  460. self._refs.__getitem__,
  461. b"refs/heads/packed",
  462. )
  463. # just in case, make sure we can't pack HEAD
  464. self.assertRaises(
  465. ValueError,
  466. self._refs.add_packed_refs,
  467. {b"HEAD": "02ac81614bcdbd585a37b4b0edf8cb8a"},
  468. )
  469. # delete all packed refs
  470. self._refs.add_packed_refs({ref: None for ref in self._refs.get_packed_refs()})
  471. self.assertEqual({}, self._refs.get_packed_refs())
  472. # remove the packed ref file, and check that adding nothing doesn't affect that
  473. os.remove(packed_refs_file_path)
  474. # adding nothing doesn't make it reappear
  475. self._refs.add_packed_refs({})
  476. self.assertFalse(os.path.exists(packed_refs_file_path))
  477. def test_setitem_symbolic(self) -> None:
  478. ones = b"1" * 40
  479. self._refs[b"HEAD"] = ones
  480. self.assertEqual(ones, self._refs[b"HEAD"])
  481. # ensure HEAD was not modified
  482. f = open(os.path.join(self._refs.path, b"HEAD"), "rb")
  483. v = next(iter(f)).rstrip(b"\n\r")
  484. f.close()
  485. self.assertEqual(b"ref: refs/heads/master", v)
  486. # ensure the symbolic link was written through
  487. f = open(os.path.join(self._refs.path, b"refs", b"heads", b"master"), "rb")
  488. self.assertEqual(ones, f.read()[:40])
  489. f.close()
  490. def test_set_if_equals(self) -> None:
  491. RefsContainerTests.test_set_if_equals(self)
  492. # ensure symref was followed
  493. self.assertEqual(b"9" * 40, self._refs[b"refs/heads/master"])
  494. # ensure lockfile was deleted
  495. self.assertFalse(
  496. os.path.exists(
  497. os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
  498. )
  499. )
  500. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
  501. def test_add_if_new_packed(self) -> None:
  502. # don't overwrite packed ref
  503. self.assertFalse(self._refs.add_if_new(b"refs/tags/refs-0.1", b"9" * 40))
  504. self.assertEqual(
  505. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  506. self._refs[b"refs/tags/refs-0.1"],
  507. )
  508. def test_add_if_new_symbolic(self) -> None:
  509. # Use an empty repo instead of the default.
  510. repo_dir = os.path.join(tempfile.mkdtemp(), "test")
  511. os.makedirs(repo_dir)
  512. repo = Repo.init(repo_dir)
  513. self.addCleanup(tear_down_repo, repo)
  514. refs = repo.refs
  515. nines = b"9" * 40
  516. self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
  517. self.assertNotIn(b"refs/heads/master", refs)
  518. self.assertTrue(refs.add_if_new(b"HEAD", nines))
  519. self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
  520. self.assertEqual(nines, refs[b"HEAD"])
  521. self.assertEqual(nines, refs[b"refs/heads/master"])
  522. self.assertFalse(refs.add_if_new(b"HEAD", b"1" * 40))
  523. self.assertEqual(nines, refs[b"HEAD"])
  524. self.assertEqual(nines, refs[b"refs/heads/master"])
  525. def test_follow(self) -> None:
  526. self.assertEqual(
  527. (
  528. [b"HEAD", b"refs/heads/master"],
  529. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  530. ),
  531. self._refs.follow(b"HEAD"),
  532. )
  533. self.assertEqual(
  534. (
  535. [b"refs/heads/master"],
  536. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  537. ),
  538. self._refs.follow(b"refs/heads/master"),
  539. )
  540. self.assertRaises(SymrefLoop, self._refs.follow, b"refs/heads/loop")
  541. def test_set_overwrite_loop(self) -> None:
  542. self.assertRaises(SymrefLoop, self._refs.follow, b"refs/heads/loop")
  543. self._refs[b"refs/heads/loop"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  544. self.assertEqual(
  545. ([b"refs/heads/loop"], b"42d06bd4b77fed026b154d16493e5deab78f02ec"),
  546. self._refs.follow(b"refs/heads/loop"),
  547. )
  548. def test_delitem(self) -> None:
  549. RefsContainerTests.test_delitem(self)
  550. ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"master")
  551. self.assertFalse(os.path.exists(ref_file))
  552. self.assertNotIn(b"refs/heads/master", self._refs.get_packed_refs())
  553. def test_delitem_symbolic(self) -> None:
  554. self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
  555. del self._refs[b"HEAD"]
  556. self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
  557. self.assertEqual(
  558. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  559. self._refs[b"refs/heads/master"],
  560. )
  561. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD")))
  562. def test_remove_if_equals_symref(self) -> None:
  563. # HEAD is a symref, so shouldn't equal its dereferenced value
  564. self.assertFalse(
  565. self._refs.remove_if_equals(
  566. b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  567. )
  568. )
  569. self.assertTrue(
  570. self._refs.remove_if_equals(
  571. b"refs/heads/master",
  572. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  573. )
  574. )
  575. self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
  576. # HEAD is now a broken symref
  577. self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
  578. self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
  579. self.assertFalse(
  580. os.path.exists(
  581. os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
  582. )
  583. )
  584. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
  585. def test_remove_packed_without_peeled(self) -> None:
  586. refs_file = os.path.join(self._repo.path, "packed-refs")
  587. f = GitFile(refs_file)
  588. refs_data = f.read()
  589. f.close()
  590. f = GitFile(refs_file, "wb")
  591. f.write(
  592. b"\n".join(
  593. line
  594. for line in refs_data.split(b"\n")
  595. if not line or line[0] not in b"#^"
  596. )
  597. )
  598. f.close()
  599. self._repo = Repo(self._repo.path)
  600. refs = self._repo.refs
  601. self.assertTrue(
  602. refs.remove_if_equals(
  603. b"refs/heads/packed",
  604. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  605. )
  606. )
  607. def test_remove_if_equals_packed(self) -> None:
  608. # test removing ref that is only packed
  609. self.assertEqual(
  610. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  611. self._refs[b"refs/tags/refs-0.1"],
  612. )
  613. self.assertTrue(
  614. self._refs.remove_if_equals(
  615. b"refs/tags/refs-0.1",
  616. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  617. )
  618. )
  619. self.assertRaises(KeyError, lambda: self._refs[b"refs/tags/refs-0.1"])
  620. def test_remove_parent(self) -> None:
  621. self._refs[b"refs/heads/foo/bar"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  622. del self._refs[b"refs/heads/foo/bar"]
  623. ref_file = os.path.join(
  624. self._refs.path,
  625. b"refs",
  626. b"heads",
  627. b"foo",
  628. b"bar",
  629. )
  630. self.assertFalse(os.path.exists(ref_file))
  631. ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"foo")
  632. self.assertFalse(os.path.exists(ref_file))
  633. ref_file = os.path.join(self._refs.path, b"refs", b"heads")
  634. self.assertTrue(os.path.exists(ref_file))
  635. self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  636. def test_read_ref(self) -> None:
  637. self.assertEqual(b"ref: refs/heads/master", self._refs.read_ref(b"HEAD"))
  638. self.assertEqual(
  639. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  640. self._refs.read_ref(b"refs/heads/packed"),
  641. )
  642. self.assertEqual(None, self._refs.read_ref(b"nonexistent"))
  643. def test_read_loose_ref(self) -> None:
  644. self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  645. self.assertEqual(None, self._refs.read_ref(b"refs/heads/foo/bar"))
  def test_non_ascii(self) -> None:
      """A loose ref with a non-ASCII name shows up in get_refs()."""
      try:
          encoded_ref = os.fsencode("refs/tags/schön")
      except UnicodeEncodeError as exc:
          # The filesystem encoding cannot represent the name at all.
          raise SkipTest(
              "filesystem encoding doesn't support special character"
          ) from exc

      # Write the ref file directly, bypassing the refs container.
      ref_path = os.path.join(os.fsencode(self._repo.path), encoded_ref)
      with open(ref_path, "w") as ref_file:
          ref_file.write("00" * 20)

      expected = {**_TEST_REFS, encoded_ref: b"00" * 20}
      # refs/heads/loop is expected to be missing from get_refs().
      expected.pop(b"refs/heads/loop")
      self.assertEqual(expected, self._repo.get_refs())
  def test_cyrillic(self) -> None:
      """A ref whose name is not valid UTF-8 is listed by every key accessor.

      Reported in https://github.com/dulwich/dulwich/issues/608
      """
      if sys.platform in ("darwin", "win32"):
          raise SkipTest("filesystem encoding doesn't support arbitrary bytes")

      branch = b"\xcd\xee\xe2\xe0\xff\xe2\xe5\xf2\xea\xe01"
      encoded_ref = b"refs/heads/" + branch
      ref_path = os.path.join(os.fsencode(self._repo.path), encoded_ref)
      with open(ref_path, "w") as ref_file:
          ref_file.write("00" * 20)

      names = set(_TEST_REFS)
      names.add(encoded_ref)
      self.assertEqual(names, set(self._repo.refs.allkeys()))

      # subkeys() is compared against the names with the base stripped.
      prefix = b"refs/"
      below_refs = {
          name[len(prefix) :] for name in names if name.startswith(prefix)
      }
      self.assertEqual(below_refs, set(self._repo.refs.subkeys(prefix)))

      # get_refs() is expected to omit refs/heads/loop and include HEAD.
      names.remove(b"refs/heads/loop")
      names.add(b"HEAD")
      self.assertEqual(names, set(self._repo.get_refs().keys()))
  def test_write_unchanged_ref_optimization(self) -> None:
      """set_if_equals behaves correctly when the new value equals the old one.

      Only return values and the resulting ref contents are checked here;
      whether the implementation skips the fsync for an unchanged value is
      not observable from this test.
      """
      ref_name = b"refs/heads/unchanged"
      ref_value = b"a" * 40

      # Set initial ref value
      self._refs[ref_name] = ref_value

      # Test 1: Writing same value should succeed without changes
      self.assertTrue(self._refs.set_if_equals(ref_name, ref_value, ref_value))
      self.assertEqual(ref_value, self._refs[ref_name])

      # Test 2: Writing same value with wrong old_ref should fail and must
      # leave the stored value untouched
      wrong_old = b"b" * 40
      self.assertFalse(self._refs.set_if_equals(ref_name, wrong_old, ref_value))
      self.assertEqual(ref_value, self._refs[ref_name])

      # Test 3: Writing different value should update normally
      new_value = b"c" * 40
      self.assertTrue(self._refs.set_if_equals(ref_name, ref_value, new_value))
      self.assertEqual(new_value, self._refs[ref_name])
  def test_write_unchanged_ref_with_lock(self) -> None:
      """Writing an unchanged value still raises FileLocked while locked."""
      from dulwich.file import FileLocked

      ref_name = b"refs/heads/locktest"
      ref_value = b"d" * 40

      # Set initial ref value
      self._refs[ref_name] = ref_value

      # Get the actual file path
      ref_file = os.path.join(os.fsencode(self._refs.path), ref_name)
      lock_file = ref_file + b".lock"

      # Create lock file to simulate another process holding lock
      with open(lock_file, "wb") as f:
          f.write(b"locked by another process")

      try:
          # Try to write same value - should raise FileLocked
          with self.assertRaises(FileLocked):
              self._refs[ref_name] = ref_value
      finally:
          # Always drop the fake lock, even when the assertion above fails,
          # so a stale lock file cannot outlive this check.
          if os.path.exists(lock_file):
              os.unlink(lock_file)

      # Now it should work, and the ref must still hold the same value
      self._refs[ref_name] = ref_value
      self.assertEqual(ref_value, self._refs[ref_name])
  class IsPerWorktreeRefsTests(TestCase):
      """Table-driven check of is_per_worktree_ref."""

      def test(self) -> None:
          """Every listed ref name is classified as expected."""
          per_worktree = (
              b"HEAD",
              b"refs/bisect/good",
              b"refs/worktree/foo",
              b"refs/rewritten/onto",
          )
          shared = (
              b"refs/stash",
              b"refs/heads/main",
              b"refs/tags/v1.0",
              b"refs/remotes/origin/main",
              b"refs/custom/foo",
              b"refs/replace/aaaaaa",
          )
          expectations = [(name, True) for name in per_worktree]
          expectations += [(name, False) for name in shared]

          for ref, expected in expectations:
              with self.subTest(ref=ref, expected=expected):
                  self.assertEqual(is_per_worktree_ref(ref), expected)
  class DiskRefsContainerWorktreeRefsTest(TestCase):
      """Ref behaviour of a main repository and one linked worktree.

      ``self.refs`` is the refs container of the main repository and
      ``self.wt_refs`` the one of the linked worktree; the tests check which
      refs the two share and which are kept separately.
      """

      def setUp(self) -> None:
          """Create the main repository, a linked worktree and one commit each."""
          import shutil

          # Create temporary directories
          temp_dir = tempfile.mkdtemp()
          # Registered first so it runs last (cleanups run in LIFO order),
          # i.e. after the repositories below have been closed.
          self.addCleanup(shutil.rmtree, temp_dir)
          test_dir = os.path.join(temp_dir, "main")
          os.makedirs(test_dir)

          repo = Repo.init(test_dir, default_branch=b"main")
          self.addCleanup(repo.close)
          main_worktree = repo.get_worktree()
          with open(os.path.join(test_dir, "test.txt"), "wb") as f:
              f.write(b"test content")
          main_worktree.stage(["test.txt"])
          self.first_commit = main_worktree.commit(message=b"Initial commit")

          worktree_dir = os.path.join(temp_dir, "worktree")
          wt_repo = add_worktree(repo, worktree_dir, branch="wt-main")
          self.addCleanup(wt_repo.close)
          linked_worktree = wt_repo.get_worktree()
          # The file has to live in the linked worktree's own directory,
          # otherwise staging it through that worktree adds nothing.
          with open(os.path.join(worktree_dir, "test2.txt"), "wb") as f:
              f.write(b"test content")
          linked_worktree.stage(["test2.txt"])
          self.second_commit = linked_worktree.commit(message=b"second commit")

          self.refs = repo.refs
          self.wt_refs = wt_repo.refs

      def test_refpath(self) -> None:
          """refpath() resolves each ref below the expected git directory."""
          main_path = self.refs.path
          common = self.wt_refs.path
          wt_path = self.wt_refs.worktree_path

          # (container, ref name, directory the ref file is expected under)
          cases = [
              (self.refs, b"HEAD", main_path),
              (self.refs, b"refs/heads/main", main_path),
              (self.refs, b"refs/heads/wt-main", main_path),
              (self.refs, b"refs/worktree/foo", main_path),
              (self.refs, b"refs/bisect/good", main_path),
              (self.wt_refs, b"HEAD", wt_path),
              (self.wt_refs, b"refs/heads/main", common),
              (self.wt_refs, b"refs/heads/wt-main", common),
              (self.wt_refs, b"refs/worktree/foo", wt_path),
              (self.wt_refs, b"refs/bisect/good", wt_path),
          ]

          for refs, refname, git_dir in cases:
              with self.subTest(refs=refs, refname=refname, git_dir=git_dir):
                  refpath = refs.refpath(refname)
                  # Ref names use "/" while the on-disk path uses os.sep.
                  expected_path = os.path.join(
                      git_dir, refname.replace(b"/", os.fsencode(os.sep))
                  )
                  self.assertEqual(refpath, expected_path)

      def test_shared_ref(self) -> None:
          """Branch refs are visible through both containers."""
          self.assertEqual(self.refs[b"refs/heads/main"], self.first_commit)
          self.assertEqual(self.refs[b"refs/heads/wt-main"], self.second_commit)
          self.assertEqual(self.wt_refs[b"refs/heads/main"], self.first_commit)
          self.assertEqual(self.wt_refs[b"refs/heads/wt-main"], self.second_commit)

          expected = {b"HEAD", b"refs/heads/main", b"refs/heads/wt-main"}
          self.assertEqual(expected, self.refs.keys())
          self.assertEqual(expected, self.wt_refs.keys())

          self.assertEqual({b"main", b"wt-main"}, set(self.refs.keys(b"refs/heads/")))
          self.assertEqual(
              {b"main", b"wt-main"}, set(self.wt_refs.keys(b"refs/heads/"))
          )

          # The branch file exists below the main path only, not below the
          # worktree-specific path.
          ref_path = os.path.join(self.refs.path, b"refs", b"heads", b"main")
          self.assertTrue(os.path.exists(ref_path))

          ref_path = os.path.join(
              self.wt_refs.worktree_path, b"refs", b"heads", b"main"
          )
          self.assertFalse(os.path.exists(ref_path))

      def test_per_worktree_ref(self) -> None:
          """HEAD and refs/bisect/* are kept separately per container."""
          path = self.refs.path
          wt_path = self.wt_refs.worktree_path

          self.assertEqual(self.refs[b"HEAD"], self.first_commit)
          self.assertEqual(self.wt_refs[b"HEAD"], self.second_commit)

          # "good" is written through both containers, "start" only through
          # the main one and "bad" only through the linked worktree.
          self.refs[b"refs/bisect/good"] = self.first_commit
          self.wt_refs[b"refs/bisect/good"] = self.second_commit

          self.refs[b"refs/bisect/start"] = self.first_commit
          self.wt_refs[b"refs/bisect/bad"] = self.second_commit

          self.assertEqual(self.refs[b"refs/bisect/good"], self.first_commit)
          self.assertEqual(self.wt_refs[b"refs/bisect/good"], self.second_commit)

          self.assertTrue(
              os.path.exists(os.path.join(path, b"refs", b"bisect", b"good"))
          )
          self.assertTrue(
              os.path.exists(os.path.join(wt_path, b"refs", b"bisect", b"good"))
          )

          self.assertEqual(self.refs[b"refs/bisect/start"], self.first_commit)
          with self.assertRaises(KeyError):
              self.wt_refs[b"refs/bisect/start"]
          self.assertTrue(
              os.path.exists(os.path.join(path, b"refs", b"bisect", b"start"))
          )
          self.assertFalse(
              os.path.exists(os.path.join(wt_path, b"refs", b"bisect", b"start"))
          )

          with self.assertRaises(KeyError):
              self.refs[b"refs/bisect/bad"]
          self.assertEqual(self.wt_refs[b"refs/bisect/bad"], self.second_commit)
          self.assertFalse(
              os.path.exists(os.path.join(path, b"refs", b"bisect", b"bad"))
          )
          self.assertTrue(
              os.path.exists(os.path.join(wt_path, b"refs", b"bisect", b"bad"))
          )

          expected_refs = {
              b"HEAD",
              b"refs/heads/main",
              b"refs/heads/wt-main",
              b"refs/bisect/good",
              b"refs/bisect/start",
          }
          self.assertEqual(self.refs.keys(), expected_refs)
          self.assertEqual({b"good", b"start"}, self.refs.keys(b"refs/bisect/"))

          expected_wt_refs = {
              b"HEAD",
              b"refs/heads/main",
              b"refs/heads/wt-main",
              b"refs/bisect/good",
              b"refs/bisect/bad",
          }
          self.assertEqual(self.wt_refs.keys(), expected_wt_refs)
          self.assertEqual({b"good", b"bad"}, self.wt_refs.keys(b"refs/bisect/"))

      def test_delete_per_worktree_ref(self) -> None:
          """Deleting refs/worktree/foo in one container leaves the other's copy."""
          self.refs[b"refs/worktree/foo"] = self.first_commit
          self.wt_refs[b"refs/worktree/foo"] = self.second_commit

          del self.wt_refs[b"refs/worktree/foo"]
          with self.assertRaises(KeyError):
              self.wt_refs[b"refs/worktree/foo"]

          # This delete would raise if the main container's copy had already
          # been removed by the delete above.
          del self.refs[b"refs/worktree/foo"]
          with self.assertRaises(KeyError):
              self.refs[b"refs/worktree/foo"]

      def test_delete_shared_ref(self) -> None:
          """A branch deleted through the worktree is gone from both containers."""
          self.refs[b"refs/heads/branch"] = self.first_commit
          del self.wt_refs[b"refs/heads/branch"]

          with self.assertRaises(KeyError):
              self.wt_refs[b"refs/heads/branch"]
          with self.assertRaises(KeyError):
              self.refs[b"refs/heads/branch"]

      def test_contains_shared_ref(self) -> None:
          """``in`` finds both branches through either container."""
          self.assertIn(b"refs/heads/main", self.refs)
          self.assertIn(b"refs/heads/main", self.wt_refs)
          self.assertIn(b"refs/heads/wt-main", self.refs)
          self.assertIn(b"refs/heads/wt-main", self.wt_refs)

      def test_contains_per_worktree_ref(self) -> None:
          """``in`` finds a refs/worktree/* ref only where it was written."""
          self.refs[b"refs/worktree/foo"] = self.first_commit
          self.wt_refs[b"refs/worktree/bar"] = self.second_commit

          self.assertIn(b"refs/worktree/foo", self.refs)
          self.assertNotIn(b"refs/worktree/bar", self.refs)
          self.assertNotIn(b"refs/worktree/foo", self.wt_refs)
          self.assertIn(b"refs/worktree/bar", self.wt_refs)
  869. _TEST_REFS_SERIALIZED = (
  870. b"42d06bd4b77fed026b154d16493e5deab78f02ec\t"
  871. b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n"
  872. b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n"
  873. b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n"
  874. b"df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n"
  875. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8\trefs/tags/refs-0.2\n"
  876. )
  class DiskRefsContainerPathlibTests(TestCase):
      """DiskRefsContainer accepts pathlib.Path arguments and exposes bytes."""

      def test_pathlib_init(self) -> None:
          """A Path given as ``path`` is exposed as bytes and used by refpath()."""
          from pathlib import Path

          from dulwich.refs import DiskRefsContainer

          # Create a temporary directory
          temp_dir = tempfile.mkdtemp()
          self.addCleanup(os.rmdir, temp_dir)

          # Test with pathlib.Path
          path_obj = Path(temp_dir)
          refs = DiskRefsContainer(path_obj)
          self.assertEqual(refs.path, temp_dir.encode())

          # Test refpath with pathlib initialized container
          ref_path = refs.refpath(b"HEAD")
          # assertIsInstance reports the actual type on failure.
          self.assertIsInstance(ref_path, bytes)
          self.assertEqual(ref_path, os.path.join(temp_dir.encode(), b"HEAD"))

      def test_pathlib_worktree_path(self) -> None:
          """Path objects work for both ``path`` and ``worktree_path``."""
          from pathlib import Path

          from dulwich.refs import DiskRefsContainer

          # Create temporary directories; each cleanup is registered right
          # after its directory exists so a failure in between leaks nothing.
          temp_dir = tempfile.mkdtemp()
          self.addCleanup(os.rmdir, temp_dir)
          worktree_dir = tempfile.mkdtemp()
          self.addCleanup(os.rmdir, worktree_dir)

          # Test with pathlib.Path for both paths
          path_obj = Path(temp_dir)
          worktree_obj = Path(worktree_dir)
          refs = DiskRefsContainer(path_obj, worktree_path=worktree_obj)
          self.assertEqual(refs.path, temp_dir.encode())
          self.assertEqual(refs.worktree_path, worktree_dir.encode())

          # Test refpath returns worktree path for HEAD
          ref_path = refs.refpath(b"HEAD")
          self.assertEqual(ref_path, os.path.join(worktree_dir.encode(), b"HEAD"))
  class InfoRefsContainerTests(TestCase):
      """InfoRefsContainer parsed from the serialized test refs."""

      def test_invalid_refname(self) -> None:
          """An extra refs/stash record is returned by as_dict()."""
          null_sha = b"00" * 20
          serialized = _TEST_REFS_SERIALIZED + null_sha + b"\trefs/stash\n"
          container = InfoRefsContainer(BytesIO(serialized))

          wanted = dict(_TEST_REFS)
          wanted.pop(b"HEAD")
          wanted[b"refs/stash"] = null_sha
          wanted.pop(b"refs/heads/loop")
          self.assertEqual(wanted, container.as_dict())

      def test_keys(self) -> None:
          """keys() matches allkeys() and can be narrowed by a base."""
          container = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
          every_key = set(container.keys())
          self.assertEqual(set(container.allkeys()), every_key)

          # HEAD and refs/heads/loop are not part of the serialized data.
          wanted = dict(_TEST_REFS)
          wanted.pop(b"HEAD")
          wanted.pop(b"refs/heads/loop")
          self.assertEqual(set(wanted.keys()), every_key)

          head_keys = container.keys(b"refs/heads")
          head_keys.discard(b"loop")
          self.assertEqual(
              [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
              sorted(head_keys),
          )

          tag_keys = sorted(container.keys(b"refs/tags"))
          self.assertEqual([b"refs-0.1", b"refs-0.2"], tag_keys)

      def test_as_dict(self) -> None:
          """as_dict() returns every serialized ref."""
          container = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
          # refs/heads/loop does not show up even if it exists
          wanted = dict(_TEST_REFS)
          wanted.pop(b"HEAD")
          wanted.pop(b"refs/heads/loop")
          self.assertEqual(wanted, container.as_dict())

      def test_contains(self) -> None:
          """``in`` is true only for refs present in the data."""
          container = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
          self.assertIn(b"refs/heads/master", container)
          self.assertNotIn(b"refs/heads/bar", container)

      def test_get_peeled(self) -> None:
          """get_peeled() of refs/heads/master returns that branch's own sha."""
          container = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
          peeled = container.get_peeled(b"refs/heads/master")
          self.assertEqual(_TEST_REFS[b"refs/heads/master"], peeled)
  class ParseSymrefValueTests(TestCase):
      """Tests for parse_symref_value."""

      def test_valid(self) -> None:
          """The target is returned without the ``ref: `` prefix."""
          target = parse_symref_value(b"ref: refs/heads/foo")
          self.assertEqual(b"refs/heads/foo", target)

      def test_invalid(self) -> None:
          """A value without the ``ref: `` prefix raises ValueError."""
          with self.assertRaises(ValueError):
              parse_symref_value(b"foobar")
  class ParseRemoteRefTests(TestCase):
      """Tests for parse_remote_ref."""

      def test_valid(self) -> None:
          """Remote refs split into (remote, branch); branches may hold slashes."""
          cases = (
              # Simple case
              (b"refs/remotes/origin/main", b"origin", b"main"),
              # Branch containing slashes
              (b"refs/remotes/upstream/feature/new-ui", b"upstream", b"feature/new-ui"),
          )
          for full_ref, want_remote, want_branch in cases:
              remote, branch = parse_remote_ref(full_ref)
              self.assertEqual(want_remote, remote)
              self.assertEqual(want_branch, branch)

      def test_invalid_not_remote_ref(self) -> None:
          """A ref outside refs/remotes/ is rejected."""
          with self.assertRaises(ValueError) as caught:
              parse_remote_ref(b"refs/heads/main")
          self.assertIn("Not a remote ref", str(caught.exception))

      def test_invalid_format(self) -> None:
          """A remote ref without a branch part is rejected."""
          # First the missing branch name, then just the prefix.
          for malformed in (b"refs/remotes/origin", b"refs/remotes/"):
              with self.assertRaises(ValueError) as caught:
                  parse_remote_ref(malformed)
              self.assertIn("Invalid remote ref format", str(caught.exception))
  class StripPeeledRefsTests(TestCase):
      """Tests for strip_peeled_refs and split_peeled_refs."""

      # Input: two branches plus two tags, each tag with a "^{}" entry.
      all_refs: ClassVar[dict[bytes, bytes]] = {
          b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
          b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
          b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
          b"refs/tags/1.0.0^{}": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
          b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
          b"refs/tags/2.0.0^{}": b"0749936d0956c661ac8f8d3483774509c165f89e",
      }
      # The same refs without the "^{}" entries.
      non_peeled_refs: ClassVar[dict[bytes, bytes]] = {
          b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
          b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
          b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
          b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
      }

      def test_strip_peeled_refs(self) -> None:
          """strip_peeled_refs drops the "^{}" entries."""
          stripped = strip_peeled_refs(self.all_refs)
          self.assertEqual(stripped, self.non_peeled_refs)

      def test_split_peeled_refs(self) -> None:
          """split_peeled_refs returns the regular refs and the peeled values."""
          regular, peeled = split_peeled_refs(self.all_refs)
          self.assertEqual(regular, self.non_peeled_refs)

          # Peeled values are keyed by tag name without the "^{}" suffix.
          wanted_peeled = {
              b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
              b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
          }
          self.assertEqual(peeled, wanted_peeled)
  class ShortenRefNameTests(TestCase):
      """Table-driven tests for shorten_ref_name."""

      def _assert_shortened(self, cases) -> None:
          """Check each (full name, expected short name) pair in order."""
          for full_name, short_name in cases:
              self.assertEqual(short_name, shorten_ref_name(full_name))

      def test_branch_ref(self) -> None:
          """refs/heads/ is stripped from branch refs."""
          self._assert_shortened(
              (
                  (b"refs/heads/master", b"master"),
                  (b"refs/heads/develop", b"develop"),
                  (b"refs/heads/feature/new-ui", b"feature/new-ui"),
              )
          )

      def test_remote_ref(self) -> None:
          """refs/remotes/ is stripped, keeping the remote name."""
          self._assert_shortened(
              (
                  (b"refs/remotes/origin/main", b"origin/main"),
                  (b"refs/remotes/upstream/master", b"upstream/master"),
                  (b"refs/remotes/origin/feature/test", b"origin/feature/test"),
              )
          )

      def test_tag_ref(self) -> None:
          """refs/tags/ is stripped from tag refs."""
          self._assert_shortened(
              (
                  (b"refs/tags/v1.0", b"v1.0"),
                  (b"refs/tags/release-2.0", b"release-2.0"),
              )
          )

      def test_special_refs(self) -> None:
          """HEAD-style names come back unchanged."""
          for special in (b"HEAD", b"FETCH_HEAD", b"ORIG_HEAD"):
              self.assertEqual(special, shorten_ref_name(special))

      def test_other_refs(self) -> None:
          """Refs outside the standard prefixes come back unchanged."""
          for other in (b"refs/stash", b"refs/bisect/good"):
              self.assertEqual(other, shorten_ref_name(other))
  class RefUtilityFunctionsTests(TestCase):
      """Tests for the branch/tag name helpers in dulwich.refs."""

      def test_local_branch_name(self) -> None:
          """local_branch_name adds refs/heads/ unless it is already there."""
          from dulwich.refs import local_branch_name

          cases = (
              (b"master", b"refs/heads/master"),
              (b"develop", b"refs/heads/develop"),
              (b"feature/new-ui", b"refs/heads/feature/new-ui"),
              # Idempotency: an already prefixed name is returned as is.
              (b"refs/heads/master", b"refs/heads/master"),
          )
          for given, wanted in cases:
              self.assertEqual(wanted, local_branch_name(given))

      def test_local_tag_name(self) -> None:
          """local_tag_name adds refs/tags/ unless it is already there."""
          from dulwich.refs import local_tag_name

          cases = (
              (b"v1.0", b"refs/tags/v1.0"),
              (b"release-2.0", b"refs/tags/release-2.0"),
              # Idempotency: an already prefixed name is returned as is.
              (b"refs/tags/v1.0", b"refs/tags/v1.0"),
          )
          for given, wanted in cases:
              self.assertEqual(wanted, local_tag_name(given))

      def test_extract_branch_name(self) -> None:
          """extract_branch_name strips refs/heads/ and rejects other names."""
          from dulwich.refs import extract_branch_name

          cases = (
              (b"refs/heads/master", b"master"),
              (b"refs/heads/develop", b"develop"),
              (b"refs/heads/feature/new-ui", b"feature/new-ui"),
          )
          for given, wanted in cases:
              self.assertEqual(wanted, extract_branch_name(given))

          # A tag ref is rejected with a descriptive message ...
          with self.assertRaises(ValueError) as caught:
              extract_branch_name(b"refs/tags/v1.0")
          self.assertIn("Not a local branch ref", str(caught.exception))

          # ... and so is a bare branch name.
          with self.assertRaises(ValueError):
              extract_branch_name(b"master")

      def test_extract_tag_name(self) -> None:
          """extract_tag_name strips refs/tags/ and rejects other names."""
          from dulwich.refs import extract_tag_name

          cases = (
              (b"refs/tags/v1.0", b"v1.0"),
              (b"refs/tags/release-2.0", b"release-2.0"),
          )
          for given, wanted in cases:
              self.assertEqual(wanted, extract_tag_name(given))

          # A branch ref is rejected with a descriptive message ...
          with self.assertRaises(ValueError) as caught:
              extract_tag_name(b"refs/heads/master")
          self.assertIn("Not a local tag ref", str(caught.exception))

          # ... and so is a bare tag name.
          with self.assertRaises(ValueError):
              extract_tag_name(b"v1.0")
  1089. class NamespacedRefsContainerTests(TestCase):
  1090. """Tests for NamespacedRefsContainer."""
  def setUp(self) -> None:
      """Build a DictRefsContainer and a "foo"-namespaced view on top of it."""
      TestCase.setUp(self)
      # The underlying container starts out with the shared test refs.
      backing = DictRefsContainer(dict(_TEST_REFS))
      self._underlying_refs = backing
      # The namespaced view under test.
      self._refs = NamespacedRefsContainer(backing, b"foo")
  def test_namespace_prefix_simple(self) -> None:
      """A single-level namespace maps to refs/namespaces/<name>/."""
      view = NamespacedRefsContainer(self._underlying_refs, b"foo")
      wanted_prefix = b"refs/namespaces/foo/"
      self.assertEqual(wanted_prefix, view._namespace_prefix)
  def test_namespace_prefix_nested(self) -> None:
      """Each "/"-separated level gets its own refs/namespaces/ segment."""
      view = NamespacedRefsContainer(self._underlying_refs, b"foo/bar")
      wanted_prefix = b"refs/namespaces/foo/refs/namespaces/bar/"
      self.assertEqual(wanted_prefix, view._namespace_prefix)
  def test_allkeys_empty_namespace(self) -> None:
      """A namespace nothing was written to exposes HEAD only."""
      # HEAD is shared across namespaces, so it is visible even here.
      visible = self._refs.allkeys()
      self.assertEqual({b"HEAD"}, visible)
  def test_setitem_and_getitem(self) -> None:
      """A ref written through the view reads back and is stored prefixed."""
      sha = b"9" * 40
      short_name = b"refs/heads/master"
      stored_name = b"refs/namespaces/foo/refs/heads/master"

      self._refs[short_name] = sha
      self.assertEqual(sha, self._refs[short_name])

      # The underlying container holds it under the namespace prefix.
      self.assertIn(stored_name, self._underlying_refs.allkeys())
      self.assertEqual(sha, self._underlying_refs[stored_name])
  def test_head_not_namespaced(self) -> None:
      """HEAD written through the view is stored without a prefix."""
      sha = b"a" * 40
      self._refs[b"HEAD"] = sha
      self.assertEqual(sha, self._refs[b"HEAD"])

      # HEAD sits directly in the underlying container, not under the
      # namespace prefix.
      self.assertIn(b"HEAD", self._underlying_refs.allkeys())
      self.assertNotIn(
          b"refs/namespaces/foo/HEAD", self._underlying_refs.allkeys()
      )
  def test_isolation_between_namespaces(self) -> None:
      """Two namespaces over one container keep separate values for a name."""
      branch = b"refs/heads/master"
      # Two views over the same underlying container, each with its own sha.
      views = (
          (NamespacedRefsContainer(self._underlying_refs, b"foo"), b"a" * 40),
          (NamespacedRefsContainer(self._underlying_refs, b"bar"), b"b" * 40),
      )

      # Write the same branch name through both views first ...
      for view, sha in views:
          view[branch] = sha

      # ... then each view must read back its own value ...
      for view, sha in views:
          self.assertEqual(sha, view[branch])

      # ... and list only its own branch plus the shared HEAD.
      for view, _ in views:
          self.assertEqual({b"HEAD", b"refs/heads/master"}, view.allkeys())
  def test_allkeys_filters_namespace(self) -> None:
      """allkeys() lists only the refs stored under the view's namespace."""
      # Refs in two namespaces plus one outside any namespace.
      seeded = (
          (b"refs/namespaces/foo/refs/heads/master", b"a" * 40),
          (b"refs/namespaces/foo/refs/heads/develop", b"b" * 40),
          (b"refs/namespaces/bar/refs/heads/feature", b"c" * 40),
          (b"refs/heads/global", b"d" * 40),
      )
      for stored_name, sha in seeded:
          self._underlying_refs[stored_name] = sha

      # Only the "foo" refs are visible, plus HEAD which is shared.
      foo_view = NamespacedRefsContainer(self._underlying_refs, b"foo")
      wanted = {b"HEAD", b"refs/heads/master", b"refs/heads/develop"}
      self.assertEqual(wanted, foo_view.allkeys())
  def test_set_symbolic_ref(self) -> None:
      """A symbolic ref and its target are both stored with the prefix."""
      sha = b"e" * 40
      self._refs[b"refs/heads/develop"] = sha
      self._refs.set_symbolic_ref(b"refs/heads/main", b"refs/heads/develop")

      stored_link = b"refs/namespaces/foo/refs/heads/main"
      # The link itself lives under the namespace ...
      self.assertIn(stored_link, self._underlying_refs.allkeys())
      # ... and its raw contents point at the namespaced target.
      raw_link = self._underlying_refs.read_loose_ref(stored_link)
      self.assertEqual(b"ref: refs/namespaces/foo/refs/heads/develop", raw_link)
  def test_remove_if_equals(self) -> None:
      """remove_if_equals deletes the ref from the view and its backing store."""
      sha = b"f" * 40
      short_name = b"refs/heads/temp"
      self._refs[short_name] = sha

      # Remove the ref
      removed = self._refs.remove_if_equals(short_name, sha)
      self.assertTrue(removed)

      self.assertNotIn(short_name, self._refs.allkeys())
      self.assertNotIn(
          b"refs/namespaces/foo/refs/heads/temp",
          self._underlying_refs.allkeys(),
      )
  def test_get_packed_refs(self) -> None:
      """get_packed_refs on the view returns an empty dict here."""
      # The view wraps a DictRefsContainer in this test, and the wrapper
      # is expected to report no packed refs for it.
      self.assertEqual({}, self._refs.get_packed_refs())
  def test_add_if_new(self) -> None:
      """add_if_new creates a missing ref and refuses to overwrite one."""
      ref_name = b"refs/heads/new"
      first_sha = b"1" * 40

      # Succeeds: the ref does not exist yet.
      created = self._refs.add_if_new(ref_name, first_sha)
      self.assertTrue(created)
      self.assertEqual(first_sha, self._refs[ref_name])

      # Fails: the ref now exists, and its value is kept.
      created_again = self._refs.add_if_new(ref_name, b"2" * 40)
      self.assertFalse(created_again)
      self.assertEqual(first_sha, self._refs[ref_name])
  def test_set_if_equals(self) -> None:
      """set_if_equals updates only when the expected old value matches."""
      ref_name = b"refs/heads/test"
      current = b"a" * 40
      replacement = b"b" * 40
      self._refs[ref_name] = current

      # A wrong old value is rejected and the ref keeps its value.
      rejected = self._refs.set_if_equals(ref_name, b"c" * 40, replacement)
      self.assertFalse(rejected)
      self.assertEqual(current, self._refs[ref_name])

      # The correct old value lets the update through.
      accepted = self._refs.set_if_equals(ref_name, current, replacement)
      self.assertTrue(accepted)
      self.assertEqual(replacement, self._refs[ref_name])