12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010 |
- # test_refs.py -- tests for refs.py
- # Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Tests for dulwich.refs."""
- import os
- import sys
- import tempfile
- from io import BytesIO
- from typing import ClassVar
- from dulwich import errors
- from dulwich.file import GitFile
- from dulwich.objects import ZERO_SHA
- from dulwich.refs import (
- DictRefsContainer,
- InfoRefsContainer,
- SymrefLoop,
- _split_ref_line,
- check_ref_format,
- parse_remote_ref,
- parse_symref_value,
- read_packed_refs,
- read_packed_refs_with_peeled,
- split_peeled_refs,
- strip_peeled_refs,
- write_packed_refs,
- )
- from dulwich.repo import Repo
- from dulwich.tests.utils import open_repo, tear_down_repo
- from . import SkipTest, TestCase
- class CheckRefFormatTests(TestCase):
- """Tests for the check_ref_format function.
- These are the same tests as in the git test suite.
- """
- def test_valid(self) -> None:
- self.assertTrue(check_ref_format(b"heads/foo"))
- self.assertTrue(check_ref_format(b"foo/bar/baz"))
- self.assertTrue(check_ref_format(b"refs///heads/foo"))
- self.assertTrue(check_ref_format(b"foo./bar"))
- self.assertTrue(check_ref_format(b"heads/foo@bar"))
- self.assertTrue(check_ref_format(b"heads/fix.lock.error"))
- def test_invalid(self) -> None:
- self.assertFalse(check_ref_format(b"foo"))
- self.assertFalse(check_ref_format(b"heads/foo/"))
- self.assertFalse(check_ref_format(b"./foo"))
- self.assertFalse(check_ref_format(b".refs/foo"))
- self.assertFalse(check_ref_format(b"heads/foo..bar"))
- self.assertFalse(check_ref_format(b"heads/foo?bar"))
- self.assertFalse(check_ref_format(b"heads/foo.lock"))
- self.assertFalse(check_ref_format(b"heads/v@{ation"))
- self.assertFalse(check_ref_format(b"heads/foo\bar"))
- ONES = b"1" * 40
- TWOS = b"2" * 40
- THREES = b"3" * 40
- FOURS = b"4" * 40
- class PackedRefsFileTests(TestCase):
- def test_split_ref_line_errors(self) -> None:
- self.assertRaises(errors.PackedRefsException, _split_ref_line, b"singlefield")
- self.assertRaises(errors.PackedRefsException, _split_ref_line, b"badsha name")
- self.assertRaises(
- errors.PackedRefsException,
- _split_ref_line,
- ONES + b" bad/../refname",
- )
- def test_read_without_peeled(self) -> None:
- f = BytesIO(b"\n".join([b"# comment", ONES + b" ref/1", TWOS + b" ref/2"]))
- self.assertEqual(
- [(ONES, b"ref/1"), (TWOS, b"ref/2")], list(read_packed_refs(f))
- )
- def test_read_without_peeled_errors(self) -> None:
- f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS]))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
- def test_read_with_peeled(self) -> None:
- f = BytesIO(
- b"\n".join(
- [
- ONES + b" ref/1",
- TWOS + b" ref/2",
- b"^" + THREES,
- FOURS + b" ref/4",
- ]
- )
- )
- self.assertEqual(
- [
- (ONES, b"ref/1", None),
- (TWOS, b"ref/2", THREES),
- (FOURS, b"ref/4", None),
- ],
- list(read_packed_refs_with_peeled(f)),
- )
- def test_read_with_peeled_errors(self) -> None:
- f = BytesIO(b"\n".join([b"^" + TWOS, ONES + b" ref/1"]))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
- f = BytesIO(b"\n".join([ONES + b" ref/1", b"^" + TWOS, b"^" + THREES]))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
- def test_write_with_peeled(self) -> None:
- f = BytesIO()
- write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS}, {b"ref/1": THREES})
- self.assertEqual(
- b"\n".join(
- [
- b"# pack-refs with: peeled",
- ONES + b" ref/1",
- b"^" + THREES,
- TWOS + b" ref/2",
- ]
- )
- + b"\n",
- f.getvalue(),
- )
- def test_write_without_peeled(self) -> None:
- f = BytesIO()
- write_packed_refs(f, {b"ref/1": ONES, b"ref/2": TWOS})
- self.assertEqual(
- b"\n".join([ONES + b" ref/1", TWOS + b" ref/2"]) + b"\n",
- f.getvalue(),
- )
- # Dict of refs that we expect all RefsContainerTests subclasses to define.
- _TEST_REFS = {
- b"HEAD": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- b"refs/heads/master": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
- b"refs/tags/refs-0.2": b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
- b"refs/heads/loop": b"ref: refs/heads/loop",
- }
- class RefsContainerTests:
- def test_keys(self) -> None:
- actual_keys = set(self._refs.keys())
- self.assertEqual(set(self._refs.allkeys()), actual_keys)
- self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
- actual_keys = self._refs.keys(b"refs/heads")
- actual_keys.discard(b"loop")
- self.assertEqual(
- [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
- sorted(actual_keys),
- )
- self.assertEqual(
- [b"refs-0.1", b"refs-0.2"], sorted(self._refs.keys(b"refs/tags"))
- )
- def test_iter(self) -> None:
- actual_keys = set(self._refs.keys())
- self.assertEqual(set(self._refs), actual_keys)
- self.assertEqual(set(_TEST_REFS.keys()), actual_keys)
- def test_as_dict(self) -> None:
- # refs/heads/loop does not show up even if it exists
- expected_refs = dict(_TEST_REFS)
- del expected_refs[b"refs/heads/loop"]
- self.assertEqual(expected_refs, self._refs.as_dict())
- def test_get_symrefs(self) -> None:
- self._refs.set_symbolic_ref(b"refs/heads/src", b"refs/heads/dst")
- symrefs = self._refs.get_symrefs()
- if b"HEAD" in symrefs:
- symrefs.pop(b"HEAD")
- self.assertEqual(
- {
- b"refs/heads/src": b"refs/heads/dst",
- b"refs/heads/loop": b"refs/heads/loop",
- },
- symrefs,
- )
- def test_setitem(self) -> None:
- self._refs[b"refs/some/ref"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/some/ref"],
- )
- # should accept symref
- self._refs[b"refs/heads/symbolic"] = b"ref: refs/heads/master"
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/symbolic"],
- )
- # should not accept bad ref names
- self.assertRaises(
- errors.RefFormatError,
- self._refs.__setitem__,
- b"notrefs/foo",
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- )
- # should not accept short sha
- self.assertRaises(
- ValueError,
- self._refs.__setitem__,
- b"refs/some/ref",
- b"42d06bd",
- )
- def test_set_if_equals(self) -> None:
- nines = b"9" * 40
- self.assertFalse(self._refs.set_if_equals(b"HEAD", b"c0ffee", nines))
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
- )
- self.assertTrue(
- self._refs.set_if_equals(
- b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec", nines
- )
- )
- self.assertEqual(nines, self._refs[b"HEAD"])
- # Setting the ref again is a no-op, but will return True.
- self.assertTrue(self._refs.set_if_equals(b"HEAD", nines, nines))
- self.assertEqual(nines, self._refs[b"HEAD"])
- self.assertTrue(self._refs.set_if_equals(b"refs/heads/master", None, nines))
- self.assertEqual(nines, self._refs[b"refs/heads/master"])
- self.assertTrue(
- self._refs.set_if_equals(b"refs/heads/nonexistent", ZERO_SHA, nines)
- )
- self.assertEqual(nines, self._refs[b"refs/heads/nonexistent"])
- def test_add_if_new(self) -> None:
- nines = b"9" * 40
- self.assertFalse(self._refs.add_if_new(b"refs/heads/master", nines))
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/master"],
- )
- self.assertTrue(self._refs.add_if_new(b"refs/some/ref", nines))
- self.assertEqual(nines, self._refs[b"refs/some/ref"])
- def test_set_symbolic_ref(self) -> None:
- self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
- self.assertEqual(
- b"ref: refs/heads/master",
- self._refs.read_loose_ref(b"refs/heads/symbolic"),
- )
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/symbolic"],
- )
- def test_set_symbolic_ref_overwrite(self) -> None:
- nines = b"9" * 40
- self.assertNotIn(b"refs/heads/symbolic", self._refs)
- self._refs[b"refs/heads/symbolic"] = nines
- self.assertEqual(nines, self._refs.read_loose_ref(b"refs/heads/symbolic"))
- self._refs.set_symbolic_ref(b"refs/heads/symbolic", b"refs/heads/master")
- self.assertEqual(
- b"ref: refs/heads/master",
- self._refs.read_loose_ref(b"refs/heads/symbolic"),
- )
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/symbolic"],
- )
- def test_check_refname(self) -> None:
- self._refs._check_refname(b"HEAD")
- self._refs._check_refname(b"refs/stash")
- self._refs._check_refname(b"refs/heads/foo")
- self.assertRaises(errors.RefFormatError, self._refs._check_refname, b"refs")
- self.assertRaises(
- errors.RefFormatError, self._refs._check_refname, b"notrefs/foo"
- )
- def test_contains(self) -> None:
- self.assertIn(b"refs/heads/master", self._refs)
- self.assertNotIn(b"refs/heads/bar", self._refs)
- def test_delitem(self) -> None:
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/master"],
- )
- del self._refs[b"refs/heads/master"]
- self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
- def test_remove_if_equals(self) -> None:
- self.assertFalse(self._refs.remove_if_equals(b"HEAD", b"c0ffee"))
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec", self._refs[b"HEAD"]
- )
- self.assertTrue(
- self._refs.remove_if_equals(
- b"refs/tags/refs-0.2",
- b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
- )
- )
- self.assertTrue(self._refs.remove_if_equals(b"refs/tags/refs-0.2", ZERO_SHA))
- self.assertNotIn(b"refs/tags/refs-0.2", self._refs)
- def test_import_refs_name(self) -> None:
- self._refs[b"refs/remotes/origin/other"] = (
- b"48d01bd4b77fed026b154d16493e5deab78f02ec"
- )
- self._refs.import_refs(
- b"refs/remotes/origin",
- {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
- )
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/remotes/origin/master"],
- )
- self.assertEqual(
- b"48d01bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/remotes/origin/other"],
- )
- def test_import_refs_name_prune(self) -> None:
- self._refs[b"refs/remotes/origin/other"] = (
- b"48d01bd4b77fed026b154d16493e5deab78f02ec"
- )
- self._refs.import_refs(
- b"refs/remotes/origin",
- {b"master": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
- prune=True,
- )
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/remotes/origin/master"],
- )
- self.assertNotIn(b"refs/remotes/origin/other", self._refs)
- class DictRefsContainerTests(RefsContainerTests, TestCase):
- def setUp(self) -> None:
- TestCase.setUp(self)
- self._refs = DictRefsContainer(dict(_TEST_REFS))
- def test_invalid_refname(self) -> None:
- # FIXME: Move this test into RefsContainerTests, but requires
- # some way of injecting invalid refs.
- self._refs._refs[b"refs/stash"] = b"00" * 20
- expected_refs = dict(_TEST_REFS)
- del expected_refs[b"refs/heads/loop"]
- expected_refs[b"refs/stash"] = b"00" * 20
- self.assertEqual(expected_refs, self._refs.as_dict())
- def test_set_if_equals_with_symbolic_ref(self) -> None:
- # Test that set_if_equals only updates the requested ref,
- # not all refs in a symbolic reference chain
- # The bug in the original implementation was that when follow()
- # was called on a ref, it would return all refs in the chain,
- # and set_if_equals would update ALL of them instead of just the
- # requested ref.
- # Set up refs
- master_sha = b"1" * 40
- feature_sha = b"2" * 40
- new_sha = b"3" * 40
- self._refs[b"refs/heads/master"] = master_sha
- self._refs[b"refs/heads/feature"] = feature_sha
- # Create a second symbolic ref pointing to feature
- self._refs.set_symbolic_ref(b"refs/heads/other", b"refs/heads/feature")
- # Update refs/heads/other through set_if_equals
- # With the bug, this would update BOTH refs/heads/other AND refs/heads/feature
- # Without the bug, only refs/heads/other should be updated
- # Note: old_ref needs to be the actual stored value (the symref)
- self.assertTrue(
- self._refs.set_if_equals(
- b"refs/heads/other", b"ref: refs/heads/feature", new_sha
- )
- )
- # refs/heads/other should now directly point to new_sha
- self.assertEqual(self._refs.read_ref(b"refs/heads/other"), new_sha)
- # refs/heads/feature should remain unchanged
- # With the bug, refs/heads/feature would also be incorrectly updated to new_sha
- self.assertEqual(self._refs[b"refs/heads/feature"], feature_sha)
- self.assertEqual(self._refs[b"refs/heads/master"], master_sha)
- class DiskRefsContainerTests(RefsContainerTests, TestCase):
- def setUp(self) -> None:
- TestCase.setUp(self)
- self._repo = open_repo("refs.git")
- self.addCleanup(tear_down_repo, self._repo)
- self._refs = self._repo.refs
- def test_get_packed_refs(self) -> None:
- self.assertEqual(
- {
- b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- b"refs/tags/refs-0.1": b"df6800012397fb85c56e7418dd4eb9405dee075c",
- },
- self._refs.get_packed_refs(),
- )
- def test_get_peeled_not_packed(self) -> None:
- # not packed
- self.assertEqual(None, self._refs.get_peeled(b"refs/tags/refs-0.2"))
- self.assertEqual(
- b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8",
- self._refs[b"refs/tags/refs-0.2"],
- )
- # packed, known not peelable
- self.assertEqual(
- self._refs[b"refs/heads/packed"],
- self._refs.get_peeled(b"refs/heads/packed"),
- )
- # packed, peeled
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs.get_peeled(b"refs/tags/refs-0.1"),
- )
- def test_setitem(self) -> None:
- RefsContainerTests.test_setitem(self)
- path = os.path.join(self._refs.path, b"refs", b"some", b"ref")
- with open(path, "rb") as f:
- self.assertEqual(b"42d06bd4b77fed026b154d16493e5deab78f02ec", f.read()[:40])
- self.assertRaises(
- OSError,
- self._refs.__setitem__,
- b"refs/some/ref/sub",
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- )
- def test_delete_refs_container(self) -> None:
- # We shouldn't delete the refs directory
- self._refs[b"refs/heads/blah"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
- for ref in self._refs.allkeys():
- del self._refs[ref]
- self.assertTrue(os.path.exists(os.path.join(self._refs.path, b"refs")))
- def test_setitem_packed(self) -> None:
- with open(os.path.join(self._refs.path, b"packed-refs"), "w") as f:
- f.write("# pack-refs with: peeled fully-peeled sorted \n")
- f.write("42d06bd4b77fed026b154d16493e5deab78f02ec refs/heads/packed\n")
- # It's allowed to set a new ref on a packed ref, the new ref will be
- # placed outside on refs/
- self._refs[b"refs/heads/packed"] = b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8"
- packed_ref_path = os.path.join(self._refs.path, b"refs", b"heads", b"packed")
- with open(packed_ref_path, "rb") as f:
- self.assertEqual(b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8", f.read()[:40])
- self.assertRaises(
- OSError,
- self._refs.__setitem__,
- b"refs/heads/packed/sub",
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- )
- # this shouldn't overwrite the packed refs
- self.assertEqual(
- {b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec"},
- self._refs.get_packed_refs(),
- )
- def test_add_packed_refs(self) -> None:
- # first, create a non-packed ref
- self._refs[b"refs/heads/packed"] = b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8"
- packed_ref_path = os.path.join(self._refs.path, b"refs", b"heads", b"packed")
- self.assertTrue(os.path.exists(packed_ref_path))
- # now overwrite that with a packed ref
- packed_refs_file_path = os.path.join(self._refs.path, b"packed-refs")
- self._refs.add_packed_refs(
- {
- b"refs/heads/packed": b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- }
- )
- # that should kill the file
- self.assertFalse(os.path.exists(packed_ref_path))
- # now delete the packed ref
- self._refs.add_packed_refs(
- {
- b"refs/heads/packed": None,
- }
- )
- # and it's gone!
- self.assertFalse(os.path.exists(packed_ref_path))
- self.assertRaises(
- KeyError,
- self._refs.__getitem__,
- b"refs/heads/packed",
- )
- # just in case, make sure we can't pack HEAD
- self.assertRaises(
- ValueError,
- self._refs.add_packed_refs,
- {b"HEAD": "02ac81614bcdbd585a37b4b0edf8cb8a"},
- )
- # delete all packed refs
- self._refs.add_packed_refs({ref: None for ref in self._refs.get_packed_refs()})
- self.assertEqual({}, self._refs.get_packed_refs())
- # remove the packed ref file, and check that adding nothing doesn't affect that
- os.remove(packed_refs_file_path)
- # adding nothing doesn't make it reappear
- self._refs.add_packed_refs({})
- self.assertFalse(os.path.exists(packed_refs_file_path))
- def test_setitem_symbolic(self) -> None:
- ones = b"1" * 40
- self._refs[b"HEAD"] = ones
- self.assertEqual(ones, self._refs[b"HEAD"])
- # ensure HEAD was not modified
- f = open(os.path.join(self._refs.path, b"HEAD"), "rb")
- v = next(iter(f)).rstrip(b"\n\r")
- f.close()
- self.assertEqual(b"ref: refs/heads/master", v)
- # ensure the symbolic link was written through
- f = open(os.path.join(self._refs.path, b"refs", b"heads", b"master"), "rb")
- self.assertEqual(ones, f.read()[:40])
- f.close()
- def test_set_if_equals(self) -> None:
- RefsContainerTests.test_set_if_equals(self)
- # ensure symref was followed
- self.assertEqual(b"9" * 40, self._refs[b"refs/heads/master"])
- # ensure lockfile was deleted
- self.assertFalse(
- os.path.exists(
- os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
- )
- )
- self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
- def test_add_if_new_packed(self) -> None:
- # don't overwrite packed ref
- self.assertFalse(self._refs.add_if_new(b"refs/tags/refs-0.1", b"9" * 40))
- self.assertEqual(
- b"df6800012397fb85c56e7418dd4eb9405dee075c",
- self._refs[b"refs/tags/refs-0.1"],
- )
- def test_add_if_new_symbolic(self) -> None:
- # Use an empty repo instead of the default.
- repo_dir = os.path.join(tempfile.mkdtemp(), "test")
- os.makedirs(repo_dir)
- repo = Repo.init(repo_dir)
- self.addCleanup(tear_down_repo, repo)
- refs = repo.refs
- nines = b"9" * 40
- self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
- self.assertNotIn(b"refs/heads/master", refs)
- self.assertTrue(refs.add_if_new(b"HEAD", nines))
- self.assertEqual(b"ref: refs/heads/master", refs.read_ref(b"HEAD"))
- self.assertEqual(nines, refs[b"HEAD"])
- self.assertEqual(nines, refs[b"refs/heads/master"])
- self.assertFalse(refs.add_if_new(b"HEAD", b"1" * 40))
- self.assertEqual(nines, refs[b"HEAD"])
- self.assertEqual(nines, refs[b"refs/heads/master"])
- def test_follow(self) -> None:
- self.assertEqual(
- (
- [b"HEAD", b"refs/heads/master"],
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- ),
- self._refs.follow(b"HEAD"),
- )
- self.assertEqual(
- (
- [b"refs/heads/master"],
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- ),
- self._refs.follow(b"refs/heads/master"),
- )
- self.assertRaises(SymrefLoop, self._refs.follow, b"refs/heads/loop")
- def test_set_overwrite_loop(self) -> None:
- self.assertRaises(SymrefLoop, self._refs.follow, b"refs/heads/loop")
- self._refs[b"refs/heads/loop"] = b"42d06bd4b77fed026b154d16493e5deab78f02ec"
- self.assertEqual(
- ([b"refs/heads/loop"], b"42d06bd4b77fed026b154d16493e5deab78f02ec"),
- self._refs.follow(b"refs/heads/loop"),
- )
- def test_delitem(self) -> None:
- RefsContainerTests.test_delitem(self)
- ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"master")
- self.assertFalse(os.path.exists(ref_file))
- self.assertNotIn(b"refs/heads/master", self._refs.get_packed_refs())
- def test_delitem_symbolic(self) -> None:
- self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
- del self._refs[b"HEAD"]
- self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs[b"refs/heads/master"],
- )
- self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD")))
- def test_remove_if_equals_symref(self) -> None:
- # HEAD is a symref, so shouldn't equal its dereferenced value
- self.assertFalse(
- self._refs.remove_if_equals(
- b"HEAD", b"42d06bd4b77fed026b154d16493e5deab78f02ec"
- )
- )
- self.assertTrue(
- self._refs.remove_if_equals(
- b"refs/heads/master",
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- )
- )
- self.assertRaises(KeyError, lambda: self._refs[b"refs/heads/master"])
- # HEAD is now a broken symref
- self.assertRaises(KeyError, lambda: self._refs[b"HEAD"])
- self.assertEqual(b"ref: refs/heads/master", self._refs.read_loose_ref(b"HEAD"))
- self.assertFalse(
- os.path.exists(
- os.path.join(self._refs.path, b"refs", b"heads", b"master.lock")
- )
- )
- self.assertFalse(os.path.exists(os.path.join(self._refs.path, b"HEAD.lock")))
- def test_remove_packed_without_peeled(self) -> None:
- refs_file = os.path.join(self._repo.path, "packed-refs")
- f = GitFile(refs_file)
- refs_data = f.read()
- f.close()
- f = GitFile(refs_file, "wb")
- f.write(
- b"\n".join(
- line
- for line in refs_data.split(b"\n")
- if not line or line[0] not in b"#^"
- )
- )
- f.close()
- self._repo = Repo(self._repo.path)
- refs = self._repo.refs
- self.assertTrue(
- refs.remove_if_equals(
- b"refs/heads/packed",
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- )
- )
- def test_remove_if_equals_packed(self) -> None:
- # test removing ref that is only packed
- self.assertEqual(
- b"df6800012397fb85c56e7418dd4eb9405dee075c",
- self._refs[b"refs/tags/refs-0.1"],
- )
- self.assertTrue(
- self._refs.remove_if_equals(
- b"refs/tags/refs-0.1",
- b"df6800012397fb85c56e7418dd4eb9405dee075c",
- )
- )
- self.assertRaises(KeyError, lambda: self._refs[b"refs/tags/refs-0.1"])
- def test_remove_parent(self) -> None:
- self._refs[b"refs/heads/foo/bar"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
- del self._refs[b"refs/heads/foo/bar"]
- ref_file = os.path.join(
- self._refs.path,
- b"refs",
- b"heads",
- b"foo",
- b"bar",
- )
- self.assertFalse(os.path.exists(ref_file))
- ref_file = os.path.join(self._refs.path, b"refs", b"heads", b"foo")
- self.assertFalse(os.path.exists(ref_file))
- ref_file = os.path.join(self._refs.path, b"refs", b"heads")
- self.assertTrue(os.path.exists(ref_file))
- self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
- def test_read_ref(self) -> None:
- self.assertEqual(b"ref: refs/heads/master", self._refs.read_ref(b"HEAD"))
- self.assertEqual(
- b"42d06bd4b77fed026b154d16493e5deab78f02ec",
- self._refs.read_ref(b"refs/heads/packed"),
- )
- self.assertEqual(None, self._refs.read_ref(b"nonexistent"))
- def test_read_loose_ref(self) -> None:
- self._refs[b"refs/heads/foo"] = b"df6800012397fb85c56e7418dd4eb9405dee075c"
- self.assertEqual(None, self._refs.read_ref(b"refs/heads/foo/bar"))
- def test_non_ascii(self) -> None:
- try:
- encoded_ref = os.fsencode("refs/tags/schön")
- except UnicodeEncodeError as exc:
- raise SkipTest(
- "filesystem encoding doesn't support special character"
- ) from exc
- p = os.path.join(os.fsencode(self._repo.path), encoded_ref)
- with open(p, "w") as f:
- f.write("00" * 20)
- expected_refs = dict(_TEST_REFS)
- expected_refs[encoded_ref] = b"00" * 20
- del expected_refs[b"refs/heads/loop"]
- self.assertEqual(expected_refs, self._repo.get_refs())
- def test_cyrillic(self) -> None:
- if sys.platform in ("darwin", "win32"):
- raise SkipTest("filesystem encoding doesn't support arbitrary bytes")
- # reported in https://github.com/dulwich/dulwich/issues/608
- name = b"\xcd\xee\xe2\xe0\xff\xe2\xe5\xf2\xea\xe01"
- encoded_ref = b"refs/heads/" + name
- with open(os.path.join(os.fsencode(self._repo.path), encoded_ref), "w") as f:
- f.write("00" * 20)
- expected_refs = set(_TEST_REFS.keys())
- expected_refs.add(encoded_ref)
- self.assertEqual(expected_refs, set(self._repo.refs.allkeys()))
- self.assertEqual(
- {r[len(b"refs/") :] for r in expected_refs if r.startswith(b"refs/")},
- set(self._repo.refs.subkeys(b"refs/")),
- )
- expected_refs.remove(b"refs/heads/loop")
- expected_refs.add(b"HEAD")
- self.assertEqual(expected_refs, set(self._repo.get_refs().keys()))
- def test_write_unchanged_ref_optimization(self):
- # Test that writing unchanged ref avoids fsync but still checks locks
- ref_name = b"refs/heads/unchanged"
- ref_value = b"a" * 40
- # Set initial ref value
- self._refs[ref_name] = ref_value
- # Test 1: Writing same value should succeed without changes
- result = self._refs.set_if_equals(ref_name, ref_value, ref_value)
- self.assertTrue(result)
- # Test 2: Writing same value with wrong old_ref should fail
- wrong_old = b"b" * 40
- result = self._refs.set_if_equals(ref_name, wrong_old, ref_value)
- self.assertFalse(result)
- # Test 3: Writing different value should update normally
- new_value = b"c" * 40
- result = self._refs.set_if_equals(ref_name, ref_value, new_value)
- self.assertTrue(result)
- self.assertEqual(new_value, self._refs[ref_name])
- def test_write_unchanged_ref_with_lock(self):
- # Test that file locking is still detected when ref unchanged
- from dulwich.file import FileLocked
- ref_name = b"refs/heads/locktest"
- ref_value = b"d" * 40
- # Set initial ref value
- self._refs[ref_name] = ref_value
- # Get the actual file path
- ref_file = os.path.join(os.fsencode(self._refs.path), ref_name)
- lock_file = ref_file + b".lock"
- # Create lock file to simulate another process holding lock
- with open(lock_file, "wb") as f:
- f.write(b"locked by another process")
- # Try to write same value - should raise FileLocked
- with self.assertRaises(FileLocked):
- self._refs[ref_name] = ref_value
- # Clean up lock file
- if os.path.exists(lock_file):
- os.unlink(lock_file)
- # Now it should work
- self._refs[ref_name] = ref_value
- _TEST_REFS_SERIALIZED = (
- b"42d06bd4b77fed026b154d16493e5deab78f02ec\t"
- b"refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n"
- b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n"
- b"42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n"
- b"df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n"
- b"3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8\trefs/tags/refs-0.2\n"
- )
- class DiskRefsContainerPathlibTests(TestCase):
- def test_pathlib_init(self) -> None:
- from pathlib import Path
- from dulwich.refs import DiskRefsContainer
- # Create a temporary directory
- temp_dir = tempfile.mkdtemp()
- self.addCleanup(os.rmdir, temp_dir)
- # Test with pathlib.Path
- path_obj = Path(temp_dir)
- refs = DiskRefsContainer(path_obj)
- self.assertEqual(refs.path, temp_dir.encode())
- # Test refpath with pathlib initialized container
- ref_path = refs.refpath(b"HEAD")
- self.assertTrue(isinstance(ref_path, bytes))
- self.assertEqual(ref_path, os.path.join(temp_dir.encode(), b"HEAD"))
- def test_pathlib_worktree_path(self) -> None:
- from pathlib import Path
- from dulwich.refs import DiskRefsContainer
- # Create temporary directories
- temp_dir = tempfile.mkdtemp()
- worktree_dir = tempfile.mkdtemp()
- self.addCleanup(os.rmdir, temp_dir)
- self.addCleanup(os.rmdir, worktree_dir)
- # Test with pathlib.Path for both paths
- path_obj = Path(temp_dir)
- worktree_obj = Path(worktree_dir)
- refs = DiskRefsContainer(path_obj, worktree_path=worktree_obj)
- self.assertEqual(refs.path, temp_dir.encode())
- self.assertEqual(refs.worktree_path, worktree_dir.encode())
- # Test refpath returns worktree path for HEAD
- ref_path = refs.refpath(b"HEAD")
- self.assertEqual(ref_path, os.path.join(worktree_dir.encode(), b"HEAD"))
- class InfoRefsContainerTests(TestCase):
- def test_invalid_refname(self) -> None:
- text = _TEST_REFS_SERIALIZED + b"00" * 20 + b"\trefs/stash\n"
- refs = InfoRefsContainer(BytesIO(text))
- expected_refs = dict(_TEST_REFS)
- del expected_refs[b"HEAD"]
- expected_refs[b"refs/stash"] = b"00" * 20
- del expected_refs[b"refs/heads/loop"]
- self.assertEqual(expected_refs, refs.as_dict())
- def test_keys(self) -> None:
- refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
- actual_keys = set(refs.keys())
- self.assertEqual(set(refs.allkeys()), actual_keys)
- expected_refs = dict(_TEST_REFS)
- del expected_refs[b"HEAD"]
- del expected_refs[b"refs/heads/loop"]
- self.assertEqual(set(expected_refs.keys()), actual_keys)
- actual_keys = refs.keys(b"refs/heads")
- actual_keys.discard(b"loop")
- self.assertEqual(
- [b"40-char-ref-aaaaaaaaaaaaaaaaaa", b"master", b"packed"],
- sorted(actual_keys),
- )
- self.assertEqual([b"refs-0.1", b"refs-0.2"], sorted(refs.keys(b"refs/tags")))
- def test_as_dict(self) -> None:
- refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
- # refs/heads/loop does not show up even if it exists
- expected_refs = dict(_TEST_REFS)
- del expected_refs[b"HEAD"]
- del expected_refs[b"refs/heads/loop"]
- self.assertEqual(expected_refs, refs.as_dict())
- def test_contains(self) -> None:
- refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
- self.assertIn(b"refs/heads/master", refs)
- self.assertNotIn(b"refs/heads/bar", refs)
- def test_get_peeled(self) -> None:
- refs = InfoRefsContainer(BytesIO(_TEST_REFS_SERIALIZED))
- # refs/heads/loop does not show up even if it exists
- self.assertEqual(
- _TEST_REFS[b"refs/heads/master"],
- refs.get_peeled(b"refs/heads/master"),
- )
- class ParseSymrefValueTests(TestCase):
- def test_valid(self) -> None:
- self.assertEqual(b"refs/heads/foo", parse_symref_value(b"ref: refs/heads/foo"))
- def test_invalid(self) -> None:
- self.assertRaises(ValueError, parse_symref_value, b"foobar")
- class ParseRemoteRefTests(TestCase):
- def test_valid(self) -> None:
- # Test simple case
- remote, branch = parse_remote_ref(b"refs/remotes/origin/main")
- self.assertEqual(b"origin", remote)
- self.assertEqual(b"main", branch)
- # Test with branch containing slashes
- remote, branch = parse_remote_ref(b"refs/remotes/upstream/feature/new-ui")
- self.assertEqual(b"upstream", remote)
- self.assertEqual(b"feature/new-ui", branch)
- def test_invalid_not_remote_ref(self) -> None:
- # Not a remote ref
- with self.assertRaises(ValueError) as cm:
- parse_remote_ref(b"refs/heads/main")
- self.assertIn("Not a remote ref", str(cm.exception))
- def test_invalid_format(self) -> None:
- # Missing branch name
- with self.assertRaises(ValueError) as cm:
- parse_remote_ref(b"refs/remotes/origin")
- self.assertIn("Invalid remote ref format", str(cm.exception))
- # Just the prefix
- with self.assertRaises(ValueError) as cm:
- parse_remote_ref(b"refs/remotes/")
- self.assertIn("Invalid remote ref format", str(cm.exception))
- class StripPeeledRefsTests(TestCase):
- all_refs: ClassVar[dict[bytes, bytes]] = {
- b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
- b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
- b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
- b"refs/tags/1.0.0^{}": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
- b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
- b"refs/tags/2.0.0^{}": b"0749936d0956c661ac8f8d3483774509c165f89e",
- }
- non_peeled_refs: ClassVar[dict[bytes, bytes]] = {
- b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
- b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
- b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
- b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
- }
- def test_strip_peeled_refs(self) -> None:
- # Simple check of two dicts
- self.assertEqual(strip_peeled_refs(self.all_refs), self.non_peeled_refs)
- def test_split_peeled_refs(self) -> None:
- (regular, peeled) = split_peeled_refs(self.all_refs)
- self.assertEqual(regular, self.non_peeled_refs)
- self.assertEqual(
- peeled,
- {
- b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
- b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
- },
- )
|