test_refs.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. # test_refs.py -- tests for refs.py
  2. # encoding: utf-8
  3. # Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
  4. #
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for dulwich.refs."""
  22. from io import BytesIO
  23. import os
  24. import sys
  25. import tempfile
  26. from dulwich import errors
  27. from dulwich.file import (
  28. GitFile,
  29. )
  30. from dulwich.objects import ZERO_SHA
  31. from dulwich.refs import (
  32. DictRefsContainer,
  33. InfoRefsContainer,
  34. check_ref_format,
  35. _split_ref_line,
  36. parse_symref_value,
  37. read_packed_refs_with_peeled,
  38. read_packed_refs,
  39. strip_peeled_refs,
  40. write_packed_refs,
  41. )
  42. from dulwich.repo import Repo
  43. from dulwich.tests import (
  44. SkipTest,
  45. TestCase,
  46. )
  47. from dulwich.tests.utils import (
  48. open_repo,
  49. tear_down_repo,
  50. )
  51. class CheckRefFormatTests(TestCase):
  52. """Tests for the check_ref_format function.
  53. These are the same tests as in the git test suite.
  54. """
  55. def test_valid(self):
  56. self.assertTrue(check_ref_format(b"heads/foo"))
  57. self.assertTrue(check_ref_format(b"foo/bar/baz"))
  58. self.assertTrue(check_ref_format(b"refs///heads/foo"))
  59. self.assertTrue(check_ref_format(b"foo./bar"))
  60. self.assertTrue(check_ref_format(b"heads/foo@bar"))
  61. self.assertTrue(check_ref_format(b"heads/fix.lock.error"))
  62. def test_invalid(self):
  63. self.assertFalse(check_ref_format(b"foo"))
  64. self.assertFalse(check_ref_format(b"heads/foo/"))
  65. self.assertFalse(check_ref_format(b"./foo"))
  66. self.assertFalse(check_ref_format(b".refs/foo"))
  67. self.assertFalse(check_ref_format(b"heads/foo..bar"))
  68. self.assertFalse(check_ref_format(b"heads/foo?bar"))
  69. self.assertFalse(check_ref_format(b"heads/foo.lock"))
  70. self.assertFalse(check_ref_format(b"heads/v@{ation"))
  71. self.assertFalse(check_ref_format(b"heads/foo\bar"))
  72. ONES = b"1" * 40
  73. TWOS = b"2" * 40
  74. THREES = b"3" * 40
  75. FOURS = b"4" * 40
  76. class PackedRefsFileTests(TestCase):
  77. def test_split_ref_line_errors(self):
  78. self.assertRaises(errors.PackedRefsException, _split_ref_line, b"singlefield")
  79. self.assertRaises(errors.PackedRefsException, _split_ref_line, b"badsha name")
  80. self.assertRaises(
  81. errors.PackedRefsException,
  82. _split_ref_line,
  83. ONES + b" bad/../refname",
  84. )
  85. def test_read_without_peeled(self):
  86. f = BytesIO(b"\n".join([b"# comment", ONES + b" ref/1", TWOS + b" ref/2"]))
  87. self.assertEqual(
  88. [(ONES, b"ref/1"), (TWOS, b"ref/2")], list(read_packed_refs(f))
  89. )
  90. def test_read_without_peeled_errors(self):
  91. f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS]))
  92. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  93. def test_read_with_peeled(self):
  94. f = BytesIO(
  95. b"\n".join(
  96. [
  97. ONES + b" ref/1",
  98. TWOS + b" ref/2",
  99. b"^" + THREES,
  100. FOURS + b" ref/4",
  101. ]
  102. )
  103. )
  104. self.assertEqual(
  105. [
  106. (ONES, b"ref/1", None),
  107. (TWOS, b"ref/2", THREES),
  108. (FOURS, b"ref/4", None),
  109. ],
  110. list(read_packed_refs_with_peeled(f)),
  111. )
  112. def test_read_with_peeled_errors(self):
  113. f = BytesIO(b"\n".join([b"^" + TWOS, ONES + b" ref/1"]))
  114. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  115. f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS, b"^" + THREES]))
  116. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  117. def test_write_with_peeled(self):
  118. f = BytesIO()
  119. write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS}, {b"ref/1": THREES})
  120. self.assertEqual(
  121. b"\n".join(
  122. [
  123. b"# pack-refs with: peeled",
  124. ONES + b" ref/1",
  125. b"^" + THREES,
  126. TWOS + b" ref/2",
  127. ]
  128. )
  129. + b"\n",
  130. f.getvalue(),
  131. )
  132. def test_write_without_peeled(self):
  133. f = BytesIO()
  134. write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS})
  135. self.assertEqual(
  136. b"\n".join([ONES + b" ref/1", TWOS + b" ref/2"]) + b"\n",
  137. f.getvalue(),
  138. )
  139. # Dict of refs that we expect all RefsContainerTests subclasses to define.
  140. _TEST_REFS = {
  141. b"HEAD": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  142. b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  143. b"refs/heads/master": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  144. b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  145. b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
  146. b"refs/tags/refs-0.2": b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  147. b"refs/heads/loop": b"ref: refs/heads/loop",
  148. }
  149. class RefsContainerTests(object):
  150. def test_keys(self):
  151. actual_keys = set(self._refs.keys())
  152. self.assertEqual(set(self._refs.allkeys()), actual_keys)
  153. self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
  154. actual_keys = self._refs.keys(b"refs/heads")
  155. actual_keys.discard(b"loop")
  156. self.assertEqual(
  157. [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
  158. sorted(actual_keys),
  159. )
  160. self.assertEqual(
  161. [b"refs-0.1", b"refs-0.2"], sorted(self._refs.keys(b"refs/tags"))
  162. )
  163. def test_iter(self):
  164. actual_keys = set(self._refs.keys())
  165. self.assertEqual(set(self._refs), actual_keys)
  166. self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
  167. def test_as_dict(self):
  168. # refs/heads/loop does not show up even if it exists
  169. expected_refs = dict(_TEST_REFS)
  170. del expected_refs[b"refs/heads/loop"]
  171. self.assertEqual(expected_refs, self._refs.as_dict())
  172. def test_get_symrefs(self):
  173. self._refs.set_symbolic_ref(b"refs/heads/src", b"refs/heads/dst")
  174. symrefs = self._refs.get_symrefs()
  175. if b"HEAD" in symrefs:
  176. symrefs.pop(b"HEAD")
  177. self.assertEqual(
  178. {
  179. b"refs/heads/src": b"refs/heads/dst",
  180. b"refs/heads/loop": b"refs/heads/loop",
  181. },
  182. symrefs,
  183. )
  184. def test_setitem(self):
  185. self._refs[b"refs/some/ref"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  186. self.assertEqual(
  187. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  188. self._refs[b"refs/some/ref"],
  189. )
  190. self.assertRaises(
  191. errors.RefFormatError,
  192. self._refs.__setitem__,
  193. b"notrefs/foo",
  194. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  195. )
  196. def test_set_if_equals(self):
  197. nines = b"9" * 40
  198. self.assertFalse(self._refs.set_if_equals(b"HEAD", b"c0ffee", nines))
  199. self.assertEqual(
  200. b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
  201. )
  202. self.assertTrue(
  203. self._refs.set_if_equals(
  204. b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec", nines
  205. )
  206. )
  207. self.assertEqual(nines, self._refs[b"HEAD"])
  208. # Setting the ref again is a no-op, but will return True.
  209. self.assertTrue(self._refs.set_if_equals(b"HEAD", nines, nines))
  210. self.assertEqual(nines, self._refs[b"HEAD"])
  211. self.assertTrue(self._refs.set_if_equals(b"refs/heads/master", None, nines))
  212. self.assertEqual(nines, self._refs[b"refs/heads/master"])
  213. self.assertTrue(
  214. self._refs.set_if_equals(b"refs/heads/nonexistant", ZERO_SHA, nines)
  215. )
  216. self.assertEqual(nines, self._refs[b"refs/heads/nonexistant"])
  217. def test_add_if_new(self):
  218. nines = b"9" * 40
  219. self.assertFalse(self._refs.add_if_new(b"refs/heads/master", nines))
  220. self.assertEqual(
  221. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  222. self._refs[b"refs/heads/master"],
  223. )
  224. self.assertTrue(self._refs.add_if_new(b"refs/some/ref", nines))
  225. self.assertEqual(nines, self._refs[b"refs/some/ref"])
  226. def test_set_symbolic_ref(self):
  227. self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
  228. self.assertEqual(
  229. b"ref: refs/heads/master",
  230. self._refs.read_loose_ref(b"refs/heads/symbolic"),
  231. )
  232. self.assertEqual(
  233. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  234. self._refs[b"refs/heads/symbolic"],
  235. )
  236. def test_set_symbolic_ref_overwrite(self):
  237. nines = b"9" * 40
  238. self.assertNotIn(b"refs/heads/symbolic", self._refs)
  239. self._refs[b"refs/heads/symbolic"] = nines
  240. self.assertEqual(nines, self._refs.read_loose_ref(b"refs/heads/symbolic"))
  241. self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
  242. self.assertEqual(
  243. b"ref: refs/heads/master",
  244. self._refs.read_loose_ref(b"refs/heads/symbolic"),
  245. )
  246. self.assertEqual(
  247. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  248. self._refs[b"refs/heads/symbolic"],
  249. )
  250. def test_check_refname(self):
  251. self._refs._check_refname(b"HEAD")
  252. self._refs._check_refname(b"refs/stash")
  253. self._refs._check_refname(b"refs/heads/foo")
  254. self.assertRaises(errors.RefFormatError, self._refs._check_refname, b"refs")
  255. self.assertRaises(
  256. errors.RefFormatError, self._refs._check_refname, b"notrefs/foo"
  257. )
  258. def test_contains(self):
  259. self.assertIn(b"refs/heads/master", self._refs)
  260. self.assertNotIn(b"refs/heads/bar", self._refs)
  261. def test_delitem(self):
  262. self.assertEqual(
  263. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  264. self._refs[b"refs/heads/master"],
  265. )
  266. del self._refs[b"refs/heads/master"]
  267. self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
  268. def test_remove_if_equals(self):
  269. self.assertFalse(self._refs.remove_if_equals(b"HEAD", b"c0ffee"))
  270. self.assertEqual(
  271. b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
  272. )
  273. self.assertTrue(
  274. self._refs.remove_if_equals(
  275. b"refs/tags/refs-0.2",
  276. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  277. )
  278. )
  279. self.assertTrue(self._refs.remove_if_equals(b"refs/tags/refs-0.2", ZERO_SHA))
  280. self.assertNotIn(b"refs/tags/refs-0.2", self._refs)
  281. def test_import_refs_name(self):
  282. self._refs[
  283. b"refs/remotes/origin/other"
  284. ] = b"48d01bd4b77fed026b154d16493e5deab78f02ec"
  285. self._refs.import_refs(
  286. b"refs/remotes/origin",
  287. {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
  288. )
  289. self.assertEqual(
  290. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  291. self._refs[b"refs/remotes/origin/master"],
  292. )
  293. self.assertEqual(
  294. b"48d01bd4b77fed026b154d16493e5deab78f02ec",
  295. self._refs[b"refs/remotes/origin/other"],
  296. )
  297. def test_import_refs_name_prune(self):
  298. self._refs[
  299. b"refs/remotes/origin/other"
  300. ] = b"48d01bd4b77fed026b154d16493e5deab78f02ec"
  301. self._refs.import_refs(
  302. b"refs/remotes/origin",
  303. {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
  304. prune=True,
  305. )
  306. self.assertEqual(
  307. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  308. self._refs[b"refs/remotes/origin/master"],
  309. )
  310. self.assertNotIn(b"refs/remotes/origin/other", self._refs)
  311. def test_watch(self):
  312. try:
  313. watcher = self._refs.watch()
  314. except (NotImplementedError, ImportError):
  315. self.skipTest("watching not supported")
  316. with watcher:
  317. self._refs[
  318. b"refs/remotes/origin/other"
  319. ] = b"48d01bd4b77fed026b154d16493e5deab78f02ec"
  320. change = next(watcher)
  321. self.assertEqual(
  322. (
  323. b"refs/remotes/origin/other",
  324. b"48d01bd4b77fed026b154d16493e5deab78f02ec",
  325. ),
  326. change,
  327. )
  328. self._refs[
  329. b"refs/remotes/origin/other"
  330. ] = b"48d01bd4b77fed026b154d16493e5deab78f02ed"
  331. change = next(watcher)
  332. self.assertEqual(
  333. (
  334. b"refs/remotes/origin/other",
  335. b"48d01bd4b77fed026b154d16493e5deab78f02ed",
  336. ),
  337. change,
  338. )
  339. del self._refs[b"refs/remotes/origin/other"]
  340. change = next(watcher)
  341. self.assertEqual((b"refs/remotes/origin/other", None), change)
  342. class DictRefsContainerTests(RefsContainerTests, TestCase):
  343. def setUp(self):
  344. TestCase.setUp(self)
  345. self._refs = DictRefsContainer(dict(_TEST_REFS))
  346. def test_invalid_refname(self):
  347. # FIXME: Move this test into RefsContainerTests, but requires
  348. # some way of injecting invalid refs.
  349. self._refs._refs[b"refs/stash"] = b"00" * 20
  350. expected_refs = dict(_TEST_REFS)
  351. del expected_refs[b"refs/heads/loop"]
  352. expected_refs[b"refs/stash"] = b"00" * 20
  353. self.assertEqual(expected_refs, self._refs.as_dict())
  354. class DiskRefsContainerTests(RefsContainerTests, TestCase):
  355. def setUp(self):
  356. TestCase.setUp(self)
  357. self._repo = open_repo("refs.git")
  358. self.addCleanup(tear_down_repo, self._repo)
  359. self._refs = self._repo.refs
  360. def test_get_packed_refs(self):
  361. self.assertEqual(
  362. {
  363. b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  364. b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
  365. },
  366. self._refs.get_packed_refs(),
  367. )
  368. def test_get_peeled_not_packed(self):
  369. # not packed
  370. self.assertEqual(None, self._refs.get_peeled(b"refs/tags/refs-0.2"))
  371. self.assertEqual(
  372. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
  373. self._refs[b"refs/tags/refs-0.2"],
  374. )
  375. # packed, known not peelable
  376. self.assertEqual(
  377. self._refs[b"refs/heads/packed"],
  378. self._refs.get_peeled(b"refs/heads/packed"),
  379. )
  380. # packed, peeled
  381. self.assertEqual(
  382. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  383. self._refs.get_peeled(b"refs/tags/refs-0.1"),
  384. )
  385. def test_setitem(self):
  386. RefsContainerTests.test_setitem(self)
  387. path = os.path.join(self._refs.path, b"refs", b"some", b"ref")
  388. with open(path, "rb") as f:
  389. self.assertEqual(b"42d06bd4b77fed026b154d16493e5deab78f02ec", f.read()[:40])
  390. self.assertRaises(
  391. OSError,
  392. self._refs.__setitem__,
  393. b"refs/some/ref/sub",
  394. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  395. )
  396. def test_delete_refs_container(self):
  397. # We shouldn't delete the refs directory
  398. self._refs[b'refs/heads/blah'] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  399. for ref in self._refs.allkeys():
  400. del self._refs[ref]
  401. self.assertTrue(os.path.exists(os.path.join(self._refs.path, b'refs')))
  402. def test_setitem_packed(self):
  403. with open(os.path.join(self._refs.path, b"packed-refs"), "w") as f:
  404. f.write("# pack-refs with: peeled fully-peeled sorted \n")
  405. f.write("42d06bd4b77fed026b154d16493e5deab78f02ec refs/heads/packed\n")
  406. # It's allowed to set a new ref on a packed ref, the new ref will be
  407. # placed outside on refs/
  408. self._refs[b"refs/heads/packed"] = b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8"
  409. packed_ref_path = os.path.join(self._refs.path, b"refs", b"heads", b"packed")
  410. with open(packed_ref_path, "rb") as f:
  411. self.assertEqual(b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8", f.read()[:40])
  412. self.assertRaises(
  413. OSError,
  414. self._refs.__setitem__,
  415. b"refs/heads/packed/sub",
  416. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  417. )
  418. def test_setitem_symbolic(self):
  419. ones = b"1" * 40
  420. self._refs[b"HEAD"] = ones
  421. self.assertEqual(ones, self._refs[b"HEAD"])
  422. # ensure HEAD was not modified
  423. f = open(os.path.join(self._refs.path, b"HEAD"), "rb")
  424. v = next(iter(f)).rstrip(b"\n\r")
  425. f.close()
  426. self.assertEqual(b"ref: refs/heads/master", v)
  427. # ensure the symbolic link was written through
  428. f = open(os.path.join(self._refs.path, b"refs", b"heads", b"master"), "rb")
  429. self.assertEqual(ones, f.read()[:40])
  430. f.close()
  431. def test_set_if_equals(self):
  432. RefsContainerTests.test_set_if_equals(self)
  433. # ensure symref was followed
  434. self.assertEqual(b"9" * 40, self._refs[b"refs/heads/master"])
  435. # ensure lockfile was deleted
  436. self.assertFalse(
  437. os.path.exists(
  438. os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
  439. )
  440. )
  441. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
  442. def test_add_if_new_packed(self):
  443. # don't overwrite packed ref
  444. self.assertFalse(self._refs.add_if_new(b"refs/tags/refs-0.1", b"9" * 40))
  445. self.assertEqual(
  446. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  447. self._refs[b"refs/tags/refs-0.1"],
  448. )
  449. def test_add_if_new_symbolic(self):
  450. # Use an empty repo instead of the default.
  451. repo_dir = os.path.join(tempfile.mkdtemp(), "test")
  452. os.makedirs(repo_dir)
  453. repo = Repo.init(repo_dir)
  454. self.addCleanup(tear_down_repo, repo)
  455. refs = repo.refs
  456. nines = b"9" * 40
  457. self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
  458. self.assertNotIn(b"refs/heads/master", refs)
  459. self.assertTrue(refs.add_if_new(b"HEAD", nines))
  460. self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
  461. self.assertEqual(nines, refs[b"HEAD"])
  462. self.assertEqual(nines, refs[b"refs/heads/master"])
  463. self.assertFalse(refs.add_if_new(b"HEAD", b"1" * 40))
  464. self.assertEqual(nines, refs[b"HEAD"])
  465. self.assertEqual(nines, refs[b"refs/heads/master"])
  466. def test_follow(self):
  467. self.assertEqual(
  468. (
  469. [b"HEAD", b"refs/heads/master"],
  470. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  471. ),
  472. self._refs.follow(b"HEAD"),
  473. )
  474. self.assertEqual(
  475. (
  476. [b"refs/heads/master"],
  477. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  478. ),
  479. self._refs.follow(b"refs/heads/master"),
  480. )
  481. self.assertRaises(KeyError, self._refs.follow, b"refs/heads/loop")
  482. def test_delitem(self):
  483. RefsContainerTests.test_delitem(self)
  484. ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"master")
  485. self.assertFalse(os.path.exists(ref_file))
  486. self.assertNotIn(b"refs/heads/master", self._refs.get_packed_refs())
  487. def test_delitem_symbolic(self):
  488. self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
  489. del self._refs[b"HEAD"]
  490. self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
  491. self.assertEqual(
  492. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  493. self._refs[b"refs/heads/master"],
  494. )
  495. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD")))
  496. def test_remove_if_equals_symref(self):
  497. # HEAD is a symref, so shouldn't equal its dereferenced value
  498. self.assertFalse(
  499. self._refs.remove_if_equals(
  500. b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec"
  501. )
  502. )
  503. self.assertTrue(
  504. self._refs.remove_if_equals(
  505. b"refs/heads/master",
  506. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  507. )
  508. )
  509. self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
  510. # HEAD is now a broken symref
  511. self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
  512. self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
  513. self.assertFalse(
  514. os.path.exists(
  515. os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
  516. )
  517. )
  518. self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
  519. def test_remove_packed_without_peeled(self):
  520. refs_file = os.path.join(self._repo.path, "packed-refs")
  521. f = GitFile(refs_file)
  522. refs_data = f.read()
  523. f.close()
  524. f = GitFile(refs_file, "wb")
  525. f.write(
  526. b"\n".join(
  527. line
  528. for line in refs_data.split(b"\n")
  529. if not line or line[0] not in b"#^"
  530. )
  531. )
  532. f.close()
  533. self._repo = Repo(self._repo.path)
  534. refs = self._repo.refs
  535. self.assertTrue(
  536. refs.remove_if_equals(
  537. b"refs/heads/packed",
  538. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  539. )
  540. )
  541. def test_remove_if_equals_packed(self):
  542. # test removing ref that is only packed
  543. self.assertEqual(
  544. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  545. self._refs[b"refs/tags/refs-0.1"],
  546. )
  547. self.assertTrue(
  548. self._refs.remove_if_equals(
  549. b"refs/tags/refs-0.1",
  550. b"df6800012397fb85c56e7418dd4eb9405dee075c",
  551. )
  552. )
  553. self.assertRaises(KeyError, lambda: self._refs[b"refs/tags/refs-0.1"])
  554. def test_remove_parent(self):
  555. self._refs[b"refs/heads/foo/bar"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  556. del self._refs[b"refs/heads/foo/bar"]
  557. ref_file = os.path.join(
  558. self._refs.path,
  559. b"refs",
  560. b"heads",
  561. b"foo",
  562. b"bar",
  563. )
  564. self.assertFalse(os.path.exists(ref_file))
  565. ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"foo")
  566. self.assertFalse(os.path.exists(ref_file))
  567. ref_file = os.path.join(self._refs.path, b"refs", b"heads")
  568. self.assertTrue(os.path.exists(ref_file))
  569. self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  570. def test_read_ref(self):
  571. self.assertEqual(b"ref: refs/heads/master", self._refs.read_ref(b"HEAD"))
  572. self.assertEqual(
  573. b"42d06bd4b77fed026b154d16493e5deab78f02ec",
  574. self._refs.read_ref(b"refs/heads/packed"),
  575. )
  576. self.assertEqual(None, self._refs.read_ref(b"nonexistant"))
  577. def test_read_loose_ref(self):
  578. self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
  579. self.assertEqual(None, self._refs.read_ref(b"refs/heads/foo/bar"))
  580. def test_non_ascii(self):
  581. try:
  582. encoded_ref = os.fsencode(u"refs/tags/schön")
  583. except UnicodeEncodeError:
  584. raise SkipTest("filesystem encoding doesn't support special character")
  585. p = os.path.join(os.fsencode(self._repo.path), encoded_ref)
  586. with open(p, "w") as f:
  587. f.write("00" * 20)
  588. expected_refs = dict(_TEST_REFS)
  589. expected_refs[encoded_ref] = b"00" * 20
  590. del expected_refs[b"refs/heads/loop"]
  591. self.assertEqual(expected_refs, self._repo.get_refs())
  592. def test_cyrillic(self):
  593. if sys.platform in ("darwin", "win32"):
  594. raise SkipTest("filesystem encoding doesn't support arbitrary bytes")
  595. # reported in https://github.com/dulwich/dulwich/issues/608
  596. name = b"\xcd\xee\xe2\xe0\xff\xe2\xe5\xf2\xea\xe01"
  597. encoded_ref = b"refs/heads/" + name
  598. with open(os.path.join(os.fsencode(self._repo.path), encoded_ref), "w") as f:
  599. f.write("00" * 20)
  600. expected_refs = set(_TEST_REFS.keys())
  601. expected_refs.add(encoded_ref)
  602. self.assertEqual(expected_refs, set(self._repo.refs.allkeys()))
  603. self.assertEqual(
  604. {r[len(b"refs/") :] for r in expected_refs if r.startswith(b"refs/")},
  605. set(self._repo.refs.subkeys(b"refs/")),
  606. )
  607. expected_refs.remove(b"refs/heads/loop")
  608. expected_refs.add(b"HEAD")
  609. self.assertEqual(expected_refs, set(self._repo.get_refs().keys()))
  610. _TEST_REFS_SERIALIZED = (
  611. b"42d06bd4b77fed026b154d16493e5deab78f02ec\t"
  612. b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n"
  613. b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n"
  614. b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n"
  615. b"df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n"
  616. b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8\trefs/tags/refs-0.2\n"
  617. )
  618. class InfoRefsContainerTests(TestCase):
  619. def test_invalid_refname(self):
  620. text = _TEST_REFS_SERIALIZED + b"00" * 20 + b"\trefs/stash\n"
  621. refs = InfoRefsContainer(BytesIO(text))
  622. expected_refs = dict(_TEST_REFS)
  623. del expected_refs[b"HEAD"]
  624. expected_refs[b"refs/stash"] = b"00" * 20
  625. del expected_refs[b"refs/heads/loop"]
  626. self.assertEqual(expected_refs, refs.as_dict())
  627. def test_keys(self):
  628. refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
  629. actual_keys = set(refs.keys())
  630. self.assertEqual(set(refs.allkeys()), actual_keys)
  631. expected_refs = dict(_TEST_REFS)
  632. del expected_refs[b"HEAD"]
  633. del expected_refs[b"refs/heads/loop"]
  634. self.assertEqual(set(expected_refs.keys()), actual_keys)
  635. actual_keys = refs.keys(b"refs/heads")
  636. actual_keys.discard(b"loop")
  637. self.assertEqual(
  638. [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
  639. sorted(actual_keys),
  640. )
  641. self.assertEqual([b"refs-0.1", b"refs-0.2"], sorted(refs.keys(b"refs/tags")))
  642. def test_as_dict(self):
  643. refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
  644. # refs/heads/loop does not show up even if it exists
  645. expected_refs = dict(_TEST_REFS)
  646. del expected_refs[b"HEAD"]
  647. del expected_refs[b"refs/heads/loop"]
  648. self.assertEqual(expected_refs, refs.as_dict())
  649. def test_contains(self):
  650. refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
  651. self.assertIn(b"refs/heads/master", refs)
  652. self.assertNotIn(b"refs/heads/bar", refs)
  653. def test_get_peeled(self):
  654. refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
  655. # refs/heads/loop does not show up even if it exists
  656. self.assertEqual(
  657. _TEST_REFS[b"refs/heads/master"],
  658. refs.get_peeled(b"refs/heads/master"),
  659. )
  660. class ParseSymrefValueTests(TestCase):
  661. def test_valid(self):
  662. self.assertEqual(b"refs/heads/foo", parse_symref_value(b"ref: refs/heads/foo"))
  663. def test_invalid(self):
  664. self.assertRaises(ValueError, parse_symref_value, b"foobar")
  665. class StripPeeledRefsTests(TestCase):
  666. all_refs = {
  667. b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
  668. b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
  669. b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
  670. b"refs/tags/1.0.0^{}": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
  671. b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
  672. b"refs/tags/2.0.0^{}": b"0749936d0956c661ac8f8d3483774509c165f89e",
  673. }
  674. non_peeled_refs = {
  675. b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
  676. b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
  677. b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
  678. b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
  679. }
  680. def test_strip_peeled_refs(self):
  681. # Simple check of two dicts
  682. self.assertEqual(strip_peeled_refs(self.all_refs), self.non_peeled_refs)