|
@@ -1459,6 +1459,393 @@ class SymbolicRefCommandTest(DulwichCliTestCase):
|
|
|
)
|
|
|
|
|
|
|
|
|
+class BundleCommandTest(DulwichCliTestCase):
|
|
|
+ """Tests for bundle commands."""
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ super().setUp()
|
|
|
+ # Create a basic repository with some commits for bundle testing
|
|
|
+ # Create initial commit
|
|
|
+ test_file = os.path.join(self.repo_path, "file1.txt")
|
|
|
+ with open(test_file, "w") as f:
|
|
|
+ f.write("Content of file1\n")
|
|
|
+ self._run_cli("add", "file1.txt")
|
|
|
+ self._run_cli("commit", "--message=Initial commit")
|
|
|
+
|
|
|
+ # Create second commit
|
|
|
+ test_file2 = os.path.join(self.repo_path, "file2.txt")
|
|
|
+ with open(test_file2, "w") as f:
|
|
|
+ f.write("Content of file2\n")
|
|
|
+ self._run_cli("add", "file2.txt")
|
|
|
+ self._run_cli("commit", "--message=Add file2")
|
|
|
+
|
|
|
+ # Create a branch and tag for testing
|
|
|
+ self._run_cli("branch", "feature")
|
|
|
+ self._run_cli("tag", "v1.0")
|
|
|
+
|
|
|
+ def test_bundle_create_basic(self):
|
|
|
+ """Test basic bundle creation."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "test.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+ self.assertGreater(os.path.getsize(bundle_file), 0)
|
|
|
+
|
|
|
+ def test_bundle_create_all_refs(self):
|
|
|
+ """Test bundle creation with --all flag."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "all.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", "--all", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_create_specific_refs(self):
|
|
|
+ """Test bundle creation with specific refs."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "refs.bundle")
|
|
|
+
|
|
|
+ # Only use HEAD since feature branch may not exist
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_create_with_range(self):
|
|
|
+ """Test bundle creation with commit range."""
|
|
|
+ # Get the first commit SHA by looking at the log
|
|
|
+ result, stdout, stderr = self._run_cli("log", "--reverse")
|
|
|
+ lines = stdout.strip().split("\n")
|
|
|
+ # Find first commit line that contains a SHA
|
|
|
+ first_commit = None
|
|
|
+ for line in lines:
|
|
|
+ if line.startswith("commit "):
|
|
|
+ first_commit = line.split()[1][:8] # Get short SHA
|
|
|
+ break
|
|
|
+
|
|
|
+ if first_commit:
|
|
|
+ bundle_file = os.path.join(self.test_dir, "range.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", bundle_file, f"{first_commit}..HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+ else:
|
|
|
+ self.skipTest("Could not determine first commit SHA")
|
|
|
+
|
|
|
+ def test_bundle_create_to_stdout(self):
|
|
|
+ """Test bundle creation to stdout."""
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", "-", "HEAD")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertGreater(len(stdout), 0)
|
|
|
+ # Bundle output is binary, so check it's not empty
|
|
|
+ self.assertIsInstance(stdout, (str, bytes))
|
|
|
+
|
|
|
+ def test_bundle_create_no_refs(self):
|
|
|
+ """Test bundle creation with no refs specified."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "noref.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
|
|
|
+ self.assertEqual(result, 1)
|
|
|
+ self.assertIn("No refs specified", stdout)
|
|
|
+
|
|
|
+ def test_bundle_create_empty_bundle_refused(self):
|
|
|
+ """Test that empty bundles are refused."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "empty.bundle")
|
|
|
+
|
|
|
+ # Try to create bundle with non-existent ref - this should fail with KeyError
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", bundle_file, "nonexistent-ref"
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_bundle_verify_valid(self):
|
|
|
+ """Test bundle verification of valid bundle."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "valid.bundle")
|
|
|
+
|
|
|
+ # First create a bundle
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+
|
|
|
+ # Now verify it
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertIn("valid and can be applied", stdout)
|
|
|
+
|
|
|
+ def test_bundle_verify_quiet(self):
|
|
|
+ """Test bundle verification with quiet flag."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "quiet.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Verify quietly
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "verify", "--quiet", bundle_file
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertEqual(stdout.strip(), "")
|
|
|
+
|
|
|
+ def test_bundle_verify_from_stdin(self):
|
|
|
+ """Test bundle verification from stdin."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "stdin.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Read bundle content
|
|
|
+ with open(bundle_file, "rb") as f:
|
|
|
+ bundle_content = f.read()
|
|
|
+
|
|
|
+ # Mock stdin with bundle content
|
|
|
+ old_stdin = sys.stdin
|
|
|
+ try:
|
|
|
+ sys.stdin = io.BytesIO(bundle_content)
|
|
|
+ sys.stdin.buffer = sys.stdin
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "verify", "-")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ finally:
|
|
|
+ sys.stdin = old_stdin
|
|
|
+
|
|
|
+ def test_bundle_list_heads(self):
|
|
|
+ """Test listing bundle heads."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "heads.bundle")
|
|
|
+
|
|
|
+ # Create bundle with HEAD only
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # List heads
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ # Should contain at least the HEAD reference
|
|
|
+ self.assertTrue(len(stdout.strip()) > 0)
|
|
|
+
|
|
|
+ def test_bundle_list_heads_specific_refs(self):
|
|
|
+ """Test listing specific bundle heads."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "specific.bundle")
|
|
|
+
|
|
|
+ # Create bundle with HEAD
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # List heads without filtering
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ # Should contain some reference
|
|
|
+ self.assertTrue(len(stdout.strip()) > 0)
|
|
|
+
|
|
|
+ def test_bundle_list_heads_from_stdin(self):
|
|
|
+ """Test listing bundle heads from stdin."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "stdin-heads.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Read bundle content
|
|
|
+ with open(bundle_file, "rb") as f:
|
|
|
+ bundle_content = f.read()
|
|
|
+
|
|
|
+ # Mock stdin
|
|
|
+ old_stdin = sys.stdin
|
|
|
+ try:
|
|
|
+ sys.stdin = io.BytesIO(bundle_content)
|
|
|
+ sys.stdin.buffer = sys.stdin
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "list-heads", "-")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ finally:
|
|
|
+ sys.stdin = old_stdin
|
|
|
+
|
|
|
+ def test_bundle_unbundle(self):
|
|
|
+ """Test bundle unbundling."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "unbundle.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Unbundle
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "unbundle", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+
|
|
|
+ def test_bundle_unbundle_specific_refs(self):
|
|
|
+ """Test unbundling specific refs."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "unbundle-specific.bundle")
|
|
|
+
|
|
|
+ # Create bundle with HEAD
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Unbundle only HEAD
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "unbundle", bundle_file, "HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+
|
|
|
+ def test_bundle_unbundle_from_stdin(self):
|
|
|
+ """Test unbundling from stdin."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "stdin-unbundle.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Read bundle content to simulate stdin
|
|
|
+ with open(bundle_file, "rb") as f:
|
|
|
+ bundle_content = f.read()
|
|
|
+
|
|
|
+ # Mock stdin with bundle content
|
|
|
+ old_stdin = sys.stdin
|
|
|
+ try:
|
|
|
+ # Create a BytesIO object with buffer attribute
|
|
|
+ mock_stdin = io.BytesIO(bundle_content)
|
|
|
+ mock_stdin.buffer = mock_stdin
|
|
|
+ sys.stdin = mock_stdin
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "unbundle", "-")
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ finally:
|
|
|
+ sys.stdin = old_stdin
|
|
|
+
|
|
|
+ def test_bundle_unbundle_with_progress(self):
|
|
|
+ """Test unbundling with progress output."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "progress.bundle")
|
|
|
+
|
|
|
+ # Create bundle
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Unbundle with progress
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "unbundle", "--progress", bundle_file
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+
|
|
|
+ def test_bundle_create_with_progress(self):
|
|
|
+ """Test bundle creation with progress output."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "create-progress.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", "--progress", bundle_file, "HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_create_with_quiet(self):
|
|
|
+ """Test bundle creation with quiet flag."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "quiet-create.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", "--quiet", bundle_file, "HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_create_version_2(self):
|
|
|
+ """Test bundle creation with specific version."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "v2.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", "--version", "2", bundle_file, "HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_create_version_3(self):
|
|
|
+ """Test bundle creation with version 3."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "v3.bundle")
|
|
|
+
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", "--version", "3", bundle_file, "HEAD"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ def test_bundle_invalid_subcommand(self):
|
|
|
+ """Test invalid bundle subcommand."""
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "invalid-command")
|
|
|
+ self.assertEqual(result, 1)
|
|
|
+ self.assertIn("Unknown bundle subcommand", stdout)
|
|
|
+
|
|
|
+ def test_bundle_no_subcommand(self):
|
|
|
+ """Test bundle command with no subcommand."""
|
|
|
+ result, stdout, stderr = self._run_cli("bundle")
|
|
|
+ self.assertEqual(result, 1)
|
|
|
+ self.assertIn("Usage: bundle", stdout)
|
|
|
+
|
|
|
+ def test_bundle_create_with_stdin_refs(self):
|
|
|
+ """Test bundle creation reading refs from stdin."""
|
|
|
+ bundle_file = os.path.join(self.test_dir, "stdin-refs.bundle")
|
|
|
+
|
|
|
+ # Mock stdin with refs
|
|
|
+ old_stdin = sys.stdin
|
|
|
+ try:
|
|
|
+ sys.stdin = io.StringIO("master\nfeature\n")
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", "--stdin", bundle_file
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+ finally:
|
|
|
+ sys.stdin = old_stdin
|
|
|
+
|
|
|
+ def test_bundle_verify_missing_prerequisites(self):
|
|
|
+ """Test bundle verification with missing prerequisites."""
|
|
|
+ # Create a simple bundle first
|
|
|
+ bundle_file = os.path.join(self.test_dir, "prereq.bundle")
|
|
|
+ self._run_cli("bundle", "create", bundle_file, "HEAD")
|
|
|
+
|
|
|
+ # Create a new repo to simulate missing objects
|
|
|
+ new_repo_path = os.path.join(self.test_dir, "new_repo")
|
|
|
+ os.mkdir(new_repo_path)
|
|
|
+ new_repo = Repo.init(new_repo_path)
|
|
|
+ new_repo.close()
|
|
|
+
|
|
|
+ # Try to verify in new repo
|
|
|
+ old_cwd = os.getcwd()
|
|
|
+ try:
|
|
|
+ os.chdir(new_repo_path)
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
|
|
|
+ # Just check that verification runs - result depends on bundle content
|
|
|
+ self.assertIn(result, [0, 1])
|
|
|
+ finally:
|
|
|
+ os.chdir(old_cwd)
|
|
|
+
|
|
|
+ def test_bundle_create_with_committish_range(self):
|
|
|
+ """Test bundle creation with commit range using parse_committish_range."""
|
|
|
+ # Create additional commits for range testing
|
|
|
+ test_file3 = os.path.join(self.repo_path, "file3.txt")
|
|
|
+ with open(test_file3, "w") as f:
|
|
|
+ f.write("Content of file3\n")
|
|
|
+ self._run_cli("add", "file3.txt")
|
|
|
+ self._run_cli("commit", "--message=Add file3")
|
|
|
+
|
|
|
+ # Get commit SHAs
|
|
|
+ result, stdout, stderr = self._run_cli("log")
|
|
|
+ lines = stdout.strip().split("\n")
|
|
|
+ # Extract SHAs from commit lines
|
|
|
+ commits = []
|
|
|
+ for line in lines:
|
|
|
+ if line.startswith("commit:"):
|
|
|
+ sha = line.split()[1]
|
|
|
+ commits.append(sha[:8]) # Get short SHA
|
|
|
+
|
|
|
+ # We should have exactly 3 commits: Add file3, Add file2, Initial commit
|
|
|
+ self.assertEqual(len(commits), 3)
|
|
|
+
|
|
|
+ bundle_file = os.path.join(self.test_dir, "range-test.bundle")
|
|
|
+
|
|
|
+ # Test with commit range using .. syntax
|
|
|
+ # Create a bundle containing commits reachable from commits[0] but not from commits[2]
|
|
|
+ result, stdout, stderr = self._run_cli(
|
|
|
+ "bundle", "create", bundle_file, f"{commits[2]}..HEAD"
|
|
|
+ )
|
|
|
+ if result != 0:
|
|
|
+ self.fail(
|
|
|
+ f"Bundle create failed with exit code {result}. stdout: {stdout!r}, stderr: {stderr!r}"
|
|
|
+ )
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertTrue(os.path.exists(bundle_file))
|
|
|
+
|
|
|
+ # Verify the bundle was created
|
|
|
+ result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
|
|
|
+ self.assertEqual(result, 0)
|
|
|
+ self.assertIn("valid and can be applied", stdout)
|
|
|
+
|
|
|
+
|
|
|
class FormatBytesTestCase(TestCase):
|
|
|
"""Tests for format_bytes function."""
|
|
|
|