Ver código fonte

Add detect_signature_format() and get_signature_vendor_for_signature()

Jelmer Vernooij 4 semanas atrás
pai
commit
08e8c3654d
2 arquivos alterados com 123 adições e 0 exclusões
  1. 53 0
      dulwich/signature.py
  2. 70 0
      tests/test_signature.py

+ 53 - 0
dulwich/signature.py

@@ -682,5 +682,58 @@ def get_signature_vendor(
         raise ValueError(f"Unsupported signature format: {format}")
 
 
+def detect_signature_format(signature: bytes) -> str:
+    """Detect the signature format from the signature data.
+
+    Git signatures are always in ASCII-armored format.
+
+    Args:
+      signature: The signature bytes
+
+    Returns:
+      Signature format constant (SIGNATURE_FORMAT_OPENPGP, SIGNATURE_FORMAT_SSH, etc.)
+
+    Raises:
+      ValueError: if signature format cannot be detected
+    """
+    # SSH signatures start with SSH armor header
+    if signature.startswith(b"-----BEGIN SSH SIGNATURE-----"):
+        return SIGNATURE_FORMAT_SSH
+
+    # GPG/PGP signatures start with PGP armor header
+    if signature.startswith(b"-----BEGIN PGP SIGNATURE-----"):
+        return SIGNATURE_FORMAT_OPENPGP
+
+    # X.509 signatures (S/MIME format)
+    if signature.startswith(
+        (b"-----BEGIN SIGNED MESSAGE-----", b"-----BEGIN PKCS7-----")
+    ):
+        return SIGNATURE_FORMAT_X509
+
+    raise ValueError("Unable to detect signature format")
+
+
+def get_signature_vendor_for_signature(
+    signature: bytes, config: "Config | None" = None
+) -> SignatureVendor:
+    """Get the appropriate signature vendor for a given signature.
+
+    This function detects the signature format and returns the appropriate
+    vendor to verify it.
+
+    Args:
+      signature: The signature bytes to detect format from
+      config: Optional Git configuration
+
+    Returns:
+      SignatureVendor instance appropriate for the signature format
+
+    Raises:
+      ValueError: if signature format cannot be detected or is not supported
+    """
+    format = detect_signature_format(signature)
+    return get_signature_vendor(format=format, config=config)
+
+
 # Default GPG vendor instance
 gpg_vendor = GPGSignatureVendor()

+ 70 - 0
tests/test_signature.py

@@ -27,12 +27,17 @@ import unittest
 
 from dulwich.config import ConfigDict
 from dulwich.signature import (
+    SIGNATURE_FORMAT_OPENPGP,
+    SIGNATURE_FORMAT_SSH,
+    SIGNATURE_FORMAT_X509,
     GPGCliSignatureVendor,
     GPGSignatureVendor,
     SignatureVendor,
     SSHCliSignatureVendor,
     SSHSigSignatureVendor,
+    detect_signature_format,
     get_signature_vendor,
+    get_signature_vendor_for_signature,
 )
 
 try:
@@ -526,3 +531,68 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
 
             # Verify the signature
             vendor.verify(test_data, signature)
+
+
+class DetectSignatureFormatTests(unittest.TestCase):
+    """Tests for detect_signature_format function."""
+
+    def test_detect_ssh_signature(self) -> None:
+        """Test detecting SSH signature format."""
+        ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
+        self.assertEqual(detect_signature_format(ssh_sig), SIGNATURE_FORMAT_SSH)
+
+    def test_detect_pgp_signature(self) -> None:
+        """Test detecting PGP signature format."""
+        pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
+        self.assertEqual(detect_signature_format(pgp_sig), SIGNATURE_FORMAT_OPENPGP)
+
+    def test_detect_x509_signature_pkcs7(self) -> None:
+        """Test detecting X.509 PKCS7 signature format."""
+        x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
+        self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
+
+    def test_detect_x509_signature_signed_message(self) -> None:
+        """Test detecting X.509 signed message format."""
+        x509_sig = b"-----BEGIN SIGNED MESSAGE-----\nfoo\n-----END SIGNED MESSAGE-----"
+        self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
+
+    def test_unknown_signature_format(self) -> None:
+        """Test that unknown format raises ValueError."""
+        with self.assertRaises(ValueError) as cm:
+            detect_signature_format(b"not a signature")
+        self.assertIn("Unable to detect", str(cm.exception))
+
+
+class GetSignatureVendorForSignatureTests(unittest.TestCase):
+    """Tests for get_signature_vendor_for_signature function."""
+
+    def test_get_vendor_for_ssh_signature(self) -> None:
+        """Test getting vendor for SSH signature."""
+        ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
+        vendor = get_signature_vendor_for_signature(ssh_sig)
+        self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
+
+    def test_get_vendor_for_pgp_signature(self) -> None:
+        """Test getting vendor for PGP signature."""
+        pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
+        vendor = get_signature_vendor_for_signature(pgp_sig)
+        self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
+
+    def test_get_vendor_for_x509_signature(self) -> None:
+        """Test that X.509 signature raises ValueError (not supported)."""
+        x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
+        with self.assertRaises(ValueError) as cm:
+            get_signature_vendor_for_signature(x509_sig)
+        self.assertIn("X.509", str(cm.exception))
+
+    def test_get_vendor_with_config(self) -> None:
+        """Test that config is passed to vendor."""
+        config = ConfigDict()
+        config.set((b"gpg",), b"program", b"gpg2")
+
+        pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
+        vendor = get_signature_vendor_for_signature(pgp_sig, config=config)
+
+        # If CLI vendor is used, check config was passed
+        if isinstance(vendor, GPGCliSignatureVendor):
+            self.assertEqual(vendor.gpg_command, "gpg2")