Browse Source

porcelain clean (issue 398) (#690)

Implement a basic dulwich.porcelain.clean (#398)
Lane Barlow 6 years ago
parent
commit
65a7a610da
2 changed files with 198 additions and 6 deletions
  1. 71 6
      dulwich/porcelain.py
  2. 127 0
      dulwich/tests/test_porcelain.py

+ 71 - 6
dulwich/porcelain.py

@@ -408,6 +408,57 @@ def add(repo=".", paths=None):
     return (relpaths, ignored)
 
 
+def _is_subdir(subdir, parentdir):
+    """Check whether subdir is parentdir or a subdir of parentdir.
+
+    If parentdir or subdir is a relative path, it will be disambiguated
+    relative to the pwd. Both paths are canonicalised with
+    os.path.realpath before being compared.
+
+    :param subdir: Path of the directory expected to be inside parentdir
+    :param parentdir: Path of the directory expected to contain subdir
+    :return: True if subdir is parentdir itself or lies below it
+    """
+    # A trailing separator keeps "/a/foobar" from matching the parent
+    # "/a/foo". os.path.join(path, "") appends it only when it is missing,
+    # so a filesystem root ("/") is not turned into "//", which no path
+    # would start with.
+    parentdir_abs = os.path.join(os.path.realpath(parentdir), "")
+    subdir_abs = os.path.join(os.path.realpath(subdir), "")
+    return subdir_abs.startswith(parentdir_abs)
+
+
+# TODO: option to remove ignored files also, in line with `git clean -fdx`
+def clean(repo=".", target_dir=None):
+    """Remove any untracked files from the target directory recursively.
+
+    Equivalent to running `git clean -fd` in target_dir.
+
+    Files that are tracked in the index or matched by the repository's
+    ignore rules are left in place. Directories below target_dir are
+    removed only when they end up empty; target_dir itself is never removed.
+
+    :param repo: Repository where the files may be tracked
+    :param target_dir: Directory to clean - current directory if None
+    :raise ValueError: If target_dir is not inside the repository's
+        working directory
+    """
+    if target_dir is None:
+        target_dir = os.getcwd()
+
+    with open_repo_closing(repo) as r:
+        # Validate against the opened repository's path: it is the same base
+        # used for every other path computation below, whereas the raw
+        # ``repo`` argument is only what the caller happened to pass in.
+        if not _is_subdir(target_dir, r.path):
+            raise ValueError("target_dir must be in the repo's working dir")
+
+        index = r.open_index()
+        ignore_manager = IgnoreFilterManager.from_repo(r)
+
+        paths_in_wd = _walk_working_dir_paths(target_dir, r.path)
+        # Reverse file visit order, so that files and subdirectories are
+        # removed before containing directory
+        for ap, is_dir in reversed(list(paths_in_wd)):
+            if is_dir:
+                # All subdirectories and files have been removed if untracked,
+                # so whatever is left in the directory must be kept; only an
+                # empty directory can be removed.
+                if not os.listdir(ap):
+                    os.rmdir(ap)
+                continue
+
+            ip = path_to_tree_path(r.path, ap)
+            is_tracked = ip in index
+
+            rp = os.path.relpath(ap, r.path)
+            is_ignored = ignore_manager.is_ignored(rp)
+
+            if not is_tracked and not is_ignored:
+                os.remove(ap)
+
+
 def remove(repo=".", paths=None, cached=False):
     """Remove files from the staging area.
 
@@ -900,14 +951,12 @@ def status(repo=".", ignored=False):
         return GitStatus(tracked_changes, unstaged_changes, untracked_changes)
 
 
-def get_untracked_paths(frompath, basepath, index):
-    """Get untracked paths.
+def _walk_working_dir_paths(frompath, basepath):
+    """Get path, is_dir for files in working dir from frompath
 
-    ;param frompath: Path to walk
+    :param frompath: Path to begin walk
     :param basepath: Path to compare to
-    :param index: Index to check against
     """
-    # If nothing is specified, add all non-ignored files.
     for dirpath, dirnames, filenames in os.walk(frompath):
         # Skip .git and below.
         if '.git' in dirnames:
@@ -918,8 +967,24 @@ def get_untracked_paths(frompath, basepath, index):
             filenames.remove('.git')
             if dirpath != basepath:
                 continue
+
+        if dirpath != frompath:
+            yield dirpath, True
+
         for filename in filenames:
-            ap = os.path.join(dirpath, filename)
+            filepath = os.path.join(dirpath, filename)
+            yield filepath, False
+
+
+def get_untracked_paths(frompath, basepath, index):
+    """Get untracked paths.
+
+    :param frompath: Path to walk
+    :param basepath: Path to compare to
+    :param index: Index to check against
+    """
+    for ap, is_dir in _walk_working_dir_paths(frompath, basepath):
+        if not is_dir:
             ip = path_to_tree_path(basepath, ap)
             if ip not in index:
                 yield os.path.relpath(ap, frompath)

+ 127 - 0
dulwich/tests/test_porcelain.py

@@ -25,6 +25,7 @@ try:
     from StringIO import StringIO
 except ImportError:
     from io import StringIO
+import errno
 import os
 import shutil
 import tarfile
@@ -53,6 +54,18 @@ from dulwich.tests.utils import (
     )
 
 
+def flat_walk_dir(dir_to_walk):
+    """Yield every directory and file below dir_to_walk as a relative path.
+
+    Entries directly inside dir_to_walk are yielded as bare names; deeper
+    entries are yielded relative to dir_to_walk. dir_to_walk itself is not
+    yielded.
+
+    :param dir_to_walk: Directory to walk recursively
+    """
+    for current_dir, _subdirs, file_names in os.walk(dir_to_walk):
+        if current_dir == dir_to_walk:
+            # Top level: names are already relative to dir_to_walk.
+            for file_name in file_names:
+                yield file_name
+            continue
+
+        relative_dir = os.path.relpath(current_dir, dir_to_walk)
+        yield relative_dir
+        for file_name in file_names:
+            yield os.path.join(relative_dir, file_name)
+
+
 class PorcelainTestCase(TestCase):
 
     def setUp(self):
@@ -116,6 +129,120 @@ class CommitTests(PorcelainTestCase):
         self.assertEqual(len(sha), 40)
 
 
+class CleanTests(PorcelainTestCase):
+    """Tests for porcelain.clean run against a populated working dir."""
+
+    def path_in_wd(self, name):
+        """Get path of file in wd
+
+        :param name: Path relative to the repository's working dir
+        """
+        return os.path.join(self.repo.path, name)
+
+    def put_files(self, tracked, ignored, untracked, empty_dirs):
+        """Put the described files in the wd
+
+        All arguments are sets of paths relative to the working dir.
+
+        :param tracked: Files to create, add and commit
+        :param ignored: Files to create and list in .gitignore
+        :param untracked: Files to create without adding them
+        :param empty_dirs: Directories to create without any content
+        """
+        all_files = tracked | ignored | untracked
+        for file_path in all_files:
+            abs_path = self.path_in_wd(file_path)
+            # File may need to be written in a dir that doesn't exist yet, so
+            # create the parent dir(s) as necessary
+            parent_dir = os.path.dirname(abs_path)
+            try:
+                os.makedirs(parent_dir)
+            except OSError as err:
+                if err.errno != errno.EEXIST:
+                    raise
+            with open(abs_path, 'w') as f:
+                f.write('')
+
+        with open(self.path_in_wd('.gitignore'), 'w') as f:
+            # One pattern per line; writing the bare names back to back
+            # would fuse several patterns into a single one.
+            f.writelines(pattern + '\n' for pattern in ignored)
+
+        for dir_path in empty_dirs:
+            # Create the directory that was asked for, not a fixed name.
+            os.mkdir(self.path_in_wd(dir_path))
+
+        files_to_add = [self.path_in_wd(t) for t in tracked]
+        porcelain.add(repo=self.repo.path, paths=files_to_add)
+        porcelain.commit(repo=self.repo.path, message="init commit")
+
+    def clean(self, target_dir):
+        """Clean the target_dir and assert control dir unchanged
+
+        :param target_dir: Directory passed to porcelain.clean
+        """
+        controldir = self.repo._controldir
+
+        controldir_before = set(flat_walk_dir(controldir))
+        porcelain.clean(repo=self.repo.path, target_dir=target_dir)
+        controldir_after = set(flat_walk_dir(controldir))
+
+        self.assertEqual(controldir_after, controldir_before)
+
+    def assert_wd(self, expected_paths):
+        """Assert paths of files and dirs in wd are same as expected_paths
+
+        :param expected_paths: Paths relative to the working dir that are
+            expected to exist, excluding the control dir and its contents
+        """
+        control_dir = self.repo._controldir
+        control_dir_rel = os.path.relpath(control_dir, self.repo.path)
+
+        # normalize paths to simplify comparison across platforms
+        from os.path import normpath
+        found_paths = {
+            normpath(p)
+            for p in flat_walk_dir(self.repo.path)
+            if p.split(os.sep)[0] != control_dir_rel}
+        norm_expected_paths = {normpath(p) for p in expected_paths}
+        self.assertEqual(found_paths, norm_expected_paths)
+
+    def test_from_root(self):
+        self.put_files(
+            tracked={
+                'tracked_file',
+                'tracked_dir/tracked_file',
+                '.gitignore'},
+            ignored={
+                'ignored_file'},
+            untracked={
+                'untracked_file',
+                'tracked_dir/untracked_dir/untracked_file',
+                'untracked_dir/untracked_dir/untracked_file'},
+            empty_dirs={
+                'empty_dir'})
+
+        self.clean(self.repo.path)
+
+        self.assert_wd({
+            'tracked_file',
+            'tracked_dir/tracked_file',
+            '.gitignore',
+            'ignored_file',
+            'tracked_dir'})
+
+    def test_from_subdir(self):
+        self.put_files(
+            tracked={
+                'tracked_file',
+                'tracked_dir/tracked_file',
+                '.gitignore'},
+            ignored={
+                'ignored_file'},
+            untracked={
+                'untracked_file',
+                'tracked_dir/untracked_dir/untracked_file',
+                'untracked_dir/untracked_dir/untracked_file'},
+            empty_dirs={
+                'empty_dir'})
+
+        target_dir = self.path_in_wd('untracked_dir')
+        self.clean(target_dir)
+
+        # Only the contents of target_dir are cleaned; target_dir itself and
+        # everything outside it stay in place.
+        self.assert_wd({
+            'tracked_file',
+            'tracked_dir/tracked_file',
+            '.gitignore',
+            'ignored_file',
+            'untracked_file',
+            'tracked_dir/untracked_dir/untracked_file',
+            'empty_dir',
+            'untracked_dir',
+            'tracked_dir',
+            'tracked_dir/untracked_dir'})
+
+
 class CloneTests(PorcelainTestCase):
 
     def test_simple_local(self):