|
@@ -34,10 +34,12 @@ from unittest.mock import MagicMock, patch
|
|
|
|
|
|
|
|
from dulwich import cli
|
|
from dulwich import cli
|
|
|
from dulwich.cli import (
|
|
from dulwich.cli import (
|
|
|
|
|
+ AutoFlushBinaryIOWrapper,
|
|
|
|
|
+ AutoFlushTextIOWrapper,
|
|
|
|
|
+ _should_auto_flush,
|
|
|
detect_terminal_width,
|
|
detect_terminal_width,
|
|
|
format_bytes,
|
|
format_bytes,
|
|
|
launch_editor,
|
|
launch_editor,
|
|
|
- parse_relative_time,
|
|
|
|
|
write_columns,
|
|
write_columns,
|
|
|
)
|
|
)
|
|
|
from dulwich.repo import Repo
|
|
from dulwich.repo import Repo
|
|
@@ -143,30 +145,6 @@ class HelperFunctionsTest(TestCase):
|
|
|
result = launch_editor(b"Test template content")
|
|
result = launch_editor(b"Test template content")
|
|
|
self.assertEqual(b"Test template content", result)
|
|
self.assertEqual(b"Test template content", result)
|
|
|
|
|
|
|
|
- def test_parse_relative_time(self):
|
|
|
|
|
- """Test parsing relative time strings."""
|
|
|
|
|
- from dulwich.cli import parse_relative_time
|
|
|
|
|
-
|
|
|
|
|
- self.assertEqual(0, parse_relative_time("now"))
|
|
|
|
|
- self.assertEqual(60, parse_relative_time("1 minute ago"))
|
|
|
|
|
- self.assertEqual(120, parse_relative_time("2 minutes ago"))
|
|
|
|
|
- self.assertEqual(3600, parse_relative_time("1 hour ago"))
|
|
|
|
|
- self.assertEqual(7200, parse_relative_time("2 hours ago"))
|
|
|
|
|
- self.assertEqual(86400, parse_relative_time("1 day ago"))
|
|
|
|
|
- self.assertEqual(172800, parse_relative_time("2 days ago"))
|
|
|
|
|
- self.assertEqual(604800, parse_relative_time("1 week ago"))
|
|
|
|
|
- self.assertEqual(1209600, parse_relative_time("2 weeks ago"))
|
|
|
|
|
- self.assertEqual(2592000, parse_relative_time("1 month ago"))
|
|
|
|
|
- self.assertEqual(31536000, parse_relative_time("1 year ago"))
|
|
|
|
|
-
|
|
|
|
|
- # Test invalid formats
|
|
|
|
|
- with self.assertRaises(ValueError):
|
|
|
|
|
- parse_relative_time("invalid")
|
|
|
|
|
- with self.assertRaises(ValueError):
|
|
|
|
|
- parse_relative_time("2 days") # Missing "ago"
|
|
|
|
|
- with self.assertRaises(ValueError):
|
|
|
|
|
- parse_relative_time("two days ago") # Not a number
|
|
|
|
|
-
|
|
|
|
|
def test_parse_time_to_timestamp(self):
|
|
def test_parse_time_to_timestamp(self):
|
|
|
"""Test parsing time specifications to Unix timestamps."""
|
|
"""Test parsing time specifications to Unix timestamps."""
|
|
|
import time
|
|
import time
|
|
@@ -3061,92 +3039,6 @@ class FormatBytesTestCase(TestCase):
|
|
|
self.assertEqual("1000.0 TB", format_bytes(1024 * 1024 * 1024 * 1024 * 1000))
|
|
self.assertEqual("1000.0 TB", format_bytes(1024 * 1024 * 1024 * 1024 * 1000))
|
|
|
|
|
|
|
|
|
|
|
|
|
-class ParseRelativeTimeTestCase(TestCase):
|
|
|
|
|
- """Tests for parse_relative_time function."""
|
|
|
|
|
-
|
|
|
|
|
- def test_now(self):
|
|
|
|
|
- """Test parsing 'now'."""
|
|
|
|
|
- self.assertEqual(0, parse_relative_time("now"))
|
|
|
|
|
-
|
|
|
|
|
- def test_seconds(self):
|
|
|
|
|
- """Test parsing seconds."""
|
|
|
|
|
- self.assertEqual(1, parse_relative_time("1 second ago"))
|
|
|
|
|
- self.assertEqual(5, parse_relative_time("5 seconds ago"))
|
|
|
|
|
- self.assertEqual(30, parse_relative_time("30 seconds ago"))
|
|
|
|
|
-
|
|
|
|
|
- def test_minutes(self):
|
|
|
|
|
- """Test parsing minutes."""
|
|
|
|
|
- self.assertEqual(60, parse_relative_time("1 minute ago"))
|
|
|
|
|
- self.assertEqual(300, parse_relative_time("5 minutes ago"))
|
|
|
|
|
- self.assertEqual(1800, parse_relative_time("30 minutes ago"))
|
|
|
|
|
-
|
|
|
|
|
- def test_hours(self):
|
|
|
|
|
- """Test parsing hours."""
|
|
|
|
|
- self.assertEqual(3600, parse_relative_time("1 hour ago"))
|
|
|
|
|
- self.assertEqual(7200, parse_relative_time("2 hours ago"))
|
|
|
|
|
- self.assertEqual(86400, parse_relative_time("24 hours ago"))
|
|
|
|
|
-
|
|
|
|
|
- def test_days(self):
|
|
|
|
|
- """Test parsing days."""
|
|
|
|
|
- self.assertEqual(86400, parse_relative_time("1 day ago"))
|
|
|
|
|
- self.assertEqual(604800, parse_relative_time("7 days ago"))
|
|
|
|
|
- self.assertEqual(2592000, parse_relative_time("30 days ago"))
|
|
|
|
|
-
|
|
|
|
|
- def test_weeks(self):
|
|
|
|
|
- """Test parsing weeks."""
|
|
|
|
|
- self.assertEqual(604800, parse_relative_time("1 week ago"))
|
|
|
|
|
- self.assertEqual(1209600, parse_relative_time("2 weeks ago"))
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- 36288000, parse_relative_time("60 weeks ago")
|
|
|
|
|
- ) # 60 * 7 * 24 * 60 * 60
|
|
|
|
|
-
|
|
|
|
|
- def test_invalid_format(self):
|
|
|
|
|
- """Test invalid time formats."""
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("invalid")
|
|
|
|
|
- self.assertIn("Invalid relative time format", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("2 weeks")
|
|
|
|
|
- self.assertIn("Invalid relative time format", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("ago")
|
|
|
|
|
- self.assertIn("Invalid relative time format", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("two weeks ago")
|
|
|
|
|
- self.assertIn("Invalid number in relative time", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- def test_invalid_unit(self):
|
|
|
|
|
- """Test invalid time units."""
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("5 fortnights ago")
|
|
|
|
|
- self.assertIn("Unknown time unit: fortnights", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- with self.assertRaises(ValueError) as cm:
|
|
|
|
|
- parse_relative_time("2 decades ago")
|
|
|
|
|
- self.assertIn("Unknown time unit: decades", str(cm.exception))
|
|
|
|
|
-
|
|
|
|
|
- def test_singular_plural(self):
|
|
|
|
|
- """Test that both singular and plural forms work."""
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- parse_relative_time("1 second ago"), parse_relative_time("1 seconds ago")
|
|
|
|
|
- )
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- parse_relative_time("1 minute ago"), parse_relative_time("1 minutes ago")
|
|
|
|
|
- )
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- parse_relative_time("1 hour ago"), parse_relative_time("1 hours ago")
|
|
|
|
|
- )
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- parse_relative_time("1 day ago"), parse_relative_time("1 days ago")
|
|
|
|
|
- )
|
|
|
|
|
- self.assertEqual(
|
|
|
|
|
- parse_relative_time("1 week ago"), parse_relative_time("1 weeks ago")
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
class GetPagerTest(TestCase):
|
|
class GetPagerTest(TestCase):
|
|
|
"""Tests for get_pager function."""
|
|
"""Tests for get_pager function."""
|
|
|
|
|
|
|
@@ -3672,5 +3564,377 @@ class ConfigCommandTest(DulwichCliTestCase):
|
|
|
self.assertEqual(stdout, "value1\nvalue2\nvalue3\n")
|
|
self.assertEqual(stdout, "value1\nvalue2\nvalue3\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+class GitFlushTest(TestCase):
|
|
|
|
|
+ """Tests for GIT_FLUSH environment variable support."""
|
|
|
|
|
+
|
|
|
|
|
+ def test_should_auto_flush_with_git_flush_1(self):
|
|
|
|
|
+ """Test that GIT_FLUSH=1 enables auto-flushing."""
|
|
|
|
|
+
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = True
|
|
|
|
|
+
|
|
|
|
|
+ self.assertTrue(_should_auto_flush(mock_stream, env={"GIT_FLUSH": "1"}))
|
|
|
|
|
+
|
|
|
|
|
+ def test_should_auto_flush_with_git_flush_0(self):
|
|
|
|
|
+ """Test that GIT_FLUSH=0 disables auto-flushing."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = True
|
|
|
|
|
+
|
|
|
|
|
+ self.assertFalse(_should_auto_flush(mock_stream, env={"GIT_FLUSH": "0"}))
|
|
|
|
|
+
|
|
|
|
|
+ def test_should_auto_flush_auto_detect_tty(self):
|
|
|
|
|
+ """Test that auto-detect returns False for TTY (no flush needed)."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = True
|
|
|
|
|
+
|
|
|
|
|
+ self.assertFalse(_should_auto_flush(mock_stream, env={}))
|
|
|
|
|
+
|
|
|
|
|
+ def test_should_auto_flush_auto_detect_pipe(self):
|
|
|
|
|
+ """Test that auto-detect returns True for pipes (flush needed)."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = False
|
|
|
|
|
+
|
|
|
|
|
+ self.assertTrue(_should_auto_flush(mock_stream, env={}))
|
|
|
|
|
+
|
|
|
|
|
+ def test_text_wrapper_flushes_on_write(self):
|
|
|
|
|
+ """Test that AutoFlushTextIOWrapper flushes after write."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper(mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ wrapper.write("test")
|
|
|
|
|
+ mock_stream.write.assert_called_once_with("test")
|
|
|
|
|
+ mock_stream.flush.assert_called_once()
|
|
|
|
|
+
|
|
|
|
|
+ def test_text_wrapper_flushes_on_writelines(self):
|
|
|
|
|
+ """Test that AutoFlushTextIOWrapper flushes after writelines."""
|
|
|
|
|
+ from dulwich.cli import AutoFlushTextIOWrapper
|
|
|
|
|
+
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper(mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ wrapper.writelines(["line1\n", "line2\n"])
|
|
|
|
|
+ mock_stream.writelines.assert_called_once()
|
|
|
|
|
+ mock_stream.flush.assert_called_once()
|
|
|
|
|
+
|
|
|
|
|
+ def test_binary_wrapper_flushes_on_write(self):
|
|
|
|
|
+ """Test that AutoFlushBinaryIOWrapper flushes after write."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ wrapper = AutoFlushBinaryIOWrapper(mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ wrapper.write(b"test")
|
|
|
|
|
+ mock_stream.write.assert_called_once_with(b"test")
|
|
|
|
|
+ mock_stream.flush.assert_called_once()
|
|
|
|
|
+
|
|
|
|
|
+ def test_text_wrapper_env_classmethod(self):
|
|
|
|
|
+ """Test that AutoFlushTextIOWrapper.env() respects GIT_FLUSH."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = False
|
|
|
|
|
+
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper.env(mock_stream, env={"GIT_FLUSH": "1"})
|
|
|
|
|
+ self.assertIsInstance(wrapper, AutoFlushTextIOWrapper)
|
|
|
|
|
+
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper.env(mock_stream, env={"GIT_FLUSH": "0"})
|
|
|
|
|
+ self.assertIs(mock_stream, wrapper)
|
|
|
|
|
+
|
|
|
|
|
+ def test_binary_wrapper_env_classmethod(self):
|
|
|
|
|
+ """Test that AutoFlushBinaryIOWrapper.env() respects GIT_FLUSH."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.isatty.return_value = False
|
|
|
|
|
+
|
|
|
|
|
+ wrapper = AutoFlushBinaryIOWrapper.env(mock_stream, env={"GIT_FLUSH": "1"})
|
|
|
|
|
+ self.assertIsInstance(wrapper, AutoFlushBinaryIOWrapper)
|
|
|
|
|
+
|
|
|
|
|
+ wrapper = AutoFlushBinaryIOWrapper.env(mock_stream, env={"GIT_FLUSH": "0"})
|
|
|
|
|
+ self.assertIs(wrapper, mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ def test_wrapper_delegates_attributes(self):
|
|
|
|
|
+ """Test that wrapper delegates unknown attributes to stream."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ mock_stream.encoding = "utf-8"
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper(mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ self.assertEqual(wrapper.encoding, "utf-8")
|
|
|
|
|
+
|
|
|
|
|
+ def test_wrapper_context_manager(self):
|
|
|
|
|
+ """Test that wrapper supports context manager protocol."""
|
|
|
|
|
+ mock_stream = MagicMock()
|
|
|
|
|
+ wrapper = AutoFlushTextIOWrapper(mock_stream)
|
|
|
|
|
+
|
|
|
|
|
+ with wrapper as w:
|
|
|
|
|
+ self.assertIs(w, wrapper)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class MaintenanceCommandTest(DulwichCliTestCase):
|
|
|
|
|
+ """Tests for maintenance command."""
|
|
|
|
|
+
|
|
|
|
|
+ def setUp(self):
|
|
|
|
|
+ super().setUp()
|
|
|
|
|
+ # Set up a temporary HOME for testing global config
|
|
|
|
|
+ self.temp_home = tempfile.mkdtemp()
|
|
|
|
|
+ self.addCleanup(shutil.rmtree, self.temp_home)
|
|
|
|
|
+ self.overrideEnv("HOME", self.temp_home)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_run_default(self):
|
|
|
|
|
+ """Test maintenance run with default tasks."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "run")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_run_specific_task(self):
|
|
|
|
|
+ """Test maintenance run with a specific task."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "maintenance", "run", "--task", "pack-refs"
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_run_multiple_tasks(self):
|
|
|
|
|
+ """Test maintenance run with multiple specific tasks."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "maintenance", "run", "--task", "pack-refs", "--task", "gc"
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_run_quiet(self):
|
|
|
|
|
+ """Test maintenance run with quiet flag."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "run", "--quiet")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_run_auto(self):
|
|
|
|
|
+ """Test maintenance run with auto flag."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "run", "--auto")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_no_subcommand(self):
|
|
|
|
|
+ """Test maintenance command without subcommand shows help."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance")
|
|
|
|
|
+ self.assertEqual(result, 1)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_register(self):
|
|
|
|
|
+ """Test maintenance register subcommand."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "register")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_unregister(self):
|
|
|
|
|
+ """Test maintenance unregister subcommand."""
|
|
|
|
|
+ # First register
|
|
|
|
|
+ _result, _stdout, _stderr = self._run_cli("maintenance", "register")
|
|
|
|
|
+
|
|
|
|
|
+ # Then unregister
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "unregister")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_unregister_not_registered(self):
|
|
|
|
|
+ """Test unregistering a repository that is not registered."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "unregister")
|
|
|
|
|
+ self.assertEqual(result, 1)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_unregister_force(self):
|
|
|
|
|
+ """Test unregistering with --force flag."""
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", "unregister", "--force")
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+
|
|
|
|
|
+ def test_maintenance_unimplemented_subcommand(self):
|
|
|
|
|
+ """Test unimplemented maintenance subcommands."""
|
|
|
|
|
+ for subcommand in ["start", "stop"]:
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("maintenance", subcommand)
|
|
|
|
|
+ self.assertEqual(result, 1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class InterpretTrailersCommandTest(DulwichCliTestCase):
|
|
|
|
|
+ """Tests for interpret-trailers command."""
|
|
|
|
|
+
|
|
|
|
|
+ def test_parse_trailers_from_file(self):
|
|
|
|
|
+ """Test parsing trailers from a file."""
|
|
|
|
|
+ # Create a message file with trailers
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n\nSigned-off-by: Alice <alice@example.com>\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers", "--only-trailers", msg_file
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Signed-off-by: Alice <alice@example.com>", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_add_trailer_to_message(self):
|
|
|
|
|
+ """Test adding a trailer to a message."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody text\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers",
|
|
|
|
|
+ "--trailer",
|
|
|
|
|
+ "Signed-off-by:Alice <alice@example.com>",
|
|
|
|
|
+ msg_file,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Signed-off-by: Alice <alice@example.com>", stdout)
|
|
|
|
|
+ self.assertIn("Subject", stdout)
|
|
|
|
|
+ self.assertIn("Body text", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_add_multiple_trailers(self):
|
|
|
|
|
+ """Test adding multiple trailers."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers",
|
|
|
|
|
+ "--trailer",
|
|
|
|
|
+ "Signed-off-by:Alice",
|
|
|
|
|
+ "--trailer",
|
|
|
|
|
+ "Reviewed-by:Bob",
|
|
|
|
|
+ msg_file,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Signed-off-by: Alice", stdout)
|
|
|
|
|
+ self.assertIn("Reviewed-by: Bob", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_parse_shorthand(self):
|
|
|
|
|
+ """Test --parse shorthand option."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n\nSigned-off-by: Alice\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers", "--parse", msg_file
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ # --parse is shorthand for --only-trailers --only-input --unfold
|
|
|
|
|
+ self.assertEqual(stdout, "Signed-off-by: Alice\n")
|
|
|
|
|
+
|
|
|
|
|
+ def test_trim_empty(self):
|
|
|
|
|
+ """Test --trim-empty option."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n\nSigned-off-by: Alice\nReviewed-by: \n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers", "--trim-empty", "--only-trailers", msg_file
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Signed-off-by: Alice", stdout)
|
|
|
|
|
+ self.assertNotIn("Reviewed-by:", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_if_exists_replace(self):
|
|
|
|
|
+ """Test --if-exists replace option."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n\nSigned-off-by: Alice\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers",
|
|
|
|
|
+ "--if-exists",
|
|
|
|
|
+ "replace",
|
|
|
|
|
+ "--trailer",
|
|
|
|
|
+ "Signed-off-by:Bob",
|
|
|
|
|
+ msg_file,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Signed-off-by: Bob", stdout)
|
|
|
|
|
+ self.assertNotIn("Alice", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_trailer_with_equals(self):
|
|
|
|
|
+ """Test trailer with equals separator."""
|
|
|
|
|
+ msg_file = os.path.join(self.test_dir, "message.txt")
|
|
|
|
|
+ with open(msg_file, "wb") as f:
|
|
|
|
|
+ f.write(b"Subject\n\nBody\n")
|
|
|
|
|
+
|
|
|
|
|
+ result, stdout, _stderr = self._run_cli(
|
|
|
|
|
+ "interpret-trailers", "--trailer", "Bug=12345", msg_file
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertIsNone(result)
|
|
|
|
|
+ self.assertIn("Bug: 12345", stdout)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ReplaceCommandTest(DulwichCliTestCase):
|
|
|
|
|
+ """Tests for replace command."""
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_create(self):
|
|
|
|
|
+ """Test creating a replacement ref."""
|
|
|
|
|
+ # Create two commits
|
|
|
|
|
+ [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
|
|
|
|
|
+ self.repo[b"HEAD"] = c1.id
|
|
|
|
|
+
|
|
|
|
|
+ # Create replacement using the create form (decode to string for CLI)
|
|
|
|
|
+ c1_str = c1.id.decode("ascii")
|
|
|
|
|
+ c2_str = c2.id.decode("ascii")
|
|
|
|
|
+ _result, _stdout, _stderr = self._run_cli("replace", c1_str, c2_str)
|
|
|
|
|
+
|
|
|
|
|
+ # Verify the replacement ref was created
|
|
|
|
|
+ replace_ref = b"refs/replace/" + c1.id
|
|
|
|
|
+ self.assertIn(replace_ref, self.repo.refs.keys())
|
|
|
|
|
+ self.assertEqual(c2.id, self.repo.refs[replace_ref])
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_list_empty(self):
|
|
|
|
|
+ """Test listing replacements when there are none."""
|
|
|
|
|
+ _result, stdout, _stderr = self._run_cli("replace", "list")
|
|
|
|
|
+ self.assertEqual("", stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_list(self):
|
|
|
|
|
+ """Test listing replacement refs."""
|
|
|
|
|
+ # Create two commits
|
|
|
|
|
+ [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
|
|
|
|
|
+ self.repo[b"HEAD"] = c1.id
|
|
|
|
|
+
|
|
|
|
|
+ # Create replacement
|
|
|
|
|
+ c1_str = c1.id.decode("ascii")
|
|
|
|
|
+ c2_str = c2.id.decode("ascii")
|
|
|
|
|
+ self._run_cli("replace", c1_str, c2_str)
|
|
|
|
|
+
|
|
|
|
|
+ # List replacements
|
|
|
|
|
+ _result, stdout, _stderr = self._run_cli("replace", "list")
|
|
|
|
|
+ self.assertIn(c1_str, stdout)
|
|
|
|
|
+ self.assertIn(c2_str, stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_default_list(self):
|
|
|
|
|
+ """Test that replace without subcommand defaults to list."""
|
|
|
|
|
+ # Create two commits
|
|
|
|
|
+ [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
|
|
|
|
|
+ self.repo[b"HEAD"] = c1.id
|
|
|
|
|
+
|
|
|
|
|
+ # Create replacement
|
|
|
|
|
+ c1_str = c1.id.decode("ascii")
|
|
|
|
|
+ c2_str = c2.id.decode("ascii")
|
|
|
|
|
+ self._run_cli("replace", c1_str, c2_str)
|
|
|
|
|
+
|
|
|
|
|
+ # Call replace without subcommand (should list)
|
|
|
|
|
+ _result, stdout, _stderr = self._run_cli("replace")
|
|
|
|
|
+ self.assertIn(c1_str, stdout)
|
|
|
|
|
+ self.assertIn(c2_str, stdout)
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_delete(self):
|
|
|
|
|
+ """Test deleting a replacement ref."""
|
|
|
|
|
+ # Create two commits
|
|
|
|
|
+ [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
|
|
|
|
|
+ self.repo[b"HEAD"] = c1.id
|
|
|
|
|
+
|
|
|
|
|
+ # Create replacement
|
|
|
|
|
+ c1_str = c1.id.decode("ascii")
|
|
|
|
|
+ c2_str = c2.id.decode("ascii")
|
|
|
|
|
+ self._run_cli("replace", c1_str, c2_str)
|
|
|
|
|
+
|
|
|
|
|
+ # Verify it exists
|
|
|
|
|
+ replace_ref = b"refs/replace/" + c1.id
|
|
|
|
|
+ self.assertIn(replace_ref, self.repo.refs.keys())
|
|
|
|
|
+
|
|
|
|
|
+ # Delete the replacement
|
|
|
|
|
+ _result, _stdout, _stderr = self._run_cli("replace", "delete", c1_str)
|
|
|
|
|
+
|
|
|
|
|
+ # Verify it's gone
|
|
|
|
|
+ self.assertNotIn(replace_ref, self.repo.refs.keys())
|
|
|
|
|
+
|
|
|
|
|
+ def test_replace_delete_nonexistent(self):
|
|
|
|
|
+ """Test deleting a nonexistent replacement ref fails."""
|
|
|
|
|
+ # Create a commit
|
|
|
|
|
+ [c1] = build_commit_graph(self.repo.object_store, [[1]])
|
|
|
|
|
+ self.repo[b"HEAD"] = c1.id
|
|
|
|
|
+
|
|
|
|
|
+ # Try to delete a non-existent replacement
|
|
|
|
|
+ c1_str = c1.id.decode("ascii")
|
|
|
|
|
+ result, _stdout, _stderr = self._run_cli("replace", "delete", c1_str)
|
|
|
|
|
+ self.assertEqual(result, 1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
unittest.main()
|
|
unittest.main()
|