|
@@ -5,7 +5,7 @@ from typing import Optional, cast
|
|
|
try:
|
|
|
import merge3
|
|
|
except ImportError:
|
|
|
- merge3 = None
|
|
|
+ merge3 = None # type: ignore
|
|
|
|
|
|
from dulwich.object_store import BaseObjectStore
|
|
|
from dulwich.objects import S_ISGITLINK, Blob, Commit, Tree
|
|
@@ -19,116 +19,24 @@ class MergeConflict(Exception):
|
|
|
super().__init__(f"Merge conflict in {path!r}: {message}")
|
|
|
|
|
|
|
|
|
-class Merger:
|
|
|
- """Handles git merge operations."""
|
|
|
+def _can_merge_lines(
|
|
|
+ base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
|
|
|
+) -> bool:
|
|
|
+ """Check if lines can be merged without conflict."""
|
|
|
+ # If one side is unchanged, we can take the other side
|
|
|
+ if base_lines == a_lines:
|
|
|
+ return True
|
|
|
+ elif base_lines == b_lines:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ # For now, treat any difference as a conflict
|
|
|
+ # A more sophisticated algorithm would check for non-overlapping changes
|
|
|
+ return False
|
|
|
|
|
|
- def __init__(self, object_store: BaseObjectStore):
|
|
|
- """Initialize merger.
|
|
|
|
|
|
- Args:
|
|
|
- object_store: Object store to read objects from
|
|
|
- """
|
|
|
- self.object_store = object_store
|
|
|
+if merge3 is not None:
|
|
|
|
|
|
- def merge_blobs(
|
|
|
- self,
|
|
|
- base_blob: Optional[Blob],
|
|
|
- ours_blob: Optional[Blob],
|
|
|
- theirs_blob: Optional[Blob],
|
|
|
- ) -> tuple[bytes, bool]:
|
|
|
- """Perform three-way merge on blob contents.
|
|
|
-
|
|
|
- Args:
|
|
|
- base_blob: Common ancestor blob (can be None)
|
|
|
- ours_blob: Our version of the blob (can be None)
|
|
|
- theirs_blob: Their version of the blob (can be None)
|
|
|
-
|
|
|
- Returns:
|
|
|
- Tuple of (merged_content, had_conflicts)
|
|
|
- """
|
|
|
- if merge3 is None:
|
|
|
- raise ImportError(
|
|
|
- "merge3 is required for merging. Install with: pip install dulwich[merge]"
|
|
|
- )
|
|
|
-
|
|
|
- # Handle deletion cases
|
|
|
- if ours_blob is None and theirs_blob is None:
|
|
|
- return b"", False
|
|
|
-
|
|
|
- if base_blob is None:
|
|
|
- # No common ancestor
|
|
|
- if ours_blob is None:
|
|
|
- assert theirs_blob is not None
|
|
|
- return theirs_blob.data, False
|
|
|
- elif theirs_blob is None:
|
|
|
- return ours_blob.data, False
|
|
|
- elif ours_blob.data == theirs_blob.data:
|
|
|
- return ours_blob.data, False
|
|
|
- else:
|
|
|
- # Both added different content - conflict
|
|
|
- m = merge3.Merge3(
|
|
|
- [],
|
|
|
- ours_blob.data.splitlines(True),
|
|
|
- theirs_blob.data.splitlines(True),
|
|
|
- )
|
|
|
- return self._merge3_to_bytes(m), True
|
|
|
-
|
|
|
- # Get content for each version
|
|
|
- base_content = base_blob.data if base_blob else b""
|
|
|
- ours_content = ours_blob.data if ours_blob else b""
|
|
|
- theirs_content = theirs_blob.data if theirs_blob else b""
|
|
|
-
|
|
|
- # Check if either side deleted
|
|
|
- if ours_blob is None or theirs_blob is None:
|
|
|
- if ours_blob is None and theirs_blob is None:
|
|
|
- return b"", False
|
|
|
- elif ours_blob is None:
|
|
|
- # We deleted, check if they modified
|
|
|
- if base_content == theirs_content:
|
|
|
- return b"", False # They didn't modify, accept deletion
|
|
|
- else:
|
|
|
- # Conflict: we deleted, they modified
|
|
|
- m = merge3.Merge3(
|
|
|
- base_content.splitlines(True),
|
|
|
- [],
|
|
|
- theirs_content.splitlines(True),
|
|
|
- )
|
|
|
- return self._merge3_to_bytes(m), True
|
|
|
- else:
|
|
|
- # They deleted, check if we modified
|
|
|
- if base_content == ours_content:
|
|
|
- return b"", False # We didn't modify, accept deletion
|
|
|
- else:
|
|
|
- # Conflict: they deleted, we modified
|
|
|
- m = merge3.Merge3(
|
|
|
- base_content.splitlines(True),
|
|
|
- ours_content.splitlines(True),
|
|
|
- [],
|
|
|
- )
|
|
|
- return self._merge3_to_bytes(m), True
|
|
|
-
|
|
|
- # Both sides exist, check if merge is needed
|
|
|
- if ours_content == theirs_content:
|
|
|
- return ours_content, False
|
|
|
- elif base_content == ours_content:
|
|
|
- return theirs_content, False
|
|
|
- elif base_content == theirs_content:
|
|
|
- return ours_content, False
|
|
|
-
|
|
|
- # Perform three-way merge
|
|
|
- m = merge3.Merge3(
|
|
|
- base_content.splitlines(True),
|
|
|
- ours_content.splitlines(True),
|
|
|
- theirs_content.splitlines(True),
|
|
|
- )
|
|
|
-
|
|
|
- # Check for conflicts and generate merged content
|
|
|
- merged_content = self._merge3_to_bytes(m)
|
|
|
- has_conflicts = b"<<<<<<< ours" in merged_content
|
|
|
-
|
|
|
- return merged_content, has_conflicts
|
|
|
-
|
|
|
- def _merge3_to_bytes(self, m: merge3.Merge3) -> bytes:
|
|
|
+ def _merge3_to_bytes(m: merge3.Merge3) -> bytes:
|
|
|
"""Convert merge3 result to bytes with conflict markers.
|
|
|
|
|
|
Args:
|
|
@@ -152,8 +60,8 @@ class Merger:
|
|
|
base_lines, a_lines, b_lines = group[1], group[2], group[3]
|
|
|
|
|
|
# Try to merge line by line
|
|
|
- if self._can_merge_lines(base_lines, a_lines, b_lines):
|
|
|
- merged_lines = self._merge_lines(base_lines, a_lines, b_lines)
|
|
|
+ if _can_merge_lines(base_lines, a_lines, b_lines):
|
|
|
+ merged_lines = _merge_lines(base_lines, a_lines, b_lines)
|
|
|
result.extend(merged_lines)
|
|
|
else:
|
|
|
# Real conflict - add conflict markers
|
|
@@ -165,31 +73,141 @@ class Merger:
|
|
|
|
|
|
return b"".join(result)
|
|
|
|
|
|
- def _can_merge_lines(
|
|
|
- self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
|
|
|
- ) -> bool:
|
|
|
- """Check if lines can be merged without conflict."""
|
|
|
- # If one side is unchanged, we can take the other side
|
|
|
- if base_lines == a_lines:
|
|
|
- return True
|
|
|
- elif base_lines == b_lines:
|
|
|
- return True
|
|
|
+
|
|
|
+def _merge_lines(
|
|
|
+ base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
|
|
|
+) -> list[bytes]:
|
|
|
+ """Merge lines when possible."""
|
|
|
+ if base_lines == a_lines:
|
|
|
+ return b_lines
|
|
|
+ elif base_lines == b_lines:
|
|
|
+ return a_lines
|
|
|
+ else:
|
|
|
+ # This shouldn't happen if _can_merge_lines returned True
|
|
|
+ return a_lines
|
|
|
+
|
|
|
+
|
|
|
+def merge_blobs(
|
|
|
+ base_blob: Optional[Blob],
|
|
|
+ ours_blob: Optional[Blob],
|
|
|
+ theirs_blob: Optional[Blob],
|
|
|
+) -> tuple[bytes, bool]:
|
|
|
+ """Perform three-way merge on blob contents.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ base_blob: Common ancestor blob (can be None)
|
|
|
+ ours_blob: Our version of the blob (can be None)
|
|
|
+ theirs_blob: Their version of the blob (can be None)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (merged_content, had_conflicts)
|
|
|
+ """
|
|
|
+ # Handle deletion cases
|
|
|
+ if ours_blob is None and theirs_blob is None:
|
|
|
+ return b"", False
|
|
|
+
|
|
|
+ if base_blob is None:
|
|
|
+ # No common ancestor
|
|
|
+ if ours_blob is None:
|
|
|
+ assert theirs_blob is not None
|
|
|
+ return theirs_blob.data, False
|
|
|
+ elif theirs_blob is None:
|
|
|
+ return ours_blob.data, False
|
|
|
+ elif ours_blob.data == theirs_blob.data:
|
|
|
+ return ours_blob.data, False
|
|
|
else:
|
|
|
- # For now, treat any difference as a conflict
|
|
|
- # A more sophisticated algorithm would check for non-overlapping changes
|
|
|
- return False
|
|
|
-
|
|
|
- def _merge_lines(
|
|
|
- self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
|
|
|
- ) -> list[bytes]:
|
|
|
- """Merge lines when possible."""
|
|
|
- if base_lines == a_lines:
|
|
|
- return b_lines
|
|
|
- elif base_lines == b_lines:
|
|
|
- return a_lines
|
|
|
+ # Both added different content - conflict
|
|
|
+ m = merge3.Merge3(
|
|
|
+ [],
|
|
|
+ ours_blob.data.splitlines(True),
|
|
|
+ theirs_blob.data.splitlines(True),
|
|
|
+ )
|
|
|
+ return _merge3_to_bytes(m), True
|
|
|
+
|
|
|
+ # Get content for each version
|
|
|
+ base_content = base_blob.data if base_blob else b""
|
|
|
+ ours_content = ours_blob.data if ours_blob else b""
|
|
|
+ theirs_content = theirs_blob.data if theirs_blob else b""
|
|
|
+
|
|
|
+ # Check if either side deleted
|
|
|
+ if ours_blob is None or theirs_blob is None:
|
|
|
+ if ours_blob is None and theirs_blob is None:
|
|
|
+ return b"", False
|
|
|
+ elif ours_blob is None:
|
|
|
+ # We deleted, check if they modified
|
|
|
+ if base_content == theirs_content:
|
|
|
+ return b"", False # They didn't modify, accept deletion
|
|
|
+ else:
|
|
|
+ # Conflict: we deleted, they modified
|
|
|
+ m = merge3.Merge3(
|
|
|
+ base_content.splitlines(True),
|
|
|
+ [],
|
|
|
+ theirs_content.splitlines(True),
|
|
|
+ )
|
|
|
+ return _merge3_to_bytes(m), True
|
|
|
else:
|
|
|
- # This shouldn't happen if _can_merge_lines returned True
|
|
|
- return a_lines
|
|
|
+ # They deleted, check if we modified
|
|
|
+ if base_content == ours_content:
|
|
|
+ return b"", False # We didn't modify, accept deletion
|
|
|
+ else:
|
|
|
+ # Conflict: they deleted, we modified
|
|
|
+ m = merge3.Merge3(
|
|
|
+ base_content.splitlines(True),
|
|
|
+ ours_content.splitlines(True),
|
|
|
+ [],
|
|
|
+ )
|
|
|
+ return _merge3_to_bytes(m), True
|
|
|
+
|
|
|
+ # Both sides exist, check if merge is needed
|
|
|
+ if ours_content == theirs_content:
|
|
|
+ return ours_content, False
|
|
|
+ elif base_content == ours_content:
|
|
|
+ return theirs_content, False
|
|
|
+ elif base_content == theirs_content:
|
|
|
+ return ours_content, False
|
|
|
+
|
|
|
+ # Perform three-way merge
|
|
|
+ m = merge3.Merge3(
|
|
|
+ base_content.splitlines(True),
|
|
|
+ ours_content.splitlines(True),
|
|
|
+ theirs_content.splitlines(True),
|
|
|
+ )
|
|
|
+
|
|
|
+ # Check for conflicts and generate merged content
|
|
|
+ merged_content = _merge3_to_bytes(m)
|
|
|
+ has_conflicts = b"<<<<<<< ours" in merged_content
|
|
|
+
|
|
|
+ return merged_content, has_conflicts
|
|
|
+
|
|
|
+
|
|
|
+class Merger:
|
|
|
+ """Handles git merge operations."""
|
|
|
+
|
|
|
+ def __init__(self, object_store: BaseObjectStore):
|
|
|
+ """Initialize merger.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ object_store: Object store to read objects from
|
|
|
+ """
|
|
|
+ self.object_store = object_store
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def merge_blobs(
|
|
|
+ base_blob: Optional[Blob],
|
|
|
+ ours_blob: Optional[Blob],
|
|
|
+ theirs_blob: Optional[Blob],
|
|
|
+ ) -> tuple[bytes, bool]:
|
|
|
+ """Perform three-way merge on blob contents.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ base_blob: Common ancestor blob (can be None)
|
|
|
+ ours_blob: Our version of the blob (can be None)
|
|
|
+ theirs_blob: Their version of the blob (can be None)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (merged_content, had_conflicts)
|
|
|
+ """
|
|
|
+ return merge_blobs(base_blob, ours_blob, theirs_blob)
|
|
|
|
|
|
def merge_trees(
|
|
|
self, base_tree: Optional[Tree], ours_tree: Tree, theirs_tree: Tree
|