123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786 |
- # notes.py -- Git notes handling
- # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as public by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Git notes handling."""
- import stat
- from collections.abc import Iterator
- from typing import TYPE_CHECKING, Optional
- from .objects import Blob, Tree
- if TYPE_CHECKING:
- from .config import StackedConfig
- NOTES_REF_PREFIX = b"refs/notes/"
- DEFAULT_NOTES_REF = NOTES_REF_PREFIX + b"commits"
- def get_note_fanout_level(tree: Tree, object_store) -> int:
- """Determine the fanout level for a note tree.
- Git uses a fanout directory structure for performance with large numbers
- of notes. The fanout level determines how many levels of subdirectories
- are used.
- Args:
- tree: The notes tree to analyze
- object_store: Object store to retrieve subtrees
- Returns:
- Fanout level (0 for no fanout, 1 or 2 for fanout)
- """
- # Count the total number of notes in the tree recursively
- def count_notes(tree: Tree, level: int = 0) -> int:
- count = 0
- for name, mode, sha in tree.items():
- if stat.S_ISREG(mode):
- count += 1
- elif stat.S_ISDIR(mode) and level < 2: # Only recurse 2 levels deep
- try:
- subtree = object_store[sha]
- count += count_notes(subtree, level + 1)
- except KeyError:
- pass
- return count
- note_count = count_notes(tree)
- # Use fanout based on number of notes
- # Git typically starts using fanout around 256 notes
- if note_count < 256:
- return 0
- elif note_count < 65536: # 256^2
- return 1
- else:
- return 2
- def split_path_for_fanout(hexsha: bytes, fanout_level: int) -> tuple[bytes, ...]:
- """Split a hex SHA into path components based on fanout level.
- Args:
- hexsha: Hex SHA of the object
- fanout_level: Number of directory levels for fanout
- Returns:
- Tuple of path components
- """
- if fanout_level == 0:
- return (hexsha,)
- components = []
- for i in range(fanout_level):
- components.append(hexsha[i * 2 : (i + 1) * 2])
- components.append(hexsha[fanout_level * 2 :])
- return tuple(components)
- def get_note_path(object_sha: bytes, fanout_level: int = 0) -> bytes:
- """Get the path within the notes tree for a given object.
- Args:
- object_sha: Hex SHA of the object to get notes for
- fanout_level: Fanout level to use
- Returns:
- Path within the notes tree
- """
- components = split_path_for_fanout(object_sha, fanout_level)
- return b"/".join(components)
- class NotesTree:
- """Represents a Git notes tree."""
- def __init__(self, tree: Tree, object_store):
- """Initialize a notes tree.
- Args:
- tree: The tree object containing notes
- object_store: Object store to retrieve note contents from
- """
- self._tree = tree
- self._object_store = object_store
- self._fanout_level = self._detect_fanout_level()
- def _detect_fanout_level(self) -> int:
- """Detect the fanout level used in this notes tree.
- Returns:
- Detected fanout level
- """
- if not self._tree.items():
- return 0
- # Check for presence of both files and directories
- has_files = False
- has_dirs = False
- dir_names = []
- for name, mode, sha in self._tree.items():
- if stat.S_ISDIR(mode):
- has_dirs = True
- dir_names.append(name)
- elif stat.S_ISREG(mode):
- has_files = True
- # If we have files at the root level, check if they're full SHA names
- if has_files and not has_dirs:
- # Check if any file names are full 40-char hex strings
- for name, mode, sha in self._tree.items():
- if stat.S_ISREG(mode) and len(name) == 40:
- try:
- int(name, 16) # Verify it's a valid hex string
- return 0 # No fanout
- except ValueError:
- pass
- # Check if all directories are 2-character hex names
- if has_dirs and dir_names:
- all_two_char_hex = all(
- len(name) == 2 and all(c in b"0123456789abcdef" for c in name)
- for name in dir_names
- )
- if all_two_char_hex:
- # Check a sample directory to determine if it's level 1 or 2
- sample_dir_name = dir_names[0]
- try:
- sample_mode, sample_sha = self._tree[sample_dir_name]
- sample_tree = self._object_store[sample_sha]
- # Check if this subtree also has 2-char hex directories
- sub_has_dirs = False
- for sub_name, sub_mode, sub_sha in sample_tree.items():
- if stat.S_ISDIR(sub_mode) and len(sub_name) == 2:
- try:
- int(sub_name, 16)
- sub_has_dirs = True
- break
- except ValueError:
- pass
- return 2 if sub_has_dirs else 1
- except KeyError:
- return 1 # Assume level 1 if we can't inspect
- return 0
- def _reorganize_tree(self, new_fanout_level: int) -> None:
- """Reorganize the notes tree to use a different fanout level.
- Args:
- new_fanout_level: The desired fanout level
- """
- if new_fanout_level == self._fanout_level:
- return
- # Collect all existing notes
- notes = []
- for object_sha, note_sha in self.list_notes():
- note_obj = self._object_store[note_sha]
- if isinstance(note_obj, Blob):
- notes.append((object_sha, note_obj.data))
- # Create new empty tree
- new_tree = Tree()
- self._object_store.add_object(new_tree)
- self._tree = new_tree
- self._fanout_level = new_fanout_level
- # Re-add all notes with new fanout structure using set_note
- # Temporarily set fanout back to avoid recursion
- for object_sha, note_content in notes:
- # Use the internal tree update logic without checking fanout again
- note_blob = Blob.from_string(note_content)
- self._object_store.add_object(note_blob)
- path = get_note_path(object_sha, new_fanout_level)
- components = path.split(b"/")
- # Build new tree structure
- def update_tree(tree: Tree, components: list, blob_sha: bytes) -> Tree:
- if len(components) == 1:
- # Leaf level - add the note blob
- new_tree = Tree()
- for name, mode, sha in tree.items():
- if name != components[0]:
- new_tree.add(name, mode, sha)
- new_tree.add(components[0], stat.S_IFREG | 0o644, blob_sha)
- return new_tree
- else:
- # Directory level
- new_tree = Tree()
- found = False
- for name, mode, sha in tree.items():
- if name == components[0]:
- # Update this subtree
- if stat.S_ISDIR(mode):
- subtree = self._object_store[sha]
- else:
- # If not a directory, we need to replace it
- subtree = Tree()
- new_subtree = update_tree(subtree, components[1:], blob_sha)
- self._object_store.add_object(new_subtree)
- new_tree.add(name, stat.S_IFDIR, new_subtree.id)
- found = True
- else:
- new_tree.add(name, mode, sha)
- if not found:
- # Create new subtree path
- subtree = Tree()
- new_subtree = update_tree(subtree, components[1:], blob_sha)
- self._object_store.add_object(new_subtree)
- new_tree.add(components[0], stat.S_IFDIR, new_subtree.id)
- return new_tree
- self._tree = update_tree(self._tree, components, note_blob.id)
- self._object_store.add_object(self._tree)
- def _update_tree_entry(
- self, tree: Tree, name: bytes, mode: int, sha: bytes
- ) -> Tree:
- """Update a tree entry and return the updated tree.
- Args:
- tree: The tree to update
- name: Name of the entry
- mode: File mode
- sha: SHA of the object
- Returns:
- The updated tree
- """
- new_tree = Tree()
- for existing_name, existing_mode, existing_sha in tree.items():
- if existing_name != name:
- new_tree.add(existing_name, existing_mode, existing_sha)
- new_tree.add(name, mode, sha)
- self._object_store.add_object(new_tree)
- # Update the tree reference
- if tree is self._tree:
- self._tree = new_tree
- return new_tree
- def _get_note_sha(self, object_sha: bytes) -> Optional[bytes]:
- """Get the SHA of the note blob for an object.
- Args:
- object_sha: SHA of the object to get notes for
- Returns:
- SHA of the note blob, or None if no note exists
- """
- path = get_note_path(object_sha, self._fanout_level)
- components = path.split(b"/")
- current_tree = self._tree
- for component in components[:-1]:
- try:
- mode, sha = current_tree[component]
- if not stat.S_ISDIR(mode): # Not a directory
- return None
- current_tree = self._object_store[sha]
- except KeyError:
- return None
- try:
- mode, sha = current_tree[components[-1]]
- if not stat.S_ISREG(mode): # Not a regular file
- return None
- return sha
- except KeyError:
- return None
- def get_note(self, object_sha: bytes) -> Optional[bytes]:
- """Get the note content for an object.
- Args:
- object_sha: SHA of the object to get notes for
- Returns:
- Note content as bytes, or None if no note exists
- """
- note_sha = self._get_note_sha(object_sha)
- if note_sha is None:
- return None
- try:
- note_obj = self._object_store[note_sha]
- if not isinstance(note_obj, Blob):
- return None
- return note_obj.data
- except KeyError:
- return None
- def set_note(self, object_sha: bytes, note_content: bytes) -> Tree:
- """Set or update a note for an object.
- Args:
- object_sha: SHA of the object to annotate
- note_content: Content of the note
- Returns:
- New tree object with the note added/updated
- """
- # Create note blob
- note_blob = Blob.from_string(note_content)
- self._object_store.add_object(note_blob)
- # Check if we need to reorganize the tree for better fanout
- desired_fanout = get_note_fanout_level(self._tree, self._object_store)
- if desired_fanout != self._fanout_level:
- self._reorganize_tree(desired_fanout)
- # Get path components
- path = get_note_path(object_sha, self._fanout_level)
- components = path.split(b"/")
- # Build new tree structure
- def update_tree(tree: Tree, components: list, blob_sha: bytes) -> Tree:
- if len(components) == 1:
- # Leaf level - add the note blob
- new_tree = Tree()
- for name, mode, sha in tree.items():
- if name != components[0]:
- new_tree.add(name, mode, sha)
- new_tree.add(components[0], stat.S_IFREG | 0o644, blob_sha)
- return new_tree
- else:
- # Directory level
- new_tree = Tree()
- found = False
- for name, mode, sha in tree.items():
- if name == components[0]:
- # Update this subtree
- if stat.S_ISDIR(mode):
- subtree = self._object_store[sha]
- else:
- # If not a directory, we need to replace it
- subtree = Tree()
- new_subtree = update_tree(subtree, components[1:], blob_sha)
- self._object_store.add_object(new_subtree)
- new_tree.add(name, stat.S_IFDIR, new_subtree.id)
- found = True
- else:
- new_tree.add(name, mode, sha)
- if not found:
- # Create new subtree path
- subtree = Tree()
- new_subtree = update_tree(subtree, components[1:], blob_sha)
- self._object_store.add_object(new_subtree)
- new_tree.add(components[0], stat.S_IFDIR, new_subtree.id)
- return new_tree
- new_tree = update_tree(self._tree, components, note_blob.id)
- self._object_store.add_object(new_tree)
- self._tree = new_tree
- self._fanout_level = self._detect_fanout_level()
- return new_tree
- def remove_note(self, object_sha: bytes) -> Optional[Tree]:
- """Remove a note for an object.
- Args:
- object_sha: SHA of the object to remove notes from
- Returns:
- New tree object with the note removed, or None if no note existed
- """
- if self._get_note_sha(object_sha) is None:
- return None
- # Get path components
- path = get_note_path(object_sha, self._fanout_level)
- components = path.split(b"/")
- # Build new tree structure without the note
- def remove_from_tree(tree: Tree, components: list) -> Optional[Tree]:
- if len(components) == 1:
- # Leaf level - remove the note
- new_tree = Tree()
- found = False
- for name, mode, sha in tree.items():
- if name != components[0]:
- new_tree.add(name, mode, sha)
- else:
- found = True
- if not found:
- return None
- # Return None if tree is now empty
- return new_tree if len(new_tree) > 0 else None
- else:
- # Directory level
- new_tree = Tree()
- modified = False
- for name, mode, sha in tree.items():
- if name == components[0] and stat.S_ISDIR(mode):
- # Update this subtree
- subtree = self._object_store[sha]
- new_subtree = remove_from_tree(subtree, components[1:])
- if new_subtree is not None:
- self._object_store.add_object(new_subtree)
- new_tree.add(name, stat.S_IFDIR, new_subtree.id)
- modified = True
- else:
- new_tree.add(name, mode, sha)
- if not modified:
- return None
- # Return None if tree is now empty
- return new_tree if len(new_tree) > 0 else None
- new_tree = remove_from_tree(self._tree, components)
- if new_tree is None:
- new_tree = Tree() # Empty tree
- self._object_store.add_object(new_tree)
- self._tree = new_tree
- self._fanout_level = self._detect_fanout_level()
- return new_tree
- def list_notes(self) -> Iterator[tuple[bytes, bytes]]:
- """List all notes in this tree.
- Yields:
- Tuples of (object_sha, note_sha)
- """
- def walk_tree(tree: Tree, prefix: bytes = b"") -> Iterator[tuple[bytes, bytes]]:
- for name, mode, sha in tree.items():
- if stat.S_ISDIR(mode): # Directory
- subtree = self._object_store[sha]
- yield from walk_tree(subtree, prefix + name)
- elif stat.S_ISREG(mode): # File
- # Reconstruct the full hex SHA from the path
- full_hex = prefix + name
- yield (full_hex, sha)
- yield from walk_tree(self._tree)
- def create_notes_tree(object_store) -> Tree:
- """Create an empty notes tree.
- Args:
- object_store: Object store to add the tree to
- Returns:
- Empty tree object
- """
- tree = Tree()
- object_store.add_object(tree)
- return tree
- class Notes:
- """High-level interface for Git notes operations."""
- def __init__(self, object_store, refs_container):
- """Initialize Notes.
- Args:
- object_store: Object store to read/write objects
- refs_container: Refs container to read/write refs
- """
- self._object_store = object_store
- self._refs = refs_container
- def get_notes_ref(
- self,
- notes_ref: Optional[bytes] = None,
- config: Optional["StackedConfig"] = None,
- ) -> bytes:
- """Get the notes reference to use.
- Args:
- notes_ref: The notes ref to use, or None to use the default
- config: Config to read notes.displayRef from
- Returns:
- The notes reference name
- """
- if notes_ref is None:
- if config is not None:
- notes_ref = config.get((b"notes",), b"displayRef")
- if notes_ref is None:
- notes_ref = DEFAULT_NOTES_REF
- return notes_ref
- def get_note(
- self,
- object_sha: bytes,
- notes_ref: Optional[bytes] = None,
- config: Optional["StackedConfig"] = None,
- ) -> Optional[bytes]:
- """Get the note for an object.
- Args:
- object_sha: SHA of the object to get notes for
- notes_ref: The notes ref to use, or None to use the default
- config: Config to read notes.displayRef from
- Returns:
- The note content as bytes, or None if no note exists
- """
- notes_ref = self.get_notes_ref(notes_ref, config)
- try:
- notes_commit_sha = self._refs[notes_ref]
- except KeyError:
- return None
- # Get the commit object
- notes_obj = self._object_store[notes_commit_sha]
- # If it's a commit, get the tree from it
- from .objects import Commit
- if isinstance(notes_obj, Commit):
- notes_tree = self._object_store[notes_obj.tree]
- else:
- # If it's directly a tree (shouldn't happen in normal usage)
- notes_tree = notes_obj
- if not isinstance(notes_tree, Tree):
- return None
- notes_tree_obj = NotesTree(notes_tree, self._object_store)
- return notes_tree_obj.get_note(object_sha)
- def set_note(
- self,
- object_sha: bytes,
- note_content: bytes,
- notes_ref: Optional[bytes] = None,
- author: Optional[bytes] = None,
- committer: Optional[bytes] = None,
- message: Optional[bytes] = None,
- config: Optional["StackedConfig"] = None,
- ) -> bytes:
- """Set or update a note for an object.
- Args:
- object_sha: SHA of the object to annotate
- note_content: Content of the note
- notes_ref: The notes ref to use, or None to use the default
- author: Author identity (defaults to committer)
- committer: Committer identity (defaults to config)
- message: Commit message for the notes update
- config: Config to read user identity and notes.displayRef from
- Returns:
- SHA of the new notes commit
- """
- import time
- from .objects import Commit
- from .repo import get_user_identity
- notes_ref = self.get_notes_ref(notes_ref, config)
- # Get current notes tree
- try:
- notes_commit_sha = self._refs[notes_ref]
- notes_obj = self._object_store[notes_commit_sha]
- # If it's a commit, get the tree from it
- if isinstance(notes_obj, Commit):
- notes_tree = self._object_store[notes_obj.tree]
- else:
- # If it's directly a tree (shouldn't happen in normal usage)
- notes_tree = notes_obj
- if not isinstance(notes_tree, Tree):
- notes_tree = create_notes_tree(self._object_store)
- except KeyError:
- notes_tree = create_notes_tree(self._object_store)
- # Update notes tree
- notes_tree_obj = NotesTree(notes_tree, self._object_store)
- new_tree = notes_tree_obj.set_note(object_sha, note_content)
- # Create commit
- if committer is None and config is not None:
- committer = get_user_identity(config, kind="COMMITTER")
- if committer is None:
- committer = b"Git User <user@example.com>"
- if author is None:
- author = committer
- if message is None:
- message = b"Notes added by 'git notes add'"
- commit = Commit()
- commit.tree = new_tree.id
- commit.author = author
- commit.committer = committer
- commit.commit_time = commit.author_time = int(time.time())
- commit.commit_timezone = commit.author_timezone = 0
- commit.encoding = b"UTF-8"
- commit.message = message
- # Set parent to previous notes commit if exists
- try:
- parent_sha = self._refs[notes_ref]
- parent = self._object_store[parent_sha]
- if isinstance(parent, Commit):
- commit.parents = [parent_sha]
- except KeyError:
- commit.parents = []
- self._object_store.add_object(commit)
- self._refs[notes_ref] = commit.id
- return commit.id
- def remove_note(
- self,
- object_sha: bytes,
- notes_ref: Optional[bytes] = None,
- author: Optional[bytes] = None,
- committer: Optional[bytes] = None,
- message: Optional[bytes] = None,
- config: Optional["StackedConfig"] = None,
- ) -> Optional[bytes]:
- """Remove a note for an object.
- Args:
- object_sha: SHA of the object to remove notes from
- notes_ref: The notes ref to use, or None to use the default
- author: Author identity (defaults to committer)
- committer: Committer identity (defaults to config)
- message: Commit message for the notes removal
- config: Config to read user identity and notes.displayRef from
- Returns:
- SHA of the new notes commit, or None if no note existed
- """
- import time
- from .objects import Commit
- from .repo import get_user_identity
- notes_ref = self.get_notes_ref(notes_ref, config)
- # Get current notes tree
- try:
- notes_commit_sha = self._refs[notes_ref]
- notes_obj = self._object_store[notes_commit_sha]
- # If it's a commit, get the tree from it
- if isinstance(notes_obj, Commit):
- notes_tree = self._object_store[notes_obj.tree]
- else:
- # If it's directly a tree (shouldn't happen in normal usage)
- notes_tree = notes_obj
- if not isinstance(notes_tree, Tree):
- return None
- except KeyError:
- return None
- # Remove from notes tree
- notes_tree_obj = NotesTree(notes_tree, self._object_store)
- new_tree = notes_tree_obj.remove_note(object_sha)
- if new_tree is None:
- return None
- # Create commit
- if committer is None and config is not None:
- committer = get_user_identity(config, kind="COMMITTER")
- if committer is None:
- committer = b"Git User <user@example.com>"
- if author is None:
- author = committer
- if message is None:
- message = b"Notes removed by 'git notes remove'"
- commit = Commit()
- commit.tree = new_tree.id
- commit.author = author
- commit.committer = committer
- commit.commit_time = commit.author_time = int(time.time())
- commit.commit_timezone = commit.author_timezone = 0
- commit.encoding = b"UTF-8"
- commit.message = message
- # Set parent to previous notes commit
- parent_sha = self._refs[notes_ref]
- parent = self._object_store[parent_sha]
- if isinstance(parent, Commit):
- commit.parents = [parent_sha]
- self._object_store.add_object(commit)
- self._refs[notes_ref] = commit.id
- return commit.id
- def list_notes(
- self,
- notes_ref: Optional[bytes] = None,
- config: Optional["StackedConfig"] = None,
- ) -> list[tuple[bytes, bytes]]:
- """List all notes in a notes ref.
- Args:
- notes_ref: The notes ref to use, or None to use the default
- config: Config to read notes.displayRef from
- Returns:
- List of tuples of (object_sha, note_content)
- """
- notes_ref = self.get_notes_ref(notes_ref, config)
- try:
- notes_commit_sha = self._refs[notes_ref]
- except KeyError:
- return []
- # Get the commit object
- from .objects import Commit
- notes_obj = self._object_store[notes_commit_sha]
- # If it's a commit, get the tree from it
- if isinstance(notes_obj, Commit):
- notes_tree = self._object_store[notes_obj.tree]
- else:
- # If it's directly a tree (shouldn't happen in normal usage)
- notes_tree = notes_obj
- if not isinstance(notes_tree, Tree):
- return []
- notes_tree_obj = NotesTree(notes_tree, self._object_store)
- result = []
- for object_sha, note_sha in notes_tree_obj.list_notes():
- note_obj = self._object_store[note_sha]
- if isinstance(note_obj, Blob):
- result.append((object_sha, note_obj.data))
- return result
|