فهرست منبع

Add SSH signature support with SSHSignatureVendor and SSHCliSignatureVendor

Jelmer Vernooij 4 هفته پیش
والد
کامیت
ed6bad316f
2فایلهای تغییر یافته به همراه365 افزوده شده و 6 حذف شده
  1. 223 1
      dulwich/signature.py
  2. 142 5
      tests/test_signature.py

+ 223 - 1
dulwich/signature.py

@@ -325,6 +325,227 @@ class GPGCliSignatureVendor(SignatureVendor):
                 )
 
 
+class SSHSignatureVendor(SignatureVendor):
+    """Signature vendor that uses SSH keys for signing and verification.
+
+    Supports git config options:
+    - gpg.ssh.allowedSignersFile: File containing allowed SSH public keys
+    - gpg.ssh.defaultKeyCommand: Command to get default SSH key
+    """
+
+    def __init__(self, config: "Config | None" = None) -> None:
+        """Initialize the SSH signature vendor.
+
+        Args:
+          config: Optional Git configuration for SSH signature settings
+        """
+        super().__init__(config)
+
+        # Parse SSH-specific config
+        self.allowed_signers_file = None
+        self.default_key_command = None
+
+        if config is not None:
+            try:
+                signers_file = config.get((b"gpg", b"ssh"), b"allowedSignersFile")
+                if signers_file:
+                    self.allowed_signers_file = signers_file.decode("utf-8")
+            except KeyError:
+                pass
+
+            try:
+                key_command = config.get((b"gpg", b"ssh"), b"defaultKeyCommand")
+                if key_command:
+                    self.default_key_command = key_command.decode("utf-8")
+            except KeyError:
+                pass
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with an SSH key.
+
+        Args:
+          data: The data to sign
+          keyid: Optional SSH key to use. Can be a path to private key or
+                 public key prefixed with "key::"
+
+        Returns:
+          The signature as bytes
+
+        Raises:
+          NotImplementedError: SSH signing not yet implemented
+        """
+        raise NotImplementedError("SSH signing not yet implemented")
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify an SSH signature.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Optional iterable of allowed SSH public keys
+
+        Raises:
+          NotImplementedError: SSH verification not yet implemented
+        """
+        raise NotImplementedError("SSH verification not yet implemented")
+
+
+class SSHCliSignatureVendor(SignatureVendor):
+    """Signature vendor that uses ssh-keygen command-line tool for SSH signatures.
+
+    Supports git config options:
+    - gpg.ssh.allowedSignersFile: File containing allowed SSH public keys
+    - gpg.ssh.program: Path to ssh-keygen command
+    """
+
+    def __init__(
+        self, config: "Config | None" = None, ssh_command: str | None = None
+    ) -> None:
+        """Initialize the SSH CLI vendor.
+
+        Args:
+          config: Optional Git configuration to read gpg.ssh settings from
+          ssh_command: Path to ssh-keygen command. If not specified, will try to
+                      read from config's gpg.ssh.program setting, or default to 'ssh-keygen'
+        """
+        super().__init__(config)
+
+        if ssh_command is not None:
+            self.ssh_command = ssh_command
+        elif config is not None:
+            try:
+                ssh_program = config.get((b"gpg", b"ssh"), b"program")
+                self.ssh_command = ssh_program.decode("utf-8")
+            except KeyError:
+                self.ssh_command = "ssh-keygen"
+        else:
+            self.ssh_command = "ssh-keygen"
+
+        # Parse SSH-specific config
+        self.allowed_signers_file = None
+        if config is not None:
+            try:
+                signers_file = config.get((b"gpg", b"ssh"), b"allowedSignersFile")
+                if signers_file:
+                    self.allowed_signers_file = signers_file.decode("utf-8")
+            except KeyError:
+                pass
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with an SSH key using ssh-keygen.
+
+        Args:
+          data: The data to sign
+          keyid: Path to SSH private key. If not specified, ssh-keygen will
+                use the default key (typically from ssh-agent)
+
+        Returns:
+          The signature as bytes
+
+        Raises:
+          subprocess.CalledProcessError: if ssh-keygen command fails
+          ValueError: if keyid is not provided and no default key available
+        """
+        import os
+        import subprocess
+        import tempfile
+
+        if keyid is None:
+            raise ValueError("SSH signing requires a key to be specified")
+
+        # Create a temporary directory to hold both data and signature files
+        # ssh-keygen creates the signature file with .sig suffix
+        with tempfile.TemporaryDirectory() as tmpdir:
+            data_filename = os.path.join(tmpdir, "data.git")
+            sig_filename = data_filename + ".sig"
+
+            # Write data to file
+            with open(data_filename, "wb") as data_file:
+                data_file.write(data)
+
+            # Sign with ssh-keygen
+            args = [
+                self.ssh_command,
+                "-Y",
+                "sign",
+                "-f",
+                keyid,
+                "-n",
+                "git",  # namespace
+                data_filename,
+            ]
+
+            subprocess.run(args, capture_output=True, check=True)
+
+            # Read signature file
+            with open(sig_filename, "rb") as sig_file:
+                signature = sig_file.read()
+
+            return signature
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify an SSH signature using ssh-keygen.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Not used for SSH verification. Instead, allowed signers
+                 are read from gpg.ssh.allowedSignersFile config
+
+        Raises:
+          subprocess.CalledProcessError: if signature verification fails
+          ValueError: if allowedSignersFile is not configured
+        """
+        import os
+        import subprocess
+        import tempfile
+
+        if self.allowed_signers_file is None:
+            raise ValueError(
+                "SSH signature verification requires gpg.ssh.allowedSignersFile "
+                "to be configured"
+            )
+
+        # Create a temporary directory for data and signature files
+        with tempfile.TemporaryDirectory() as tmpdir:
+            data_filename = os.path.join(tmpdir, "data.git")
+            sig_filename = os.path.join(tmpdir, "data.git.sig")
+
+            # Write data and signature files
+            with open(data_filename, "wb") as data_file:
+                data_file.write(data)
+
+            with open(sig_filename, "wb") as sig_file:
+                sig_file.write(signature)
+
+            # Verify with ssh-keygen
+            # For git signatures, we use "git" as the signer identity
+            args = [
+                self.ssh_command,
+                "-Y",
+                "verify",
+                "-f",
+                self.allowed_signers_file,
+                "-I",
+                "git",  # signer identity
+                "-n",
+                "git",  # namespace
+                "-s",
+                sig_filename,
+            ]
+
+            subprocess.run(
+                args,
+                stdin=open(data_filename, "rb"),
+                capture_output=True,
+                check=True,
+            )
+
+
 def get_signature_vendor(
     format: str | None = None, config: "Config | None" = None
 ) -> SignatureVendor:
@@ -368,7 +589,8 @@ def get_signature_vendor(
     elif format_lower == "x509":
         raise ValueError("X.509 signatures are not yet supported")
     elif format_lower == "ssh":
-        raise ValueError("SSH signatures are not yet supported")
+        # For SSH, use CLI vendor (sshsig package support can be added later)
+        return SSHCliSignatureVendor(config=config)
     else:
         raise ValueError(f"Unsupported signature format: {format}")
 

+ 142 - 5
tests/test_signature.py

@@ -30,6 +30,8 @@ from dulwich.signature import (
     GPGCliSignatureVendor,
     GPGSignatureVendor,
     SignatureVendor,
+    SSHCliSignatureVendor,
+    SSHSignatureVendor,
     get_signature_vendor,
 )
 
@@ -307,11 +309,10 @@ class GetSignatureVendorTests(unittest.TestCase):
             get_signature_vendor(format="x509")
         self.assertIn("X.509", str(cm.exception))
 
-    def test_ssh_not_supported(self) -> None:
-        """Test that ssh format raises ValueError."""
-        with self.assertRaises(ValueError) as cm:
-            get_signature_vendor(format="ssh")
-        self.assertIn("SSH", str(cm.exception))
+    def test_ssh_format_supported(self) -> None:
+        """Test that ssh format is now supported."""
+        vendor = get_signature_vendor(format="ssh")
+        self.assertIsInstance(vendor, SSHCliSignatureVendor)
 
     def test_invalid_format(self) -> None:
         """Test that invalid format raises ValueError."""
@@ -328,3 +329,139 @@ class GetSignatureVendorTests(unittest.TestCase):
         # If CLI vendor is used, check that config was passed
         if isinstance(vendor, GPGCliSignatureVendor):
             self.assertEqual(vendor.gpg_command, "gpg2")
+
+    def test_ssh_format(self) -> None:
+        """Test requesting SSH format."""
+        vendor = get_signature_vendor(format="ssh")
+        self.assertIsInstance(vendor, SSHCliSignatureVendor)
+
+
+class SSHSignatureVendorTests(unittest.TestCase):
+    """Tests for SSHSignatureVendor base implementation."""
+
+    def test_not_implemented_sign(self) -> None:
+        """Test that sign raises NotImplementedError."""
+        vendor = SSHSignatureVendor()
+        with self.assertRaises(NotImplementedError):
+            vendor.sign(b"test data", keyid="dummy")
+
+    def test_not_implemented_verify(self) -> None:
+        """Test that verify raises NotImplementedError."""
+        vendor = SSHSignatureVendor()
+        with self.assertRaises(NotImplementedError):
+            vendor.verify(b"test data", b"fake signature")
+
+    def test_config_parsing(self) -> None:
+        """Test parsing SSH config options."""
+        config = ConfigDict()
+        config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/path/to/allowed")
+        config.set((b"gpg", b"ssh"), b"defaultKeyCommand", b"ssh-add -L")
+
+        vendor = SSHSignatureVendor(config=config)
+        self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
+        self.assertEqual(vendor.default_key_command, "ssh-add -L")
+
+
+class SSHCliSignatureVendorTests(unittest.TestCase):
+    """Tests for SSHCliSignatureVendor."""
+
+    def setUp(self) -> None:
+        """Check if ssh-keygen is available."""
+        if shutil.which("ssh-keygen") is None:
+            self.skipTest("ssh-keygen command not available")
+
+    def test_ssh_program_from_config(self) -> None:
+        """Test reading gpg.ssh.program from config."""
+        config = ConfigDict()
+        config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
+
+        vendor = SSHCliSignatureVendor(config=config)
+        self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
+
+    def test_ssh_program_override(self) -> None:
+        """Test that ssh_command parameter overrides config."""
+        config = ConfigDict()
+        config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
+
+        vendor = SSHCliSignatureVendor(config=config, ssh_command="ssh-keygen")
+        self.assertEqual(vendor.ssh_command, "ssh-keygen")
+
+    def test_ssh_program_default(self) -> None:
+        """Test default ssh-keygen command when no config provided."""
+        vendor = SSHCliSignatureVendor()
+        self.assertEqual(vendor.ssh_command, "ssh-keygen")
+
+    def test_allowed_signers_from_config(self) -> None:
+        """Test reading gpg.ssh.allowedSignersFile from config."""
+        config = ConfigDict()
+        config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
+
+        vendor = SSHCliSignatureVendor(config=config)
+        self.assertEqual(vendor.allowed_signers_file, "/tmp/allowed_signers")
+
+    def test_sign_without_key_raises(self) -> None:
+        """Test that signing without a key raises ValueError."""
+        vendor = SSHCliSignatureVendor()
+        with self.assertRaises(ValueError) as cm:
+            vendor.sign(b"test data")
+        self.assertIn("key", str(cm.exception).lower())
+
+    def test_verify_without_allowed_signers_raises(self) -> None:
+        """Test that verify without allowedSignersFile raises ValueError."""
+        vendor = SSHCliSignatureVendor()
+        with self.assertRaises(ValueError) as cm:
+            vendor.verify(b"test data", b"fake signature")
+        self.assertIn("allowedSignersFile", str(cm.exception))
+
+    def test_sign_and_verify_with_ssh_key(self) -> None:
+        """Test sign and verify cycle with SSH key."""
+        import os
+        import tempfile
+
+        # Generate a test SSH key
+        with tempfile.TemporaryDirectory() as tmpdir:
+            private_key = os.path.join(tmpdir, "test_key")
+            public_key = private_key + ".pub"
+            allowed_signers = os.path.join(tmpdir, "allowed_signers")
+
+            # Generate Ed25519 key (no passphrase)
+            subprocess.run(
+                [
+                    "ssh-keygen",
+                    "-t",
+                    "ed25519",
+                    "-f",
+                    private_key,
+                    "-N",
+                    "",
+                    "-C",
+                    "test@example.com",
+                ],
+                capture_output=True,
+                check=True,
+            )
+
+            # Create allowed_signers file
+            with open(public_key) as pub:
+                pub_key_content = pub.read().strip()
+            with open(allowed_signers, "w") as allowed:
+                allowed.write(f"git {pub_key_content}\n")
+
+            # Create vendor with config
+            config = ConfigDict()
+            config.set(
+                (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
+            )
+
+            vendor = SSHCliSignatureVendor(config=config)
+
+            # Test signing and verification
+            test_data = b"test data to sign with SSH"
+            signature = vendor.sign(test_data, keyid=private_key)
+
+            self.assertIsInstance(signature, bytes)
+            self.assertGreater(len(signature), 0)
+            self.assertTrue(signature.startswith(b"-----BEGIN SSH SIGNATURE-----"))
+
+            # Verify the signature
+            vendor.verify(test_data, signature)