Sfoglia il codice sorgente

Add sshsig package support to SSHSignatureVendor

Jelmer Vernooij 4 settimane fa
parent
commit
07c3204659
2 ha cambiato i file con 153 aggiunte e 18 eliminazioni
  1. 83 9
      dulwich/signature.py
  2. 70 9
      tests/test_signature.py

+ 83 - 9
dulwich/signature.py

@@ -372,24 +372,93 @@ class SSHSignatureVendor(SignatureVendor):
           The signature as bytes
 
         Raises:
-          NotImplementedError: SSH signing not yet implemented
+          NotImplementedError: The sshsig package does not support signing.
+                              Use SSHCliSignatureVendor instead.
         """
-        raise NotImplementedError("SSH signing not yet implemented")
+        raise NotImplementedError(
+            "The sshsig package does not support signing. "
+            "Use SSHCliSignatureVendor instead."
+        )
 
     def verify(
         self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
     ) -> None:
-        """Verify an SSH signature.
+        """Verify an SSH signature using the sshsig package.
 
         Args:
           data: The data that was signed
-          signature: The signature to verify
-          keyids: Optional iterable of allowed SSH public keys
+          signature: The SSH signature to verify (armored format)
+          keyids: Optional iterable of allowed SSH public keys.
+                 If not provided, uses gpg.ssh.allowedSignersFile from config.
 
         Raises:
-          NotImplementedError: SSH verification not yet implemented
+          ValueError: if no allowed signers are configured or provided
+          sshsig.sshsig.InvalidSignature: if signature verification fails
         """
-        raise NotImplementedError("SSH verification not yet implemented")
+        import sshsig.allowed_signers
+        import sshsig.ssh_public_key
+        import sshsig.sshsig
+        from typing import Any
+
+        # Determine allowed signers
+        allowed_keys: list[Any] = []
+
+        if keyids:
+            # Parse keyids as SSH public keys
+            for keyid in keyids:
+                try:
+                    # mypy doesn't see PublicKey.from_string, use Any
+                    key: Any = sshsig.ssh_public_key.PublicKey.from_string(  # type: ignore[attr-defined]
+                        keyid.encode()
+                    )
+                    allowed_keys.append(key)
+                except Exception:
+                    # Try as a file path or ignore invalid keys
+                    try:
+                        with open(keyid, "rb") as f:
+                            key_str = f.read().strip()
+                            key = sshsig.ssh_public_key.PublicKey.from_string(  # type: ignore[attr-defined]
+                                key_str
+                            )
+                            allowed_keys.append(key)
+                    except Exception:
+                        pass
+        elif self.allowed_signers_file:
+            # Load from allowedSignersFile
+            import pathlib
+
+            try:
+                allowed_keys = list(
+                    sshsig.allowed_signers.load_for_git_allowed_signers_file(
+                        pathlib.Path(self.allowed_signers_file)
+                    )
+                )
+            except FileNotFoundError:
+                raise ValueError(
+                    f"Allowed signers file not found: {self.allowed_signers_file}"
+                )
+        else:
+            raise ValueError(
+                "SSH signature verification requires either keyids or "
+                "gpg.ssh.allowedSignersFile to be configured"
+            )
+
+        if not allowed_keys:
+            raise ValueError("No valid allowed signers found")
+
+        # Verify the signature
+        # sshsig.verify expects armored signature as string
+        sig_str = (
+            signature.decode("utf-8") if isinstance(signature, bytes) else signature
+        )
+
+        # Verify with namespace "git" (Git's default)
+        sshsig.sshsig.verify(
+            msg_in=data,
+            armored_signature=sig_str,
+            allowed_signers=allowed_keys,
+            namespace="git",
+        )
 
 
 class SSHCliSignatureVendor(SignatureVendor):
@@ -589,8 +658,13 @@ def get_signature_vendor(
     elif format_lower == "x509":
         raise ValueError("X.509 signatures are not yet supported")
     elif format_lower == "ssh":
-        # For SSH, use CLI vendor (sshsig package support can be added later)
-        return SSHCliSignatureVendor(config=config)
+        # Try to use sshsig package vendor first (verify-only), fall back to CLI
+        try:
+            import sshsig  # noqa: F401
+
+            return SSHSignatureVendor(config=config)
+        except ImportError:
+            return SSHCliSignatureVendor(config=config)
     else:
         raise ValueError(f"Unsupported signature format: {format}")
 

+ 70 - 9
tests/test_signature.py

@@ -312,7 +312,8 @@ class GetSignatureVendorTests(unittest.TestCase):
     def test_ssh_format_supported(self) -> None:
         """Test that ssh format is now supported."""
         vendor = get_signature_vendor(format="ssh")
-        self.assertIsInstance(vendor, SSHCliSignatureVendor)
+        # Should be either SSHSignatureVendor or SSHCliSignatureVendor
+        self.assertIsInstance(vendor, (SSHSignatureVendor, SSHCliSignatureVendor))
 
     def test_invalid_format(self) -> None:
         """Test that invalid format raises ValueError."""
@@ -333,23 +334,26 @@ class GetSignatureVendorTests(unittest.TestCase):
     def test_ssh_format(self) -> None:
         """Test requesting SSH format."""
         vendor = get_signature_vendor(format="ssh")
-        self.assertIsInstance(vendor, SSHCliSignatureVendor)
+        # Should be either SSHSignatureVendor or SSHCliSignatureVendor
+        self.assertIsInstance(vendor, (SSHSignatureVendor, SSHCliSignatureVendor))
 
 
 class SSHSignatureVendorTests(unittest.TestCase):
-    """Tests for SSHSignatureVendor base implementation."""
+    """Tests for SSHSignatureVendor (sshsig package implementation)."""
 
-    def test_not_implemented_sign(self) -> None:
-        """Test that sign raises NotImplementedError."""
+    def test_sign_not_supported(self) -> None:
+        """Test that sign raises NotImplementedError with helpful message."""
         vendor = SSHSignatureVendor()
-        with self.assertRaises(NotImplementedError):
+        with self.assertRaises(NotImplementedError) as cm:
             vendor.sign(b"test data", keyid="dummy")
+        self.assertIn("SSHCliSignatureVendor", str(cm.exception))
 
-    def test_not_implemented_verify(self) -> None:
-        """Test that verify raises NotImplementedError."""
+    def test_verify_without_config_raises(self) -> None:
+        """Test that verify without config or keyids raises ValueError."""
         vendor = SSHSignatureVendor()
-        with self.assertRaises(NotImplementedError):
+        with self.assertRaises(ValueError) as cm:
             vendor.verify(b"test data", b"fake signature")
+        self.assertIn("allowedSignersFile", str(cm.exception))
 
     def test_config_parsing(self) -> None:
         """Test parsing SSH config options."""
@@ -361,6 +365,63 @@ class SSHSignatureVendorTests(unittest.TestCase):
         self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
         self.assertEqual(vendor.default_key_command, "ssh-add -L")
 
+    def test_verify_with_cli_generated_signature(self) -> None:
+        """Test verifying a signature created by SSH CLI vendor."""
+        import os
+        import tempfile
+
+        if shutil.which("ssh-keygen") is None:
+            self.skipTest("ssh-keygen not available")
+
+        # Generate a test SSH key and signature using CLI vendor
+        with tempfile.TemporaryDirectory() as tmpdir:
+            private_key = os.path.join(tmpdir, "test_key")
+            public_key = private_key + ".pub"
+            allowed_signers = os.path.join(tmpdir, "allowed_signers")
+
+            # Generate Ed25519 key
+            subprocess.run(
+                [
+                    "ssh-keygen",
+                    "-t",
+                    "ed25519",
+                    "-f",
+                    private_key,
+                    "-N",
+                    "",
+                    "-C",
+                    "test@example.com",
+                ],
+                capture_output=True,
+                check=True,
+            )
+
+            # Create allowed_signers file
+            with open(public_key) as pub:
+                pub_key_content = pub.read().strip()
+            with open(allowed_signers, "w") as allowed:
+                allowed.write(f"* {pub_key_content}\n")
+
+            # Sign with CLI vendor
+            cli_config = ConfigDict()
+            cli_config.set(
+                (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
+            )
+            cli_vendor = SSHCliSignatureVendor(config=cli_config)
+
+            test_data = b"test data for sshsig verification"
+            signature = cli_vendor.sign(test_data, keyid=private_key)
+
+            # Verify with sshsig package vendor
+            pkg_config = ConfigDict()
+            pkg_config.set(
+                (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
+            )
+            pkg_vendor = SSHSignatureVendor(config=pkg_config)
+
+            # This should succeed
+            pkg_vendor.verify(test_data, signature)
+
 
 class SSHCliSignatureVendorTests(unittest.TestCase):
     """Tests for SSHCliSignatureVendor."""