|
@@ -0,0 +1,740 @@
|
|
|
+# refs.py -- For dealing with git refs
|
|
|
+# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@samba.org>
|
|
|
+#
|
|
|
+# This program is free software; you can redistribute it and/or
|
|
|
+# modify it under the terms of the GNU General Public License
|
|
|
+# as published by the Free Software Foundation; version 2
|
|
|
+# of the License or (at your option) any later version of
|
|
|
+# the License.
|
|
|
+#
|
|
|
+# This program is distributed in the hope that it will be useful,
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
+# GNU General Public License for more details.
|
|
|
+#
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
+# along with this program; if not, write to the Free Software
|
|
|
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
|
+# MA 02110-1301, USA.
|
|
|
+
|
|
|
+
|
|
|
+"""Ref handling.
|
|
|
+
|
|
|
+"""
|
|
|
+import errno
|
|
|
+import os
|
|
|
+
|
|
|
+from dulwich.errors import (
|
|
|
+ PackedRefsException,
|
|
|
+ RefFormatError,
|
|
|
+ )
|
|
|
+from dulwich.objects import (
|
|
|
+ hex_to_sha,
|
|
|
+ )
|
|
|
+from dulwich.file import (
|
|
|
+ GitFile,
|
|
|
+ ensure_dir_exists,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+SYMREF = 'ref: '
|
|
|
+
|
|
|
+
|
|
|
+def check_ref_format(refname):
|
|
|
+ """Check if a refname is correctly formatted.
|
|
|
+
|
|
|
+ Implements all the same rules as git-check-ref-format[1].
|
|
|
+
|
|
|
+ [1] http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
|
|
|
+
|
|
|
+ :param refname: The refname to check
|
|
|
+ :return: True if refname is valid, False otherwise
|
|
|
+ """
|
|
|
+ # These could be combined into one big expression, but are listed separately
|
|
|
+ # to parallel [1].
|
|
|
+ if '/.' in refname or refname.startswith('.'):
|
|
|
+ return False
|
|
|
+ if '/' not in refname:
|
|
|
+ return False
|
|
|
+ if '..' in refname:
|
|
|
+ return False
|
|
|
+ for c in refname:
|
|
|
+ if ord(c) < 040 or c in '\177 ~^:?*[':
|
|
|
+ return False
|
|
|
+ if refname[-1] in '/.':
|
|
|
+ return False
|
|
|
+ if refname.endswith('.lock'):
|
|
|
+ return False
|
|
|
+ if '@{' in refname:
|
|
|
+ return False
|
|
|
+ if '\\' in refname:
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+class RefsContainer(object):
|
|
|
+ """A container for refs."""
|
|
|
+
|
|
|
+ def set_symbolic_ref(self, name, other):
|
|
|
+ """Make a ref point at another ref.
|
|
|
+
|
|
|
+ :param name: Name of the ref to set
|
|
|
+ :param other: Name of the ref to point at
|
|
|
+ """
|
|
|
+ raise NotImplementedError(self.set_symbolic_ref)
|
|
|
+
|
|
|
+ def get_packed_refs(self):
|
|
|
+ """Get contents of the packed-refs file.
|
|
|
+
|
|
|
+ :return: Dictionary mapping ref names to SHA1s
|
|
|
+
|
|
|
+ :note: Will return an empty dictionary when no packed-refs file is
|
|
|
+ present.
|
|
|
+ """
|
|
|
+ raise NotImplementedError(self.get_packed_refs)
|
|
|
+
|
|
|
+ def get_peeled(self, name):
|
|
|
+ """Return the cached peeled value of a ref, if available.
|
|
|
+
|
|
|
+ :param name: Name of the ref to peel
|
|
|
+ :return: The peeled value of the ref. If the ref is known not point to a
|
|
|
+ tag, this will be the SHA the ref refers to. If the ref may point to
|
|
|
+ a tag, but no cached information is available, None is returned.
|
|
|
+ """
|
|
|
+ return None
|
|
|
+
|
|
|
+ def import_refs(self, base, other):
|
|
|
+ for name, value in other.iteritems():
|
|
|
+ self["%s/%s" % (base, name)] = value
|
|
|
+
|
|
|
+ def allkeys(self):
|
|
|
+ """All refs present in this container."""
|
|
|
+ raise NotImplementedError(self.allkeys)
|
|
|
+
|
|
|
+ def keys(self, base=None):
|
|
|
+ """Refs present in this container.
|
|
|
+
|
|
|
+ :param base: An optional base to return refs under.
|
|
|
+ :return: An unsorted set of valid refs in this container, including
|
|
|
+ packed refs.
|
|
|
+ """
|
|
|
+ if base is not None:
|
|
|
+ return self.subkeys(base)
|
|
|
+ else:
|
|
|
+ return self.allkeys()
|
|
|
+
|
|
|
+ def subkeys(self, base):
|
|
|
+ """Refs present in this container under a base.
|
|
|
+
|
|
|
+ :param base: The base to return refs under.
|
|
|
+ :return: A set of valid refs in this container under the base; the base
|
|
|
+ prefix is stripped from the ref names returned.
|
|
|
+ """
|
|
|
+ keys = set()
|
|
|
+ base_len = len(base) + 1
|
|
|
+ for refname in self.allkeys():
|
|
|
+ if refname.startswith(base):
|
|
|
+ keys.add(refname[base_len:])
|
|
|
+ return keys
|
|
|
+
|
|
|
+ def as_dict(self, base=None):
|
|
|
+ """Return the contents of this container as a dictionary.
|
|
|
+
|
|
|
+ """
|
|
|
+ ret = {}
|
|
|
+ keys = self.keys(base)
|
|
|
+ if base is None:
|
|
|
+ base = ""
|
|
|
+ for key in keys:
|
|
|
+ try:
|
|
|
+ ret[key] = self[("%s/%s" % (base, key)).strip("/")]
|
|
|
+ except KeyError:
|
|
|
+ continue # Unable to resolve
|
|
|
+
|
|
|
+ return ret
|
|
|
+
|
|
|
+ def _check_refname(self, name):
|
|
|
+ """Ensure a refname is valid and lives in refs or is HEAD.
|
|
|
+
|
|
|
+ HEAD is not a valid refname according to git-check-ref-format, but this
|
|
|
+ class needs to be able to touch HEAD. Also, check_ref_format expects
|
|
|
+ refnames without the leading 'refs/', but this class requires that
|
|
|
+ so it cannot touch anything outside the refs dir (or HEAD).
|
|
|
+
|
|
|
+ :param name: The name of the reference.
|
|
|
+ :raises KeyError: if a refname is not HEAD or is otherwise not valid.
|
|
|
+ """
|
|
|
+ if name in ('HEAD', 'refs/stash'):
|
|
|
+ return
|
|
|
+ if not name.startswith('refs/') or not check_ref_format(name[5:]):
|
|
|
+ raise RefFormatError(name)
|
|
|
+
|
|
|
+ def read_ref(self, refname):
|
|
|
+ """Read a reference without following any references.
|
|
|
+
|
|
|
+ :param refname: The name of the reference
|
|
|
+ :return: The contents of the ref file, or None if it does
|
|
|
+ not exist.
|
|
|
+ """
|
|
|
+ contents = self.read_loose_ref(refname)
|
|
|
+ if not contents:
|
|
|
+ contents = self.get_packed_refs().get(refname, None)
|
|
|
+ return contents
|
|
|
+
|
|
|
+ def read_loose_ref(self, name):
|
|
|
+ """Read a loose reference and return its contents.
|
|
|
+
|
|
|
+ :param name: the refname to read
|
|
|
+ :return: The contents of the ref file, or None if it does
|
|
|
+ not exist.
|
|
|
+ """
|
|
|
+ raise NotImplementedError(self.read_loose_ref)
|
|
|
+
|
|
|
+ def _follow(self, name):
|
|
|
+ """Follow a reference name.
|
|
|
+
|
|
|
+ :return: a tuple of (refname, sha), where refname is the name of the
|
|
|
+ last reference in the symbolic reference chain
|
|
|
+ """
|
|
|
+ contents = SYMREF + name
|
|
|
+ depth = 0
|
|
|
+ while contents.startswith(SYMREF):
|
|
|
+ refname = contents[len(SYMREF):]
|
|
|
+ contents = self.read_ref(refname)
|
|
|
+ if not contents:
|
|
|
+ break
|
|
|
+ depth += 1
|
|
|
+ if depth > 5:
|
|
|
+ raise KeyError(name)
|
|
|
+ return refname, contents
|
|
|
+
|
|
|
+ def __contains__(self, refname):
|
|
|
+ if self.read_ref(refname):
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ def __getitem__(self, name):
|
|
|
+ """Get the SHA1 for a reference name.
|
|
|
+
|
|
|
+ This method follows all symbolic references.
|
|
|
+ """
|
|
|
+ _, sha = self._follow(name)
|
|
|
+ if sha is None:
|
|
|
+ raise KeyError(name)
|
|
|
+ return sha
|
|
|
+
|
|
|
+ def set_if_equals(self, name, old_ref, new_ref):
|
|
|
+ """Set a refname to new_ref only if it currently equals old_ref.
|
|
|
+
|
|
|
+ This method follows all symbolic references if applicable for the
|
|
|
+ subclass, and can be used to perform an atomic compare-and-swap
|
|
|
+ operation.
|
|
|
+
|
|
|
+ :param name: The refname to set.
|
|
|
+ :param old_ref: The old sha the refname must refer to, or None to set
|
|
|
+ unconditionally.
|
|
|
+ :param new_ref: The new sha the refname will refer to.
|
|
|
+ :return: True if the set was successful, False otherwise.
|
|
|
+ """
|
|
|
+ raise NotImplementedError(self.set_if_equals)
|
|
|
+
|
|
|
+ def add_if_new(self, name, ref):
|
|
|
+ """Add a new reference only if it does not already exist."""
|
|
|
+ raise NotImplementedError(self.add_if_new)
|
|
|
+
|
|
|
+ def __setitem__(self, name, ref):
|
|
|
+ """Set a reference name to point to the given SHA1.
|
|
|
+
|
|
|
+ This method follows all symbolic references if applicable for the
|
|
|
+ subclass.
|
|
|
+
|
|
|
+ :note: This method unconditionally overwrites the contents of a
|
|
|
+ reference. To update atomically only if the reference has not
|
|
|
+ changed, use set_if_equals().
|
|
|
+ :param name: The refname to set.
|
|
|
+ :param ref: The new sha the refname will refer to.
|
|
|
+ """
|
|
|
+ self.set_if_equals(name, None, ref)
|
|
|
+
|
|
|
+ def remove_if_equals(self, name, old_ref):
|
|
|
+ """Remove a refname only if it currently equals old_ref.
|
|
|
+
|
|
|
+ This method does not follow symbolic references, even if applicable for
|
|
|
+ the subclass. It can be used to perform an atomic compare-and-delete
|
|
|
+ operation.
|
|
|
+
|
|
|
+ :param name: The refname to delete.
|
|
|
+ :param old_ref: The old sha the refname must refer to, or None to delete
|
|
|
+ unconditionally.
|
|
|
+ :return: True if the delete was successful, False otherwise.
|
|
|
+ """
|
|
|
+ raise NotImplementedError(self.remove_if_equals)
|
|
|
+
|
|
|
+ def __delitem__(self, name):
|
|
|
+ """Remove a refname.
|
|
|
+
|
|
|
+ This method does not follow symbolic references, even if applicable for
|
|
|
+ the subclass.
|
|
|
+
|
|
|
+ :note: This method unconditionally deletes the contents of a reference.
|
|
|
+ To delete atomically only if the reference has not changed, use
|
|
|
+ remove_if_equals().
|
|
|
+
|
|
|
+ :param name: The refname to delete.
|
|
|
+ """
|
|
|
+ self.remove_if_equals(name, None)
|
|
|
+
|
|
|
+
|
|
|
+class DictRefsContainer(RefsContainer):
|
|
|
+ """RefsContainer backed by a simple dict.
|
|
|
+
|
|
|
+ This container does not support symbolic or packed references and is not
|
|
|
+ threadsafe.
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, refs):
|
|
|
+ self._refs = refs
|
|
|
+ self._peeled = {}
|
|
|
+
|
|
|
+ def allkeys(self):
|
|
|
+ return self._refs.keys()
|
|
|
+
|
|
|
+ def read_loose_ref(self, name):
|
|
|
+ return self._refs.get(name, None)
|
|
|
+
|
|
|
+ def get_packed_refs(self):
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def set_symbolic_ref(self, name, other):
|
|
|
+ self._refs[name] = SYMREF + other
|
|
|
+
|
|
|
+ def set_if_equals(self, name, old_ref, new_ref):
|
|
|
+ if old_ref is not None and self._refs.get(name, None) != old_ref:
|
|
|
+ return False
|
|
|
+ realname, _ = self._follow(name)
|
|
|
+ self._check_refname(realname)
|
|
|
+ self._refs[realname] = new_ref
|
|
|
+ return True
|
|
|
+
|
|
|
+ def add_if_new(self, name, ref):
|
|
|
+ if name in self._refs:
|
|
|
+ return False
|
|
|
+ self._refs[name] = ref
|
|
|
+ return True
|
|
|
+
|
|
|
+ def remove_if_equals(self, name, old_ref):
|
|
|
+ if old_ref is not None and self._refs.get(name, None) != old_ref:
|
|
|
+ return False
|
|
|
+ del self._refs[name]
|
|
|
+ return True
|
|
|
+
|
|
|
+ def get_peeled(self, name):
|
|
|
+ return self._peeled.get(name)
|
|
|
+
|
|
|
+ def _update(self, refs):
|
|
|
+ """Update multiple refs; intended only for testing."""
|
|
|
+ # TODO(dborowitz): replace this with a public function that uses
|
|
|
+ # set_if_equal.
|
|
|
+ self._refs.update(refs)
|
|
|
+
|
|
|
+ def _update_peeled(self, peeled):
|
|
|
+ """Update cached peeled refs; intended only for testing."""
|
|
|
+ self._peeled.update(peeled)
|
|
|
+
|
|
|
+
|
|
|
+class InfoRefsContainer(RefsContainer):
|
|
|
+ """Refs container that reads refs from a info/refs file."""
|
|
|
+
|
|
|
+ def __init__(self, f):
|
|
|
+ self._refs = {}
|
|
|
+ self._peeled = {}
|
|
|
+ for l in f.readlines():
|
|
|
+ sha, name = l.rstrip("\n").split("\t")
|
|
|
+ if name.endswith("^{}"):
|
|
|
+ name = name[:-3]
|
|
|
+ if not check_ref_format(name):
|
|
|
+ raise ValueError("invalid ref name '%s'" % name)
|
|
|
+ self._peeled[name] = sha
|
|
|
+ else:
|
|
|
+ if not check_ref_format(name):
|
|
|
+ raise ValueError("invalid ref name '%s'" % name)
|
|
|
+ self._refs[name] = sha
|
|
|
+
|
|
|
+ def allkeys(self):
|
|
|
+ return self._refs.keys()
|
|
|
+
|
|
|
+ def read_loose_ref(self, name):
|
|
|
+ return self._refs.get(name, None)
|
|
|
+
|
|
|
+ def get_packed_refs(self):
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def get_peeled(self, name):
|
|
|
+ try:
|
|
|
+ return self._peeled[name]
|
|
|
+ except KeyError:
|
|
|
+ return self._refs[name]
|
|
|
+
|
|
|
+
|
|
|
+class DiskRefsContainer(RefsContainer):
|
|
|
+ """Refs container that reads refs from disk."""
|
|
|
+
|
|
|
+ def __init__(self, path):
|
|
|
+ self.path = path
|
|
|
+ self._packed_refs = None
|
|
|
+ self._peeled_refs = None
|
|
|
+
|
|
|
+ def __repr__(self):
|
|
|
+ return "%s(%r)" % (self.__class__.__name__, self.path)
|
|
|
+
|
|
|
+ def subkeys(self, base):
|
|
|
+ keys = set()
|
|
|
+ path = self.refpath(base)
|
|
|
+ for root, dirs, files in os.walk(path):
|
|
|
+ dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/")
|
|
|
+ for filename in files:
|
|
|
+ refname = ("%s/%s" % (dir, filename)).strip("/")
|
|
|
+ # check_ref_format requires at least one /, so we prepend the
|
|
|
+ # base before calling it.
|
|
|
+ if check_ref_format("%s/%s" % (base, refname)):
|
|
|
+ keys.add(refname)
|
|
|
+ for key in self.get_packed_refs():
|
|
|
+ if key.startswith(base):
|
|
|
+ keys.add(key[len(base):].strip("/"))
|
|
|
+ return keys
|
|
|
+
|
|
|
+ def allkeys(self):
|
|
|
+ keys = set()
|
|
|
+ if os.path.exists(self.refpath("HEAD")):
|
|
|
+ keys.add("HEAD")
|
|
|
+ path = self.refpath("")
|
|
|
+ for root, dirs, files in os.walk(self.refpath("refs")):
|
|
|
+ dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/")
|
|
|
+ for filename in files:
|
|
|
+ refname = ("%s/%s" % (dir, filename)).strip("/")
|
|
|
+ if check_ref_format(refname):
|
|
|
+ keys.add(refname)
|
|
|
+ keys.update(self.get_packed_refs())
|
|
|
+ return keys
|
|
|
+
|
|
|
+ def refpath(self, name):
|
|
|
+ """Return the disk path of a ref.
|
|
|
+
|
|
|
+ """
|
|
|
+ if os.path.sep != "/":
|
|
|
+ name = name.replace("/", os.path.sep)
|
|
|
+ return os.path.join(self.path, name)
|
|
|
+
|
|
|
+ def get_packed_refs(self):
|
|
|
+ """Get contents of the packed-refs file.
|
|
|
+
|
|
|
+ :return: Dictionary mapping ref names to SHA1s
|
|
|
+
|
|
|
+ :note: Will return an empty dictionary when no packed-refs file is
|
|
|
+ present.
|
|
|
+ """
|
|
|
+ # TODO: invalidate the cache on repacking
|
|
|
+ if self._packed_refs is None:
|
|
|
+ # set both to empty because we want _peeled_refs to be
|
|
|
+ # None if and only if _packed_refs is also None.
|
|
|
+ self._packed_refs = {}
|
|
|
+ self._peeled_refs = {}
|
|
|
+ path = os.path.join(self.path, 'packed-refs')
|
|
|
+ try:
|
|
|
+ f = GitFile(path, 'rb')
|
|
|
+ except IOError, e:
|
|
|
+ if e.errno == errno.ENOENT:
|
|
|
+ return {}
|
|
|
+ raise
|
|
|
+ try:
|
|
|
+ first_line = iter(f).next().rstrip()
|
|
|
+ if (first_line.startswith("# pack-refs") and " peeled" in
|
|
|
+ first_line):
|
|
|
+ for sha, name, peeled in read_packed_refs_with_peeled(f):
|
|
|
+ self._packed_refs[name] = sha
|
|
|
+ if peeled:
|
|
|
+ self._peeled_refs[name] = peeled
|
|
|
+ else:
|
|
|
+ f.seek(0)
|
|
|
+ for sha, name in read_packed_refs(f):
|
|
|
+ self._packed_refs[name] = sha
|
|
|
+ finally:
|
|
|
+ f.close()
|
|
|
+ return self._packed_refs
|
|
|
+
|
|
|
+ def get_peeled(self, name):
|
|
|
+ """Return the cached peeled value of a ref, if available.
|
|
|
+
|
|
|
+ :param name: Name of the ref to peel
|
|
|
+ :return: The peeled value of the ref. If the ref is known not point to a
|
|
|
+ tag, this will be the SHA the ref refers to. If the ref may point to
|
|
|
+ a tag, but no cached information is available, None is returned.
|
|
|
+ """
|
|
|
+ self.get_packed_refs()
|
|
|
+ if self._peeled_refs is None or name not in self._packed_refs:
|
|
|
+ # No cache: no peeled refs were read, or this ref is loose
|
|
|
+ return None
|
|
|
+ if name in self._peeled_refs:
|
|
|
+ return self._peeled_refs[name]
|
|
|
+ else:
|
|
|
+ # Known not peelable
|
|
|
+ return self[name]
|
|
|
+
|
|
|
+ def read_loose_ref(self, name):
|
|
|
+ """Read a reference file and return its contents.
|
|
|
+
|
|
|
+ If the reference file a symbolic reference, only read the first line of
|
|
|
+ the file. Otherwise, only read the first 40 bytes.
|
|
|
+
|
|
|
+ :param name: the refname to read, relative to refpath
|
|
|
+ :return: The contents of the ref file, or None if the file does not
|
|
|
+ exist.
|
|
|
+ :raises IOError: if any other error occurs
|
|
|
+ """
|
|
|
+ filename = self.refpath(name)
|
|
|
+ try:
|
|
|
+ f = GitFile(filename, 'rb')
|
|
|
+ try:
|
|
|
+ header = f.read(len(SYMREF))
|
|
|
+ if header == SYMREF:
|
|
|
+ # Read only the first line
|
|
|
+ return header + iter(f).next().rstrip("\r\n")
|
|
|
+ else:
|
|
|
+ # Read only the first 40 bytes
|
|
|
+ return header + f.read(40 - len(SYMREF))
|
|
|
+ finally:
|
|
|
+ f.close()
|
|
|
+ except IOError, e:
|
|
|
+ if e.errno == errno.ENOENT:
|
|
|
+ return None
|
|
|
+ raise
|
|
|
+
|
|
|
+ def _remove_packed_ref(self, name):
|
|
|
+ if self._packed_refs is None:
|
|
|
+ return
|
|
|
+ filename = os.path.join(self.path, 'packed-refs')
|
|
|
+ # reread cached refs from disk, while holding the lock
|
|
|
+ f = GitFile(filename, 'wb')
|
|
|
+ try:
|
|
|
+ self._packed_refs = None
|
|
|
+ self.get_packed_refs()
|
|
|
+
|
|
|
+ if name not in self._packed_refs:
|
|
|
+ return
|
|
|
+
|
|
|
+ del self._packed_refs[name]
|
|
|
+ if name in self._peeled_refs:
|
|
|
+ del self._peeled_refs[name]
|
|
|
+ write_packed_refs(f, self._packed_refs, self._peeled_refs)
|
|
|
+ f.close()
|
|
|
+ finally:
|
|
|
+ f.abort()
|
|
|
+
|
|
|
+ def set_symbolic_ref(self, name, other):
|
|
|
+ """Make a ref point at another ref.
|
|
|
+
|
|
|
+ :param name: Name of the ref to set
|
|
|
+ :param other: Name of the ref to point at
|
|
|
+ """
|
|
|
+ self._check_refname(name)
|
|
|
+ self._check_refname(other)
|
|
|
+ filename = self.refpath(name)
|
|
|
+ try:
|
|
|
+ f = GitFile(filename, 'wb')
|
|
|
+ try:
|
|
|
+ f.write(SYMREF + other + '\n')
|
|
|
+ except (IOError, OSError):
|
|
|
+ f.abort()
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ f.close()
|
|
|
+
|
|
|
+ def set_if_equals(self, name, old_ref, new_ref):
|
|
|
+ """Set a refname to new_ref only if it currently equals old_ref.
|
|
|
+
|
|
|
+ This method follows all symbolic references, and can be used to perform
|
|
|
+ an atomic compare-and-swap operation.
|
|
|
+
|
|
|
+ :param name: The refname to set.
|
|
|
+ :param old_ref: The old sha the refname must refer to, or None to set
|
|
|
+ unconditionally.
|
|
|
+ :param new_ref: The new sha the refname will refer to.
|
|
|
+ :return: True if the set was successful, False otherwise.
|
|
|
+ """
|
|
|
+ self._check_refname(name)
|
|
|
+ try:
|
|
|
+ realname, _ = self._follow(name)
|
|
|
+ except KeyError:
|
|
|
+ realname = name
|
|
|
+ filename = self.refpath(realname)
|
|
|
+ ensure_dir_exists(os.path.dirname(filename))
|
|
|
+ f = GitFile(filename, 'wb')
|
|
|
+ try:
|
|
|
+ if old_ref is not None:
|
|
|
+ try:
|
|
|
+ # read again while holding the lock
|
|
|
+ orig_ref = self.read_loose_ref(realname)
|
|
|
+ if orig_ref is None:
|
|
|
+ orig_ref = self.get_packed_refs().get(realname, None)
|
|
|
+ if orig_ref != old_ref:
|
|
|
+ f.abort()
|
|
|
+ return False
|
|
|
+ except (OSError, IOError):
|
|
|
+ f.abort()
|
|
|
+ raise
|
|
|
+ try:
|
|
|
+ f.write(new_ref + "\n")
|
|
|
+ except (OSError, IOError):
|
|
|
+ f.abort()
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ f.close()
|
|
|
+ return True
|
|
|
+
|
|
|
+ def add_if_new(self, name, ref):
|
|
|
+ """Add a new reference only if it does not already exist.
|
|
|
+
|
|
|
+ This method follows symrefs, and only ensures that the last ref in the
|
|
|
+ chain does not exist.
|
|
|
+
|
|
|
+ :param name: The refname to set.
|
|
|
+ :param ref: The new sha the refname will refer to.
|
|
|
+ :return: True if the add was successful, False otherwise.
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ realname, contents = self._follow(name)
|
|
|
+ if contents is not None:
|
|
|
+ return False
|
|
|
+ except KeyError:
|
|
|
+ realname = name
|
|
|
+ self._check_refname(realname)
|
|
|
+ filename = self.refpath(realname)
|
|
|
+ ensure_dir_exists(os.path.dirname(filename))
|
|
|
+ f = GitFile(filename, 'wb')
|
|
|
+ try:
|
|
|
+ if os.path.exists(filename) or name in self.get_packed_refs():
|
|
|
+ f.abort()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ f.write(ref + "\n")
|
|
|
+ except (OSError, IOError):
|
|
|
+ f.abort()
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ f.close()
|
|
|
+ return True
|
|
|
+
|
|
|
+ def remove_if_equals(self, name, old_ref):
|
|
|
+ """Remove a refname only if it currently equals old_ref.
|
|
|
+
|
|
|
+ This method does not follow symbolic references. It can be used to
|
|
|
+ perform an atomic compare-and-delete operation.
|
|
|
+
|
|
|
+ :param name: The refname to delete.
|
|
|
+ :param old_ref: The old sha the refname must refer to, or None to delete
|
|
|
+ unconditionally.
|
|
|
+ :return: True if the delete was successful, False otherwise.
|
|
|
+ """
|
|
|
+ self._check_refname(name)
|
|
|
+ filename = self.refpath(name)
|
|
|
+ ensure_dir_exists(os.path.dirname(filename))
|
|
|
+ f = GitFile(filename, 'wb')
|
|
|
+ try:
|
|
|
+ if old_ref is not None:
|
|
|
+ orig_ref = self.read_loose_ref(name)
|
|
|
+ if orig_ref is None:
|
|
|
+ orig_ref = self.get_packed_refs().get(name, None)
|
|
|
+ if orig_ref != old_ref:
|
|
|
+ return False
|
|
|
+ # may only be packed
|
|
|
+ try:
|
|
|
+ os.remove(filename)
|
|
|
+ except OSError, e:
|
|
|
+ if e.errno != errno.ENOENT:
|
|
|
+ raise
|
|
|
+ self._remove_packed_ref(name)
|
|
|
+ finally:
|
|
|
+ # never write, we just wanted the lock
|
|
|
+ f.abort()
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+def _split_ref_line(line):
|
|
|
+ """Split a single ref line into a tuple of SHA1 and name."""
|
|
|
+ fields = line.rstrip("\n").split(" ")
|
|
|
+ if len(fields) != 2:
|
|
|
+ raise PackedRefsException("invalid ref line '%s'" % line)
|
|
|
+ sha, name = fields
|
|
|
+ try:
|
|
|
+ hex_to_sha(sha)
|
|
|
+ except (AssertionError, TypeError), e:
|
|
|
+ raise PackedRefsException(e)
|
|
|
+ if not check_ref_format(name):
|
|
|
+ raise PackedRefsException("invalid ref name '%s'" % name)
|
|
|
+ return (sha, name)
|
|
|
+
|
|
|
+
|
|
|
+def read_packed_refs(f):
|
|
|
+ """Read a packed refs file.
|
|
|
+
|
|
|
+ :param f: file-like object to read from
|
|
|
+ :return: Iterator over tuples with SHA1s and ref names.
|
|
|
+ """
|
|
|
+ for l in f:
|
|
|
+ if l[0] == "#":
|
|
|
+ # Comment
|
|
|
+ continue
|
|
|
+ if l[0] == "^":
|
|
|
+ raise PackedRefsException(
|
|
|
+ "found peeled ref in packed-refs without peeled")
|
|
|
+ yield _split_ref_line(l)
|
|
|
+
|
|
|
+
|
|
|
+def read_packed_refs_with_peeled(f):
|
|
|
+ """Read a packed refs file including peeled refs.
|
|
|
+
|
|
|
+ Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
|
|
|
+ with ref names, SHA1s, and peeled SHA1s (or None).
|
|
|
+
|
|
|
+ :param f: file-like object to read from, seek'ed to the second line
|
|
|
+ """
|
|
|
+ last = None
|
|
|
+ for l in f:
|
|
|
+ if l[0] == "#":
|
|
|
+ continue
|
|
|
+ l = l.rstrip("\r\n")
|
|
|
+ if l[0] == "^":
|
|
|
+ if not last:
|
|
|
+ raise PackedRefsException("unexpected peeled ref line")
|
|
|
+ try:
|
|
|
+ hex_to_sha(l[1:])
|
|
|
+ except (AssertionError, TypeError), e:
|
|
|
+ raise PackedRefsException(e)
|
|
|
+ sha, name = _split_ref_line(last)
|
|
|
+ last = None
|
|
|
+ yield (sha, name, l[1:])
|
|
|
+ else:
|
|
|
+ if last:
|
|
|
+ sha, name = _split_ref_line(last)
|
|
|
+ yield (sha, name, None)
|
|
|
+ last = l
|
|
|
+ if last:
|
|
|
+ sha, name = _split_ref_line(last)
|
|
|
+ yield (sha, name, None)
|
|
|
+
|
|
|
+
|
|
|
+def write_packed_refs(f, packed_refs, peeled_refs=None):
|
|
|
+ """Write a packed refs file.
|
|
|
+
|
|
|
+ :param f: empty file-like object to write to
|
|
|
+ :param packed_refs: dict of refname to sha of packed refs to write
|
|
|
+ :param peeled_refs: dict of refname to peeled value of sha
|
|
|
+ """
|
|
|
+ if peeled_refs is None:
|
|
|
+ peeled_refs = {}
|
|
|
+ else:
|
|
|
+ f.write('# pack-refs with: peeled\n')
|
|
|
+ for refname in sorted(packed_refs.iterkeys()):
|
|
|
+ f.write('%s %s\n' % (packed_refs[refname], refname))
|
|
|
+ if refname in peeled_refs:
|
|
|
+ f.write('^%s\n' % peeled_refs[refname])
|