浏览代码

Add porcelain.checkout_branch (#1148)

Co-authored-by: DedSecer <z2465216809qq.com@icloud.com>
Hugh Sorby 2 年之前
父节点
当前提交
4618608e1c
共有 3 个文件被更改,包括 463 次插入6 次删除
  1. 112 6
      dulwich/porcelain.py
  2. 1 0
      dulwich/refs.py
  3. 350 0
      dulwich/tests/test_porcelain.py

+ 112 - 6
dulwich/porcelain.py

@@ -25,7 +25,7 @@ Currently implemented:
  * add
  * branch{_create,_delete,_list}
  * check-ignore
- * checkout
+ * checkout_branch
  * clone
  * commit
  * commit-tree
@@ -82,20 +82,21 @@ from dulwich.diff_tree import (CHANGE_ADD, CHANGE_COPY, CHANGE_DELETE,
                                CHANGE_MODIFY, CHANGE_RENAME,
                                RENAME_CHANGE_TYPES)
 from dulwich.errors import SendPackError
+from dulwich.file import ensure_dir_exists
 from dulwich.graph import can_fast_forward
 from dulwich.ignore import IgnoreFilterManager
 from dulwich.index import (_fs_to_tree_path, blob_from_path_and_stat,
-                           build_file_from_blob, get_unstaged_changes)
-from dulwich.object_store import tree_lookup_path
+                           build_file_from_blob, get_unstaged_changes, index_entry_from_stat)
+from dulwich.object_store import tree_lookup_path, iter_tree_contents
 from dulwich.objects import (Commit, Tag, format_timezone, parse_timezone,
                              pretty_format_tree_entry)
 from dulwich.objectspec import (parse_commit, parse_object, parse_ref,
-                                parse_reftuples, parse_tree)
+                                parse_reftuples, parse_tree, to_bytes)
 from dulwich.pack import write_pack_from_container, write_pack_index
 from dulwich.patch import write_tree_diff
 from dulwich.protocol import ZERO_SHA, Protocol
-from dulwich.refs import (LOCAL_BRANCH_PREFIX, LOCAL_TAG_PREFIX,
-                          _import_remote_refs)
+from dulwich.refs import (LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX,
+                          LOCAL_TAG_PREFIX, _import_remote_refs)
 from dulwich.repo import BaseRepo, Repo
 from dulwich.server import (FileSystemBackend, ReceivePackHandler,
                             TCPGitServer, UploadPackHandler)
@@ -143,6 +144,10 @@ class TimezoneFormatError(Error):
     """Raised when the timezone cannot be determined from a given string."""
 
 
+class CheckoutError(Error):
+    """Indicates that a checkout cannot be performed."""
+
+
 def parse_timezone_format(tz_str):
     """Parse given string and attempt to return a timezone offset.
 
@@ -1859,6 +1864,107 @@ def reset_file(repo, file_path: str, target: bytes = b'HEAD',
     build_file_from_blob(blob, mode, full_path, symlink_fn=symlink_fn)
 
 
+def _update_head_during_checkout_branch(repo, target):
+    checkout_target = None
+    if target == b'HEAD':  # Do not update head while trying to checkout to HEAD.
+        pass
+    elif target in repo.refs.keys(base=LOCAL_BRANCH_PREFIX):
+        update_head(repo, target)
+    else:
+        # If checking out a remote branch, create a local one without the remote name prefix.
+        config = repo.get_config()
+        name = target.split(b"/")[0]
+        section = (b"remote", name)
+        if config.has_section(section):
+            checkout_target = target.replace(name + b"/", b"")
+            try:
+                branch_create(repo, checkout_target, (LOCAL_REMOTE_PREFIX + target).decode())
+            except Error:
+                pass
+            update_head(repo, LOCAL_BRANCH_PREFIX + checkout_target)
+        else:
+            update_head(repo, target, detached=True)
+
+    return checkout_target
+
+
+def checkout_branch(repo, target: Union[bytes, str], force: bool = False):
+    """switch branches or restore working tree files.
+
+    The implementation of this function will probably not scale well
+    for branches with lots of local changes.
+    This is due to the analysis of a diff between branches before any
+    changes are applied.
+
+    Args:
+      repo: dulwich Repo object
+      target: branch name or commit sha to checkout
+      force: true or not to force checkout
+    """
+    target = to_bytes(target)
+
+    current_tree = parse_tree(repo, repo.head())
+    target_tree = parse_tree(repo, target)
+
+    if force:
+        repo.reset_index(target_tree.id)
+        _update_head_during_checkout_branch(repo, target)
+    else:
+        status_report = status(repo)
+        changes = list(set(status_report[0]['add'] + status_report[0]['delete'] + status_report[0]['modify'] + status_report[1]))
+        index = 0
+        while index < len(changes):
+            change = changes[index]
+            try:
+                current_tree.lookup_path(repo.object_store.__getitem__, change)
+                try:
+                    target_tree.lookup_path(repo.object_store.__getitem__, change)
+                    index += 1
+                except KeyError:
+                    raise CheckoutError('Your local changes to the following files would be overwritten by checkout: ' + change.decode())
+            except KeyError:
+                changes.pop(index)
+
+        # Update head.
+        checkout_target = _update_head_during_checkout_branch(repo, target)
+        if checkout_target is not None:
+            target_tree = parse_tree(repo, checkout_target)
+
+        dealt_with = set()
+        repo_index = repo.open_index()
+        for entry in iter_tree_contents(repo.object_store, target_tree.id):
+            dealt_with.add(entry.path)
+            if entry.path in changes:
+                continue
+            full_path = os.path.join(os.fsencode(repo.path), entry.path)
+            blob = repo.object_store[entry.sha]
+            ensure_dir_exists(os.path.dirname(full_path))
+            st = build_file_from_blob(blob, entry.mode, full_path)
+            repo_index[entry.path] = index_entry_from_stat(st, entry.sha, 0)
+
+        repo_index.write()
+
+        for entry in iter_tree_contents(repo.object_store, current_tree.id):
+            if entry.path not in dealt_with:
+                repo.unstage([entry.path])
+
+    # Remove the untracked files which are in the current_file_set.
+    repo_index = repo.open_index()
+    for change in repo_index.changes_from_tree(repo.object_store, current_tree.id):
+        path_change = change[0]
+        if path_change[1] is None:
+            file_name = path_change[0]
+            full_path = os.path.join(repo.path, file_name.decode())
+            if os.path.isfile(full_path):
+                os.remove(full_path)
+            dir_path = os.path.dirname(full_path)
+            while dir_path != repo.path:
+                is_empty = len(os.listdir(dir_path)) == 0
+                if is_empty:
+                    os.rmdir(dir_path)
+                dir_path = os.path.dirname(dir_path)
+
+
 def check_mailmap(repo, contact):
     """Check canonical name and email of contact.
 

+ 1 - 0
dulwich/refs.py

@@ -38,6 +38,7 @@ HEADREF = b"HEAD"
 SYMREF = b"ref: "
 LOCAL_BRANCH_PREFIX = b"refs/heads/"
 LOCAL_TAG_PREFIX = b"refs/tags/"
+LOCAL_REMOTE_PREFIX = b"refs/remotes/"
 BAD_REF_CHARS = set(b"\177 ~^:?*[")
 PEELED_TAG_SUFFIX = b"^{}"
 

+ 350 - 0
dulwich/tests/test_porcelain.py

@@ -39,6 +39,7 @@ from dulwich import porcelain
 from dulwich.diff_tree import tree_changes
 from dulwich.errors import CommitError
 from dulwich.objects import ZERO_SHA, Blob, Tag, Tree
+from dulwich.porcelain import CheckoutError
 from dulwich.repo import NoIndexPresent, Repo
 from dulwich.server import DictBackend
 from dulwich.tests import TestCase
@@ -1577,10 +1578,359 @@ class ResetFileTests(PorcelainTestCase):
             author=b"John <john@example.com>",
         )
         porcelain.reset_file(self.repo, os.path.join('new_dir', 'foo'), target=sha)
+
         with open(full_path) as f:
             self.assertEqual('hello', f.read())
 
 
+def _commit_file_with_content(repo, filename, content):
+    file_path = os.path.join(repo.path, filename)
+
+    with open(file_path, 'w') as f:
+        f.write(content)
+    porcelain.add(repo, paths=[file_path])
+    sha = porcelain.commit(
+        repo,
+        message=b"add " + filename.encode(),
+        committer=b"Jane <jane@example.com>",
+        author=b"John <john@example.com>",
+    )
+
+    return sha, file_path
+
+
+class CheckoutTests(PorcelainTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self._sha, self._foo_path = _commit_file_with_content(self.repo, 'foo', 'hello\n')
+        porcelain.branch_create(self.repo, 'uni')
+
+    def test_checkout_to_existing_branch(self):
+        self.assertEqual(b"master", porcelain.active_branch(self.repo))
+        porcelain.checkout_branch(self.repo, b'uni')
+        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+
+    def test_checkout_to_non_existing_branch(self):
+        self.assertEqual(b"master", porcelain.active_branch(self.repo))
+
+        with self.assertRaises(KeyError):
+            porcelain.checkout_branch(self.repo, b'bob')
+
+        self.assertEqual(b"master", porcelain.active_branch(self.repo))
+
+    def test_checkout_to_branch_with_modified_files(self):
+        with open(self._foo_path, 'a') as f:
+            f.write('new message\n')
+        porcelain.add(self.repo, paths=[self._foo_path])
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': [b"foo"]}, [], []], status)
+
+        # Both branches have file 'foo' checkout should be fine.
+        porcelain.checkout_branch(self.repo, b'uni')
+        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': [b"foo"]}, [], []], status)
+
+    def test_checkout_with_deleted_files(self):
+        porcelain.remove(self.repo.path, [os.path.join(self.repo.path, 'foo')])
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [b'foo'], 'modify': []}, [], []], status)
+
+        # Both branches have file 'foo' checkout should be fine.
+        porcelain.checkout_branch(self.repo, b'uni')
+        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [b"foo"], 'modify': []}, [], []], status)
+
+    def test_checkout_to_branch_with_added_files(self):
+        file_path = os.path.join(self.repo.path, 'bar')
+
+        with open(file_path, 'w') as f:
+            f.write('bar content\n')
+        porcelain.add(self.repo, paths=[file_path])
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [b'bar'], 'delete': [], 'modify': []}, [], []], status)
+
+        # Both branches have file 'foo' checkout should be fine.
+        porcelain.checkout_branch(self.repo, b'uni')
+        self.assertEqual(b'uni', porcelain.active_branch(self.repo))
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [b'bar'], 'delete': [], 'modify': []}, [], []], status)
+
+    def test_checkout_to_branch_with_modified_file_not_present(self):
+        # Commit a new file that the other branch doesn't have.
+        _, nee_path = _commit_file_with_content(self.repo, 'nee', 'Good content\n')
+
+        # Modify the file the other branch doesn't have.
+        with open(nee_path, 'a') as f:
+            f.write('bar content\n')
+        porcelain.add(self.repo, paths=[nee_path])
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+
+        # 'uni' branch doesn't have 'nee' and it has been modified, should result in the checkout being aborted.
+        with self.assertRaises(CheckoutError):
+            porcelain.checkout_branch(self.repo, b'uni')
+
+        self.assertEqual(b'master', porcelain.active_branch(self.repo))
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+
+    def test_checkout_to_branch_with_modified_file_not_present_forced(self):
+        # Commit a new file that the other branch doesn't have.
+        _, nee_path = _commit_file_with_content(self.repo, 'nee', 'Good content\n')
+
+        # Modify the file the other branch doesn't have.
+        with open(nee_path, 'a') as f:
+            f.write('bar content\n')
+        porcelain.add(self.repo, paths=[nee_path])
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': [b'nee']}, [], []], status)
+
+        # 'uni' branch doesn't have 'nee' and it has been modified, but we force to reset the entire index.
+        porcelain.checkout_branch(self.repo, b'uni', force=True)
+
+        self.assertEqual(b'uni', porcelain.active_branch(self.repo))
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+    def test_checkout_to_branch_with_unstaged_files(self):
+        # Edit `foo`.
+        with open(self._foo_path, 'a') as f:
+            f.write('new message')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+
+        porcelain.checkout_branch(self.repo, b'uni')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+
+    def test_checkout_to_branch_with_untracked_files(self):
+        with open(os.path.join(self.repo.path, 'neu'), 'a') as f:
+            f.write('new message\n')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['neu']], status)
+
+        porcelain.checkout_branch(self.repo, b'uni')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['neu']], status)
+
+    def test_checkout_to_branch_with_new_files(self):
+        porcelain.checkout_branch(self.repo, b'uni')
+        sub_directory = os.path.join(self.repo.path, 'sub1')
+        os.mkdir(sub_directory)
+        for index in range(5):
+            _commit_file_with_content(self.repo, 'new_file_' + str(index + 1), "Some content\n")
+            _commit_file_with_content(self.repo, os.path.join('sub1', 'new_file_' + str(index + 10)), "Good content\n")
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        porcelain.checkout_branch(self.repo, b'master')
+        self.assertEqual(b"master", porcelain.active_branch(self.repo))
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        porcelain.checkout_branch(self.repo, b'uni')
+        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+    def test_checkout_to_branch_with_file_in_sub_directory(self):
+        sub_directory = os.path.join(self.repo.path, 'sub1', 'sub2')
+        os.makedirs(sub_directory)
+
+        sub_directory_file = os.path.join(sub_directory, 'neu')
+        with open(sub_directory_file, 'w') as f:
+            f.write('new message\n')
+
+        porcelain.add(self.repo, paths=[sub_directory_file])
+        porcelain.commit(
+            self.repo,
+            message=b"add " + sub_directory_file.encode(),
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        self.assertTrue(os.path.isdir(sub_directory))
+        self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
+
+        porcelain.checkout_branch(self.repo, b'uni')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        self.assertFalse(os.path.isdir(sub_directory))
+        self.assertFalse(os.path.isdir(os.path.dirname(sub_directory)))
+
+        porcelain.checkout_branch(self.repo, b'master')
+
+        self.assertTrue(os.path.isdir(sub_directory))
+        self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
+
+    def test_checkout_to_branch_with_multiple_files_in_sub_directory(self):
+        sub_directory = os.path.join(self.repo.path, 'sub1', 'sub2')
+        os.makedirs(sub_directory)
+
+        sub_directory_file_1 = os.path.join(sub_directory, 'neu')
+        with open(sub_directory_file_1, 'w') as f:
+            f.write('new message\n')
+
+        sub_directory_file_2 = os.path.join(sub_directory, 'gus')
+        with open(sub_directory_file_2, 'w') as f:
+            f.write('alternative message\n')
+
+        porcelain.add(self.repo, paths=[sub_directory_file_1, sub_directory_file_2])
+        porcelain.commit(
+            self.repo,
+            message=b"add files neu and gus.",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        self.assertTrue(os.path.isdir(sub_directory))
+        self.assertTrue(os.path.isdir(os.path.dirname(sub_directory)))
+
+        porcelain.checkout_branch(self.repo, b'uni')
+
+        status = list(porcelain.status(self.repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], []], status)
+
+        self.assertFalse(os.path.isdir(sub_directory))
+        self.assertFalse(os.path.isdir(os.path.dirname(sub_directory)))
+
+    def _commit_something_wrong(self):
+        with open(self._foo_path, 'a') as f:
+            f.write('something wrong')
+
+        porcelain.add(self.repo, paths=[self._foo_path])
+        return porcelain.commit(
+            self.repo,
+            message=b"I may added something wrong",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+
+    def test_checkout_to_commit_sha(self):
+        self._commit_something_wrong()
+
+        porcelain.checkout_branch(self.repo, self._sha)
+        self.assertEqual(self._sha, self.repo.head())
+
+    def test_checkout_to_head(self):
+        new_sha = self._commit_something_wrong()
+
+        porcelain.checkout_branch(self.repo, b"HEAD")
+        self.assertEqual(new_sha, self.repo.head())
+
+    def _checkout_remote_branch(self):
+        errstream = BytesIO()
+        outstream = BytesIO()
+
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"init",
+            author=b"author <email>",
+            committer=b"committer <email>",
+        )
+
+        # Setup target repo cloned from temp test repo
+        clone_path = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, clone_path)
+        target_repo = porcelain.clone(
+            self.repo.path, target=clone_path, errstream=errstream
+        )
+        try:
+            self.assertEqual(target_repo[b"HEAD"], self.repo[b"HEAD"])
+        finally:
+            target_repo.close()
+
+        # create a second file to be pushed back to origin
+        handle, fullpath = tempfile.mkstemp(dir=clone_path)
+        os.close(handle)
+        porcelain.add(repo=clone_path, paths=[fullpath])
+        porcelain.commit(
+            repo=clone_path,
+            message=b"push",
+            author=b"author <email>",
+            committer=b"committer <email>",
+        )
+
+        # Setup a non-checked out branch in the remote
+        refs_path = b"refs/heads/foo"
+        new_id = self.repo[b"HEAD"].id
+        self.assertNotEqual(new_id, ZERO_SHA)
+        self.repo.refs[refs_path] = new_id
+
+        # Push to the remote
+        porcelain.push(
+            clone_path,
+            "origin",
+            b"HEAD:" + refs_path,
+            outstream=outstream,
+            errstream=errstream,
+        )
+
+        self.assertEqual(
+            target_repo.refs[b"refs/remotes/origin/foo"],
+            target_repo.refs[b"HEAD"],
+        )
+
+        porcelain.checkout_branch(target_repo, b"origin/foo")
+        original_id = target_repo[b"HEAD"].id
+        uni_id = target_repo[b"refs/remotes/origin/uni"].id
+
+        expected_refs = {
+            b"HEAD": original_id,
+            b"refs/heads/master": original_id,
+            b"refs/heads/foo": original_id,
+            b"refs/remotes/origin/foo": original_id,
+            b"refs/remotes/origin/uni": uni_id,
+            b"refs/remotes/origin/HEAD": new_id,
+            b"refs/remotes/origin/master": new_id,
+        }
+        self.assertEqual(expected_refs, target_repo.get_refs())
+
+        return target_repo
+
+    def test_checkout_remote_branch(self):
+        repo = self._checkout_remote_branch()
+        repo.close()
+
+    def test_checkout_remote_branch_then_master_then_remote_branch_again(self):
+        target_repo = self._checkout_remote_branch()
+        self.assertEqual(b"foo", porcelain.active_branch(target_repo))
+        _commit_file_with_content(target_repo, 'bar', 'something\n')
+        self.assertTrue(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+
+        porcelain.checkout_branch(target_repo, b"master")
+
+        self.assertEqual(b"master", porcelain.active_branch(target_repo))
+        self.assertFalse(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+
+        porcelain.checkout_branch(target_repo, b"origin/foo")
+
+        self.assertEqual(b"foo", porcelain.active_branch(target_repo))
+        self.assertTrue(os.path.isfile(os.path.join(target_repo.path, 'bar')))
+
+        target_repo.close()
+
+
 class SubmoduleTests(PorcelainTestCase):
 
     def test_empty(self):