Ver Fonte

Add SignatureVendor abstraction for signing and verifying Git objects

Jelmer Vernooij há 4 semanas atrás
pai
commit
ffcaa049bc
3 ficheiros alterados com 248 adições e 56 exclusões
  1. 10 56
      dulwich/objects.py
  2. 130 0
      dulwich/signature.py
  3. 108 0
      tests/test_signature.py

+ 10 - 56
dulwich/objects.py

@@ -1261,20 +1261,9 @@ class Tag(ShaFile):
           keyid: Optional GPG key ID to use for signing. If not specified,
                  the default GPG key will be used.
         """
-        import gpg
-
-        with gpg.Context(armor=True) as c:
-            if keyid is not None:
-                key = c.get_key(keyid)
-                with gpg.Context(armor=True, signers=[key]) as ctx:
-                    self.signature, _unused_result = ctx.sign(
-                        self.as_raw_string(),
-                        mode=gpg.constants.sig.mode.DETACH,
-                    )
-            else:
-                self.signature, _unused_result = c.sign(
-                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
-                )
+        from dulwich.signature import gpg_vendor
+
+        self.signature = gpg_vendor.sign(self.as_raw_string(), keyid=keyid)
 
     def raw_without_sig(self) -> bytes:
         """Return raw string serialization without the GPG/SSH signature.
@@ -1331,21 +1320,9 @@ class Tag(ShaFile):
         if self._signature is None:
             return
 
-        import gpg
+        from dulwich.signature import gpg_vendor
 
-        with gpg.Context() as ctx:
-            data, result = ctx.verify(
-                self.raw_without_sig(),
-                signature=self._signature,
-            )
-            if keyids:
-                keys = [ctx.get_key(key) for key in keyids]
-                for key in keys:
-                    for subkey in key.subkeys:
-                        for sig in result.signatures:
-                            if subkey.can_sign and subkey.fpr == sig.fpr:
-                                return
-                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
+        gpg_vendor.verify(self.raw_without_sig(), self._signature, keyids=keyids)
 
 
 class TreeEntry(NamedTuple):
@@ -2177,20 +2154,9 @@ class Commit(ShaFile):
           keyid: Optional GPG key ID to use for signing. If not specified,
                  the default GPG key will be used.
         """
-        import gpg
-
-        with gpg.Context(armor=True) as c:
-            if keyid is not None:
-                key = c.get_key(keyid)
-                with gpg.Context(armor=True, signers=[key]) as ctx:
-                    self.gpgsig, _unused_result = ctx.sign(
-                        self.as_raw_string(),
-                        mode=gpg.constants.sig.mode.DETACH,
-                    )
-            else:
-                self.gpgsig, _unused_result = c.sign(
-                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
-                )
+        from dulwich.signature import gpg_vendor
+
+        self.gpgsig = gpg_vendor.sign(self.as_raw_string(), keyid=keyid)
 
     def raw_without_sig(self) -> bytes:
         """Return raw string serialization without the GPG/SSH signature.
@@ -2248,21 +2214,9 @@ class Commit(ShaFile):
         if self._gpgsig is None:
             return
 
-        import gpg
+        from dulwich.signature import gpg_vendor
 
-        with gpg.Context() as ctx:
-            data, result = ctx.verify(
-                self.raw_without_sig(),
-                signature=self._gpgsig,
-            )
-            if keyids:
-                keys = [ctx.get_key(key) for key in keyids]
-                for key in keys:
-                    for subkey in key.subkeys:
-                        for sig in result.signatures:
-                            if subkey.can_sign and subkey.fpr == sig.fpr:
-                                return
-                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
+        gpg_vendor.verify(self.raw_without_sig(), self._gpgsig, keyids=keyids)
 
     def _serialize(self) -> list[bytes]:
         headers = []

+ 130 - 0
dulwich/signature.py

@@ -0,0 +1,130 @@
+# signature.py -- Signature vendors for signing and verifying Git objects
+# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Signature vendors for signing and verifying Git objects."""
+
+from collections.abc import Iterable
+
+
+class SignatureVendor:
+    """A signature implementation for signing and verifying Git objects."""
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with a key.
+
+        Args:
+          data: The data to sign
+          keyid: Optional key ID to use for signing. If not specified,
+                 the default key will be used.
+
+        Returns:
+          The signature as bytes
+        """
+        raise NotImplementedError(self.sign)
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify a signature.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Optional iterable of trusted key IDs.
+            If the signature was not created by any key in keyids, verification will
+            fail. If not specified, this function only verifies that the signature
+            is valid.
+
+        Raises:
+          Exception on verification failure (implementation-specific)
+        """
+        raise NotImplementedError(self.verify)
+
+
+class GPGSignatureVendor(SignatureVendor):
+    """Signature vendor that uses the GPG package for signing and verification."""
+
+    def sign(self, data: bytes, keyid: str | None = None) -> bytes:
+        """Sign data with a GPG key.
+
+        Args:
+          data: The data to sign
+          keyid: Optional GPG key ID to use for signing. If not specified,
+                 the default GPG key will be used.
+
+        Returns:
+          The signature as bytes
+        """
+        import gpg
+
+        signature: bytes
+        with gpg.Context(armor=True) as c:
+            if keyid is not None:
+                key = c.get_key(keyid)
+                with gpg.Context(armor=True, signers=[key]) as ctx:
+                    signature, _unused_result = ctx.sign(
+                        data,
+                        mode=gpg.constants.sig.mode.DETACH,
+                    )
+            else:
+                signature, _unused_result = c.sign(
+                    data, mode=gpg.constants.sig.mode.DETACH
+                )
+        return signature
+
+    def verify(
+        self, data: bytes, signature: bytes, keyids: Iterable[str] | None = None
+    ) -> None:
+        """Verify a GPG signature.
+
+        Args:
+          data: The data that was signed
+          signature: The signature to verify
+          keyids: Optional iterable of trusted GPG key IDs.
+            If the signature was not created by any key in keyids, verification will
+            fail. If not specified, this function only verifies that the signature
+            is valid.
+
+        Raises:
+          gpg.errors.BadSignatures: if GPG signature verification fails
+          gpg.errors.MissingSignatures: if the signature was not created by a key
+            specified in keyids
+        """
+        import gpg
+
+        with gpg.Context() as ctx:
+            verified_data, result = ctx.verify(
+                data,
+                signature=signature,
+            )
+            if keyids:
+                keys = [ctx.get_key(key) for key in keyids]
+                for key in keys:
+                    for subkey in key.subkeys:
+                        for sig in result.signatures:
+                            if subkey.can_sign and subkey.fpr == sig.fpr:
+                                return
+                raise gpg.errors.MissingSignatures(
+                    result, keys, results=(verified_data, result)
+                )
+
+
+# Default GPG vendor instance
+gpg_vendor = GPGSignatureVendor()

+ 108 - 0
tests/test_signature.py

@@ -0,0 +1,108 @@
+# test_signature.py -- tests for signature.py
+# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for signature vendors."""
+
+import unittest
+
+from dulwich.signature import GPGSignatureVendor, SignatureVendor
+
+try:
+    import gpg
+except ImportError:
+    gpg = None
+
+
+class SignatureVendorTests(unittest.TestCase):
+    """Tests for SignatureVendor base class."""
+
+    def test_sign_not_implemented(self) -> None:
+        """Test that sign raises NotImplementedError."""
+        vendor = SignatureVendor()
+        with self.assertRaises(NotImplementedError):
+            vendor.sign(b"test data")
+
+    def test_verify_not_implemented(self) -> None:
+        """Test that verify raises NotImplementedError."""
+        vendor = SignatureVendor()
+        with self.assertRaises(NotImplementedError):
+            vendor.verify(b"test data", b"fake signature")
+
+
+@unittest.skipIf(gpg is None, "gpg not available")
+class GPGSignatureVendorTests(unittest.TestCase):
+    """Tests for GPGSignatureVendor."""
+
+    def test_sign_and_verify(self) -> None:
+        """Test basic sign and verify cycle.
+
+        Note: This test requires a GPG key to be configured in the test
+        environment. It may be skipped in environments without GPG setup.
+        """
+        vendor = GPGSignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Sign the data
+            signature = vendor.sign(test_data)
+            self.assertIsInstance(signature, bytes)
+            self.assertGreater(len(signature), 0)
+
+            # Verify the signature
+            vendor.verify(test_data, signature)
+        except gpg.errors.GPGMEError as e:
+            # Skip test if no GPG key is available
+            self.skipTest(f"GPG key not available: {e}")
+
+    def test_verify_invalid_signature(self) -> None:
+        """Test that verify raises an error for invalid signatures."""
+        vendor = GPGSignatureVendor()
+        test_data = b"test data"
+        invalid_signature = b"this is not a valid signature"
+
+        with self.assertRaises(gpg.errors.GPGMEError):
+            vendor.verify(test_data, invalid_signature)
+
+    def test_sign_with_keyid(self) -> None:
+        """Test signing with a specific key ID.
+
+        Note: This test requires a GPG key to be configured in the test
+        environment. It may be skipped in environments without GPG setup.
+        """
+        vendor = GPGSignatureVendor()
+        test_data = b"test data to sign"
+
+        try:
+            # Try to get a key from the keyring
+            with gpg.Context() as ctx:
+                keys = list(ctx.keylist(secret=True))
+                if not keys:
+                    self.skipTest("No GPG keys available for testing")
+
+                key = keys[0]
+                signature = vendor.sign(test_data, keyid=key.fpr)
+                self.assertIsInstance(signature, bytes)
+                self.assertGreater(len(signature), 0)
+
+                # Verify the signature
+                vendor.verify(test_data, signature)
+        except gpg.errors.GPGMEError as e:
+            self.skipTest(f"GPG key not available: {e}")