Jelajahi Sumber

Add GPGCliSignatureVendor for CLI-based GPG signing

Jelmer Vernooij 4 minggu lalu
induk
melakukan
9ee769b317
2 mengubah file dengan 249 tambahan dan 1 penghapusan
  1. 128 0
      dulwich/signature.py
  2. 121 1
      tests/test_signature.py

+ 128 - 0
dulwich/signature.py

@@ -126,5 +126,133 @@ class GPGSignatureVendor(SignatureVendor):
                 )
 
 
+class GPGCliSignatureVendor(SignatureVendor):
+    """Signature vendor that uses the GPG command-line tool for signing and verification."""
+
+    def __init__(self, gpg_command: str = "gpg") -> None:
+        """Initialize the GPG CLI vendor.
+
+        Args:
+          gpg_command: Path to the GPG command (defaults to 'gpg')
+        """
+        self.gpg_command = gpg_command
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with a GPG key using the command-line tool.
+
+        Args:
+          data: The data to sign
+          keyid: Optional GPG key ID to use for signing. If not specified,
+                 the default GPG key will be used.
+
+        Returns:
+          The signature as bytes
+
+        Raises:
+          subprocess.CalledProcessError: if GPG command fails
+        """
+        import subprocess
+
+        args = [self.gpg_command, "--detach-sign", "--armor"]
+        if keyid is not None:
+            args.extend(["--local-user", keyid])
+
+        result = subprocess.run(
+            args,
+            input=data,
+            capture_output=True,
+            check=True,
+        )
+        return result.stdout
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify a GPG signature using the command-line tool.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Optional iterable of trusted GPG key IDs.
+            If the signature was not created by any key in keyids, verification will
+            fail. If not specified, this function only verifies that the signature
+            is valid.
+
+        Raises:
+          subprocess.CalledProcessError: if GPG signature verification fails
+          ValueError: if signature was not created by a trusted key
+        """
+        import subprocess
+        import tempfile
+
+        # GPG requires the signature and data in separate files for verification
+        with (
+            tempfile.NamedTemporaryFile(mode="wb", suffix=".sig") as sig_file,
+            tempfile.NamedTemporaryFile(mode="wb", suffix=".dat") as data_file,
+        ):
+            sig_file.write(signature)
+            sig_file.flush()
+
+            data_file.write(data)
+            data_file.flush()
+
+            args = [self.gpg_command, "--verify", sig_file.name, data_file.name]
+
+            result = subprocess.run(
+                args,
+                capture_output=True,
+                check=True,
+            )
+
+            # If keyids are specified, check that the signature was made by one of them
+            if keyids:
+                # Parse stderr to extract the key fingerprint/ID that made the signature
+                stderr_text = result.stderr.decode("utf-8", errors="replace")
+
+                # GPG outputs both subkey and primary key fingerprints
+                # Collect both to check against trusted keyids
+                signing_keys = []
+                for line in stderr_text.split("\n"):
+                    if (
+                        "using RSA key" in line
+                        or "using DSA key" in line
+                        or "using EDDSA key" in line
+                        or "using ECDSA key" in line
+                    ):
+                        # Extract the key ID from lines like "gpg: using RSA key ABCD1234..."
+                        parts = line.split()
+                        if "key" in parts:
+                            key_idx = parts.index("key")
+                            if key_idx + 1 < len(parts):
+                                signing_keys.append(parts[key_idx + 1])
+                    elif "Primary key fingerprint:" in line:
+                        # Extract fingerprint
+                        fpr = line.split(":", 1)[1].strip().replace(" ", "")
+                        signing_keys.append(fpr)
+
+                if not signing_keys:
+                    raise ValueError("Could not determine signing key from GPG output")
+
+                # Check if any of the signing keys (subkey or primary) match the trusted keyids
+                keyids_normalized = [k.replace(" ", "").upper() for k in keyids]
+
+                # Check each signing key against trusted keyids
+                for signed_by in signing_keys:
+                    signed_by_normalized = signed_by.replace(" ", "").upper()
+                    # Check if signed_by matches or is a suffix of any trusted keyid
+                    # (GPG sometimes shows short key IDs)
+                    if any(
+                        signed_by_normalized in keyid or keyid in signed_by_normalized
+                        for keyid in keyids_normalized
+                    ):
+                        return
+
+                # None of the signing keys matched
+                raise ValueError(
+                    f"Signature not created by a trusted key. "
+                    f"Signed by: {signing_keys}, trusted keys: {list(keyids)}"
+                )
+
+
 # Default GPG vendor instance
 gpg_vendor = GPGSignatureVendor()

+ 121 - 1
tests/test_signature.py

@@ -21,9 +21,11 @@
 
 """Tests for signature vendors."""
 
+import shutil
+import subprocess
 import unittest
 
-from dulwich.signature import GPGSignatureVendor, SignatureVendor
+from dulwich.signature import GPGCliSignatureVendor, GPGSignatureVendor, SignatureVendor
 
 try:
     import gpg
@@ -106,3 +108,121 @@ class GPGSignatureVendorTests(unittest.TestCase):
                 vendor.verify(test_data, signature)
         except gpg.errors.GPGMEError as e:
             self.skipTest(f"GPG key not available: {e}")
+
+
+class GPGCliSignatureVendorTests(unittest.TestCase):
+    """Tests for GPGCliSignatureVendor."""
+
+    def setUp(self) -> None:
+        """Check if gpg command is available."""
+        if shutil.which("gpg") is None:
+            self.skipTest("gpg command not available")
+
+    def test_sign_and_verify(self) -> None:
+        """Test basic sign and verify cycle using CLI."""
+        vendor = GPGCliSignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Sign the data
+            signature = vendor.sign(test_data)
+            self.assertIsInstance(signature, bytes)
+            self.assertGreater(len(signature), 0)
+            self.assertTrue(signature.startswith(b"-----BEGIN PGP SIGNATURE-----"))
+
+            # Verify the signature
+            vendor.verify(test_data, signature)
+        except subprocess.CalledProcessError as e:
+            # Skip test if no GPG key is available or configured
+            self.skipTest(f"GPG signing failed: {e}")
+
+    def test_verify_invalid_signature(self) -> None:
+        """Test that verify raises an error for invalid signatures."""
+        vendor = GPGCliSignatureVendor()
+        test_data = b"test data"
+        invalid_signature = b"this is not a valid signature"
+
+        with self.assertRaises(subprocess.CalledProcessError):
+            vendor.verify(test_data, invalid_signature)
+
+    def test_sign_with_keyid(self) -> None:
+        """Test signing with a specific key ID using CLI."""
+        vendor = GPGCliSignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Try to get a key from the keyring
+            result = subprocess.run(
+                ["gpg", "--list-secret-keys", "--with-colons"],
+                capture_output=True,
+                check=True,
+                text=True,
+            )
+
+            # Parse output to find a key fingerprint
+            keyid = None
+            for line in result.stdout.split("\n"):
+                if line.startswith("fpr:"):
+                    keyid = line.split(":")[9]
+                    break
+
+            if not keyid:
+                self.skipTest("No GPG keys available for testing")
+
+            signature = vendor.sign(test_data, keyid=keyid)
+            self.assertIsInstance(signature, bytes)
+            self.assertGreater(len(signature), 0)
+
+            # Verify the signature
+            vendor.verify(test_data, signature)
+        except subprocess.CalledProcessError as e:
+            self.skipTest(f"GPG key not available: {e}")
+
+    def test_verify_with_keyids(self) -> None:
+        """Test verifying with specific trusted key IDs."""
+        vendor = GPGCliSignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Sign without specifying a key (use default)
+            signature = vendor.sign(test_data)
+
+            # Get the primary key fingerprint from the keyring
+            result = subprocess.run(
+                ["gpg", "--list-secret-keys", "--with-colons"],
+                capture_output=True,
+                check=True,
+                text=True,
+            )
+
+            primary_keyid = None
+            for line in result.stdout.split("\n"):
+                if line.startswith("fpr:"):
+                    primary_keyid = line.split(":")[9]
+                    break
+
+            if not primary_keyid:
+                self.skipTest("No GPG keys available for testing")
+
+            # Verify with the correct primary keyid - should succeed
+            # (GPG shows primary key fingerprint even if signed by subkey)
+            vendor.verify(test_data, signature, keyids=[primary_keyid])
+
+            # Verify with a different keyid - should fail
+            fake_keyid = "0" * 40  # Fake 40-character fingerprint
+            with self.assertRaises(ValueError):
+                vendor.verify(test_data, signature, keyids=[fake_keyid])
+
+        except subprocess.CalledProcessError as e:
+            self.skipTest(f"GPG key not available: {e}")
+
+    def test_custom_gpg_command(self) -> None:
+        """Test using a custom GPG command path."""
+        vendor = GPGCliSignatureVendor(gpg_command="gpg")
+        test_data = b"test data"
+
+        try:
+            signature = vendor.sign(test_data)
+            self.assertIsInstance(signature, bytes)
+        except subprocess.CalledProcessError as e:
+            self.skipTest(f"GPG not available: {e}")