ソースを参照

Add X.509 signature support

Implement X509SignatureVendor using gpgsm (GnuPG for S/MIME) for creating
and verifying X.509 signatures.
Jelmer Vernooij 3 週間 前
コミット
569b0c5016
2 ファイル変更207 行追加10 行削除
  1. 139 1
      dulwich/signature.py
  2. 68 9
      tests/test_signature.py

+ 139 - 1
dulwich/signature.py

@@ -330,6 +330,144 @@ class GPGCliSignatureVendor(SignatureVendor):
                 )
 
 
+class X509SignatureVendor(SignatureVendor):
+    """Signature vendor that uses gpgsm (GnuPG for S/MIME) for X.509 signatures.
+
+    Supports git config options:
+    - gpg.x509.program: Path to gpgsm command (defaults to 'gpgsm')
+    """
+
+    def __init__(
+        self, config: "Config | None" = None, gpgsm_command: str | None = None
+    ) -> None:
+        """Initialize the X.509 signature vendor.
+
+        Args:
+          config: Optional Git configuration
+          gpgsm_command: Path to the gpgsm command. If not specified, will try to
+                        read from config's gpg.x509.program setting, or default to 'gpgsm'
+        """
+        super().__init__(config)
+
+        if gpgsm_command is not None:
+            self.gpgsm_command = gpgsm_command
+        elif config is not None:
+            try:
+                gpgsm_program = config.get((b"gpg", b"x509"), b"program")
+                self.gpgsm_command = gpgsm_program.decode("utf-8")
+            except KeyError:
+                self.gpgsm_command = "gpgsm"
+        else:
+            self.gpgsm_command = "gpgsm"
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with an X.509 certificate using gpgsm.
+
+        Args:
+          data: The data to sign
+          keyid: Optional certificate ID to use for signing. If not specified,
+                 the default certificate will be used.
+
+        Returns:
+          The signature as bytes
+
+        Raises:
+          subprocess.CalledProcessError: if gpgsm command fails
+        """
+        import subprocess
+
+        args = [self.gpgsm_command, "--detach-sign", "--armor"]
+        if keyid is not None:
+            args.extend(["--local-user", keyid])
+
+        result = subprocess.run(
+            args,
+            input=data,
+            capture_output=True,
+            check=True,
+        )
+        return result.stdout
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify an X.509 signature using gpgsm.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Optional iterable of trusted certificate IDs.
+            If the signature was not created by any certificate in keyids, verification will
+            fail. If not specified, this function only verifies that the signature is valid.
+
+        Raises:
+          subprocess.CalledProcessError: if gpgsm signature verification fails
+          ValueError: if signature was not created by a trusted certificate
+        """
+        import subprocess
+        import tempfile
+
+        # gpgsm requires the signature and data in separate files for verification
+        with (
+            tempfile.NamedTemporaryFile(mode="wb", suffix=".sig") as sig_file,
+            tempfile.NamedTemporaryFile(mode="wb", suffix=".dat") as data_file,
+        ):
+            sig_file.write(signature)
+            sig_file.flush()
+
+            data_file.write(data)
+            data_file.flush()
+
+            args = [self.gpgsm_command, "--verify", sig_file.name, data_file.name]
+
+            result = subprocess.run(
+                args,
+                capture_output=True,
+                check=True,
+            )
+
+            # If keyids are specified, check that the signature was made by one of them
+            if keyids:
+                # Parse stderr to extract the certificate fingerprint/ID that made the signature
+                stderr_text = result.stderr.decode("utf-8", errors="replace")
+
+                # Collect signing certificate IDs
+                signing_certs = []
+                for line in stderr_text.split("\n"):
+                    if "using certificate ID" in line or "Good signature from" in line:
+                        # Extract certificate ID from the output
+                        parts = line.split()
+                        for i, part in enumerate(parts):
+                            if part.upper().startswith("0x"):
+                                signing_certs.append(part[2:])
+                            elif len(part) >= 8 and all(
+                                c in "0123456789ABCDEF" for c in part.upper()
+                            ):
+                                signing_certs.append(part)
+
+                if not signing_certs:
+                    raise ValueError(
+                        "Could not determine signing certificate from gpgsm output"
+                    )
+
+                # Check if any of the signing certs match the trusted keyids
+                keyids_normalized = [k.replace(" ", "").upper() for k in keyids]
+
+                for signed_by in signing_certs:
+                    signed_by_normalized = signed_by.replace(" ", "").upper()
+                    if any(
+                        signed_by_normalized in keyid or keyid in signed_by_normalized
+                        for keyid in keyids_normalized
+                    ):
+                        return
+
+                # None of the signing certs matched
+                raise ValueError(
+                    f"Signature not created by a trusted certificate. "
+                    f"Signed by: {signing_certs}, trusted certs: {list(keyids)}"
+                )
+
+
 class SSHSigSignatureVendor(SignatureVendor):
     """Signature vendor that uses the sshsig Python package for SSH signature verification.
 
@@ -669,7 +807,7 @@ def get_signature_vendor(
         except ImportError:
             return GPGCliSignatureVendor(config=config)
     elif format_lower == SIGNATURE_FORMAT_X509:
-        raise ValueError("X.509 signatures are not yet supported")
+        return X509SignatureVendor(config=config)
     elif format_lower == SIGNATURE_FORMAT_SSH:
         # Try to use sshsig package vendor first (verify-only), fall back to CLI
         try:

+ 68 - 9
tests/test_signature.py

@@ -35,6 +35,7 @@ from dulwich.signature import (
     SignatureVendor,
     SSHCliSignatureVendor,
     SSHSigSignatureVendor,
+    X509SignatureVendor,
     detect_signature_format,
     get_signature_vendor,
     get_signature_vendor_for_signature,
@@ -282,6 +283,61 @@ class GPGCliSignatureVendorTests(unittest.TestCase):
         self.assertEqual(vendor.gpg_command, "gpg")
 
 
+class X509SignatureVendorTests(unittest.TestCase):
+    """Tests for X509SignatureVendor."""
+
+    def test_gpgsm_program_default(self) -> None:
+        """Test default gpgsm command is 'gpgsm'."""
+        vendor = X509SignatureVendor()
+        self.assertEqual(vendor.gpgsm_command, "gpgsm")
+
+    def test_gpgsm_program_from_config(self) -> None:
+        """Test reading gpg.x509.program from config."""
+        config = ConfigDict()
+        config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
+
+        vendor = X509SignatureVendor(config=config)
+        self.assertEqual(vendor.gpgsm_command, "/usr/local/bin/gpgsm")
+
+    def test_gpgsm_program_override(self) -> None:
+        """Test gpgsm_command parameter overrides config."""
+        config = ConfigDict()
+        config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
+
+        vendor = X509SignatureVendor(config=config, gpgsm_command="/custom/gpgsm")
+        self.assertEqual(vendor.gpgsm_command, "/custom/gpgsm")
+
+    def test_gpgsm_program_default_when_not_in_config(self) -> None:
+        """Test default when gpg.x509.program not in config."""
+        config = ConfigDict()
+        vendor = X509SignatureVendor(config=config)
+        self.assertEqual(vendor.gpgsm_command, "gpgsm")
+
+    @unittest.skipIf(
+        shutil.which("gpgsm") is None, "gpgsm command not available in PATH"
+    )
+    def test_sign_and_verify(self) -> None:
+        """Test basic X.509 sign and verify cycle.
+
+        Note: This test requires gpgsm and an X.509 certificate to be configured.
+        It may be skipped in environments without gpgsm/certificate setup.
+        """
+        vendor = X509SignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Try to sign the data
+            signature = vendor.sign(test_data)
+            self.assertIsInstance(signature, bytes)
+            self.assertGreater(len(signature), 0)
+
+            # Verify the signature
+            vendor.verify(test_data, signature)
+        except subprocess.CalledProcessError:
+            # Skip test if no X.509 certificate is available
+            self.skipTest("No X.509 certificate available for signing")
+
+
 class GetSignatureVendorTests(unittest.TestCase):
     """Tests for get_signature_vendor function."""
 
@@ -308,11 +364,10 @@ class GetSignatureVendorTests(unittest.TestCase):
         vendor = get_signature_vendor(format="OpenPGP")
         self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
 
-    def test_x509_not_supported(self) -> None:
-        """Test that x509 format raises ValueError."""
-        with self.assertRaises(ValueError) as cm:
-            get_signature_vendor(format="x509")
-        self.assertIn("X.509", str(cm.exception))
+    def test_x509_format_supported(self) -> None:
+        """Test that x509 format is now supported."""
+        vendor = get_signature_vendor(format="x509")
+        self.assertIsInstance(vendor, X509SignatureVendor)
 
     def test_ssh_format_supported(self) -> None:
         """Test that ssh format is now supported."""
@@ -342,6 +397,11 @@ class GetSignatureVendorTests(unittest.TestCase):
         # Should be either SSHSigSignatureVendor or SSHCliSignatureVendor
         self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
 
+    def test_x509_format(self) -> None:
+        """Test requesting X.509 format."""
+        vendor = get_signature_vendor(format="x509")
+        self.assertIsInstance(vendor, X509SignatureVendor)
+
 
 class SSHSigSignatureVendorTests(unittest.TestCase):
     """Tests for SSHSigSignatureVendor (sshsig package implementation)."""
@@ -579,11 +639,10 @@ class GetSignatureVendorForSignatureTests(unittest.TestCase):
         self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
 
     def test_get_vendor_for_x509_signature(self) -> None:
-        """Test that X.509 signature raises ValueError (not supported)."""
+        """Test getting vendor for X.509 signature."""
         x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
-        with self.assertRaises(ValueError) as cm:
-            get_signature_vendor_for_signature(x509_sig)
-        self.assertIn("X.509", str(cm.exception))
+        vendor = get_signature_vendor_for_signature(x509_sig)
+        self.assertIsInstance(vendor, X509SignatureVendor)
 
     def test_get_vendor_with_config(self) -> None:
         """Test that config is passed to vendor."""