|
|
@@ -1,8 +1,9 @@
|
|
|
# signature.py -- Signature vendors for signing and verifying Git objects
|
|
|
# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
|
|
|
#
|
|
|
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
|
|
|
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
|
|
|
-# General Public License as public by the Free Software Foundation; version 2.0
|
|
|
+# General Public License as published by the Free Software Foundation; version 2.0
|
|
|
# or (at your option) any later version. You can redistribute it and/or
|
|
|
# modify it under the terms of either of these two licenses.
|
|
|
#
|
|
|
@@ -26,6 +27,70 @@ from typing import TYPE_CHECKING
|
|
|
if TYPE_CHECKING:
|
|
|
from dulwich.config import Config
|
|
|
|
|
|
+__all__ = [
|
|
|
+ "SIGNATURE_FORMAT_OPENPGP",
|
|
|
+ "SIGNATURE_FORMAT_SSH",
|
|
|
+ "SIGNATURE_FORMAT_X509",
|
|
|
+ "BadSignature",
|
|
|
+ "SignatureVendor",
|
|
|
+ "SignatureVerificationError",
|
|
|
+ "UntrustedSignature",
|
|
|
+ "detect_signature_format",
|
|
|
+ "get_available_vendors",
|
|
|
+ "get_signature_vendor",
|
|
|
+ "get_signature_vendor_for_signature",
|
|
|
+]
|
|
|
+
|
|
|
+
|
|
|
+# Signature verification exceptions
|
|
|
+class SignatureVerificationError(Exception):
|
|
|
+ """Base exception for signature verification failures."""
|
|
|
+
|
|
|
+
|
|
|
+class BadSignature(SignatureVerificationError):
|
|
|
+ """Exception raised when a signature is invalid or cannot be verified.
|
|
|
+
|
|
|
+ Attributes:
|
|
|
+ detail: Optional additional detail about the failure
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, message: str, detail: str | None = None):
|
|
|
+ """Initialize BadSignature exception.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ message: Error message
|
|
|
+ detail: Optional additional detail about the failure
|
|
|
+ """
|
|
|
+ super().__init__(message)
|
|
|
+ self.detail = detail
|
|
|
+
|
|
|
+
|
|
|
+class UntrustedSignature(SignatureVerificationError):
|
|
|
+ """Exception raised when a signature is not from a trusted key.
|
|
|
+
|
|
|
+ Attributes:
|
|
|
+ signing_keys: List of key IDs that signed the data (if determinable)
|
|
|
+ trusted_keys: List of key IDs that were trusted (if applicable)
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ message: str,
|
|
|
+ signing_keys: list[str] | None = None,
|
|
|
+ trusted_keys: list[str] | None = None,
|
|
|
+ ):
|
|
|
+ """Initialize UntrustedSignature exception.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ message: Error message
|
|
|
+ signing_keys: List of key IDs that signed the data (if determinable)
|
|
|
+ trusted_keys: List of key IDs that were trusted (if applicable)
|
|
|
+ """
|
|
|
+ super().__init__(message)
|
|
|
+ self.signing_keys = signing_keys or []
|
|
|
+ self.trusted_keys = trusted_keys or []
|
|
|
+
|
|
|
+
|
|
|
# Git signature format constants
|
|
|
SIGNATURE_FORMAT_OPENPGP = "openpgp"
|
|
|
SIGNATURE_FORMAT_X509 = "x509"
|
|
|
@@ -79,7 +144,8 @@ class SignatureVendor:
|
|
|
is valid.
|
|
|
|
|
|
Raises:
|
|
|
- Exception on verification failure (implementation-specific)
|
|
|
+ BadSignature: if signature verification fails
|
|
|
+ UntrustedSignature: if signature was not created by a trusted key
|
|
|
"""
|
|
|
raise NotImplementedError(self.verify)
|
|
|
|
|
|
@@ -162,10 +228,9 @@ class GPGSignatureVendor(SignatureVendor):
|
|
|
is valid.
|
|
|
|
|
|
Raises:
|
|
|
- gpg.errors.BadSignatures: if GPG signature verification fails
|
|
|
- gpg.errors.MissingSignatures: if the signature was not created by a key
|
|
|
- specified in keyids
|
|
|
- ValueError: if signature trust level is below minimum configured level
|
|
|
+ BadSignature: if GPG signature verification fails
|
|
|
+ UntrustedSignature: if the signature was not created by a key
|
|
|
+ specified in keyids or trust level is below minimum
|
|
|
"""
|
|
|
import gpg
|
|
|
|
|
|
@@ -178,33 +243,40 @@ class GPGSignatureVendor(SignatureVendor):
|
|
|
"ultimate": gpg.constants.validity.ULTIMATE,
|
|
|
}
|
|
|
|
|
|
- with gpg.Context() as ctx:
|
|
|
- verified_data, result = ctx.verify(
|
|
|
- data,
|
|
|
- signature=signature,
|
|
|
- )
|
|
|
-
|
|
|
- # Check minimum trust level if configured
|
|
|
- if self.min_trust_level is not None:
|
|
|
- min_validity = trust_level_map.get(self.min_trust_level)
|
|
|
- if min_validity is not None:
|
|
|
- for sig in result.signatures:
|
|
|
- if sig.validity < min_validity:
|
|
|
- raise ValueError(
|
|
|
- f"Signature trust level {sig.validity} is below "
|
|
|
- f"minimum required level {self.min_trust_level}"
|
|
|
- )
|
|
|
+ try:
|
|
|
+ with gpg.Context() as ctx:
|
|
|
+ _verified_data, result = ctx.verify(
|
|
|
+ data,
|
|
|
+ signature=signature,
|
|
|
+ )
|
|
|
|
|
|
- if keyids:
|
|
|
- keys = [ctx.get_key(key) for key in keyids]
|
|
|
- for key in keys:
|
|
|
- for subkey in key.subkeys:
|
|
|
+ # Check minimum trust level if configured
|
|
|
+ if self.min_trust_level is not None:
|
|
|
+ min_validity = trust_level_map.get(self.min_trust_level)
|
|
|
+ if min_validity is not None:
|
|
|
for sig in result.signatures:
|
|
|
- if subkey.can_sign and subkey.fpr == sig.fpr:
|
|
|
- return
|
|
|
- raise gpg.errors.MissingSignatures(
|
|
|
- result, keys, results=(verified_data, result)
|
|
|
- )
|
|
|
+ if sig.validity < min_validity:
|
|
|
+ raise UntrustedSignature(
|
|
|
+ f"Signature trust level {sig.validity} is below "
|
|
|
+ f"minimum required level {self.min_trust_level}"
|
|
|
+ )
|
|
|
+
|
|
|
+ if keyids:
|
|
|
+ keys = [ctx.get_key(key) for key in keyids]
|
|
|
+ for key in keys:
|
|
|
+ for subkey in key.subkeys:
|
|
|
+ for sig in result.signatures:
|
|
|
+ if subkey.can_sign and subkey.fpr == sig.fpr:
|
|
|
+ return
|
|
|
+ # Extract signing key fingerprints from the signatures
|
|
|
+ signing_fprs = [sig.fpr for sig in result.signatures]
|
|
|
+ raise UntrustedSignature(
|
|
|
+ f"Signature not created by any of the trusted keys: {keyids}",
|
|
|
+ signing_keys=signing_fprs,
|
|
|
+ trusted_keys=list(keyids),
|
|
|
+ )
|
|
|
+ except gpg.errors.BadSignatures as e:
|
|
|
+ raise BadSignature(f"GPG signature verification failed: {e}") from e
|
|
|
|
|
|
|
|
|
class GPGCliSignatureVendor(SignatureVendor):
|
|
|
@@ -286,8 +358,8 @@ class GPGCliSignatureVendor(SignatureVendor):
|
|
|
is valid.
|
|
|
|
|
|
Raises:
|
|
|
- subprocess.CalledProcessError: if GPG signature verification fails
|
|
|
- ValueError: if signature was not created by a trusted key
|
|
|
+ BadSignature: if GPG signature verification fails
|
|
|
+ UntrustedSignature: if signature was not created by a trusted key
|
|
|
"""
|
|
|
import subprocess
|
|
|
import tempfile
|
|
|
@@ -305,11 +377,16 @@ class GPGCliSignatureVendor(SignatureVendor):
|
|
|
|
|
|
args = [self.gpg_command, "--verify", sig_file.name, data_file.name]
|
|
|
|
|
|
- result = subprocess.run(
|
|
|
- args,
|
|
|
- capture_output=True,
|
|
|
- check=True,
|
|
|
- )
|
|
|
+ try:
|
|
|
+ result = subprocess.run(
|
|
|
+ args,
|
|
|
+ capture_output=True,
|
|
|
+ check=True,
|
|
|
+ )
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
+ raise BadSignature(
|
|
|
+ f"GPG signature verification failed: {e.stderr.decode('utf-8', errors='replace')}"
|
|
|
+ ) from e
|
|
|
|
|
|
# If keyids are specified, check that the signature was made by one of them
|
|
|
if keyids:
|
|
|
@@ -338,7 +415,9 @@ class GPGCliSignatureVendor(SignatureVendor):
|
|
|
signing_keys.append(fpr)
|
|
|
|
|
|
if not signing_keys:
|
|
|
- raise ValueError("Could not determine signing key from GPG output")
|
|
|
+ raise UntrustedSignature(
|
|
|
+ "Could not determine signing key from GPG output"
|
|
|
+ )
|
|
|
|
|
|
# Check if any of the signing keys (subkey or primary) match the trusted keyids
|
|
|
keyids_normalized = [k.replace(" ", "").upper() for k in keyids]
|
|
|
@@ -355,9 +434,11 @@ class GPGCliSignatureVendor(SignatureVendor):
|
|
|
return
|
|
|
|
|
|
# None of the signing keys matched
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
f"Signature not created by a trusted key. "
|
|
|
- f"Signed by: {signing_keys}, trusted keys: {list(keyids)}"
|
|
|
+ f"Signed by: {signing_keys}, trusted keys: {list(keyids)}",
|
|
|
+ signing_keys=signing_keys,
|
|
|
+ trusted_keys=list(keyids),
|
|
|
)
|
|
|
|
|
|
|
|
|
@@ -443,8 +524,8 @@ class X509SignatureVendor(SignatureVendor):
|
|
|
fail. If not specified, this function only verifies that the signature is valid.
|
|
|
|
|
|
Raises:
|
|
|
- subprocess.CalledProcessError: if gpgsm signature verification fails
|
|
|
- ValueError: if signature was not created by a trusted certificate
|
|
|
+ BadSignature: if gpgsm signature verification fails
|
|
|
+ UntrustedSignature: if signature was not created by a trusted certificate
|
|
|
"""
|
|
|
import subprocess
|
|
|
import tempfile
|
|
|
@@ -462,11 +543,16 @@ class X509SignatureVendor(SignatureVendor):
|
|
|
|
|
|
args = [self.gpgsm_command, "--verify", sig_file.name, data_file.name]
|
|
|
|
|
|
- result = subprocess.run(
|
|
|
- args,
|
|
|
- capture_output=True,
|
|
|
- check=True,
|
|
|
- )
|
|
|
+ try:
|
|
|
+ result = subprocess.run(
|
|
|
+ args,
|
|
|
+ capture_output=True,
|
|
|
+ check=True,
|
|
|
+ )
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
+ raise BadSignature(
|
|
|
+ f"X.509 signature verification failed: {e.stderr.decode('utf-8', errors='replace')}"
|
|
|
+ ) from e
|
|
|
|
|
|
# If keyids are specified, check that the signature was made by one of them
|
|
|
if keyids:
|
|
|
@@ -488,7 +574,7 @@ class X509SignatureVendor(SignatureVendor):
|
|
|
signing_certs.append(part)
|
|
|
|
|
|
if not signing_certs:
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
"Could not determine signing certificate from gpgsm output"
|
|
|
)
|
|
|
|
|
|
@@ -504,9 +590,11 @@ class X509SignatureVendor(SignatureVendor):
|
|
|
return
|
|
|
|
|
|
# None of the signing certs matched
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
f"Signature not created by a trusted certificate. "
|
|
|
- f"Signed by: {signing_certs}, trusted certs: {list(keyids)}"
|
|
|
+ f"Signed by: {signing_certs}, trusted certs: {list(keyids)}",
|
|
|
+ signing_keys=signing_certs,
|
|
|
+ trusted_keys=list(keyids),
|
|
|
)
|
|
|
|
|
|
|
|
|
@@ -600,8 +688,8 @@ class SSHSigSignatureVendor(SignatureVendor):
|
|
|
If not provided, uses gpg.ssh.allowedSignersFile from config.
|
|
|
|
|
|
Raises:
|
|
|
- ValueError: if no allowed signers are configured or provided
|
|
|
- sshsig.sshsig.InvalidSignature: if signature verification fails
|
|
|
+ UntrustedSignature: if no allowed signers are configured or provided
|
|
|
+ BadSignature: if signature verification fails
|
|
|
"""
|
|
|
from typing import Any
|
|
|
|
|
|
@@ -643,17 +731,17 @@ class SSHSigSignatureVendor(SignatureVendor):
|
|
|
)
|
|
|
)
|
|
|
except FileNotFoundError:
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
f"Allowed signers file not found: {self.allowed_signers_file}"
|
|
|
)
|
|
|
else:
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
"SSH signature verification requires either keyids or "
|
|
|
"gpg.ssh.allowedSignersFile to be configured"
|
|
|
)
|
|
|
|
|
|
if not allowed_keys:
|
|
|
- raise ValueError("No valid allowed signers found")
|
|
|
+ raise UntrustedSignature("No valid allowed signers found")
|
|
|
|
|
|
# Verify the signature
|
|
|
# sshsig.verify expects armored signature as string
|
|
|
@@ -662,12 +750,15 @@ class SSHSigSignatureVendor(SignatureVendor):
|
|
|
)
|
|
|
|
|
|
# Verify with namespace "git" (Git's default)
|
|
|
- sshsig.sshsig.verify(
|
|
|
- msg_in=data,
|
|
|
- armored_signature=sig_str,
|
|
|
- allowed_signers=allowed_keys,
|
|
|
- namespace="git",
|
|
|
- )
|
|
|
+ try:
|
|
|
+ sshsig.sshsig.verify(
|
|
|
+ msg_in=data,
|
|
|
+ armored_signature=sig_str,
|
|
|
+ allowed_signers=allowed_keys,
|
|
|
+ namespace="git",
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ raise BadSignature(f"SSH signature verification failed: {e}") from e
|
|
|
|
|
|
|
|
|
class SSHCliSignatureVendor(SignatureVendor):
|
|
|
@@ -818,19 +909,25 @@ class SSHCliSignatureVendor(SignatureVendor):
|
|
|
Args:
|
|
|
data: The data that was signed
|
|
|
signature: The signature to verify
|
|
|
- keyids: Not used for SSH verification. Instead, allowed signers
|
|
|
- are read from gpg.ssh.allowedSignersFile config
|
|
|
+ keyids: Not supported for SSH CLI verification. If provided, an error
|
|
|
+ will be raised. Use gpg.ssh.allowedSignersFile config instead.
|
|
|
|
|
|
Raises:
|
|
|
- subprocess.CalledProcessError: if signature verification fails
|
|
|
- ValueError: if allowedSignersFile is not configured
|
|
|
+ BadSignature: if signature verification fails
|
|
|
+ UntrustedSignature: if allowedSignersFile is not configured or keyids is provided
|
|
|
"""
|
|
|
import os
|
|
|
import subprocess
|
|
|
import tempfile
|
|
|
|
|
|
+ if keyids is not None:
|
|
|
+ raise UntrustedSignature(
|
|
|
+ "SSHCliSignatureVendor does not support keyids parameter. "
|
|
|
+ "Use gpg.ssh.allowedSignersFile configuration instead."
|
|
|
+ )
|
|
|
+
|
|
|
if self.allowed_signers_file is None:
|
|
|
- raise ValueError(
|
|
|
+ raise UntrustedSignature(
|
|
|
"SSH signature verification requires gpg.ssh.allowedSignersFile "
|
|
|
"to be configured"
|
|
|
)
|
|
|
@@ -867,12 +964,17 @@ class SSHCliSignatureVendor(SignatureVendor):
|
|
|
if self.revocation_file:
|
|
|
args.extend(["-r", self.revocation_file])
|
|
|
|
|
|
- subprocess.run(
|
|
|
- args,
|
|
|
- stdin=open(data_filename, "rb"),
|
|
|
- capture_output=True,
|
|
|
- check=True,
|
|
|
- )
|
|
|
+ try:
|
|
|
+ subprocess.run(
|
|
|
+ args,
|
|
|
+ stdin=open(data_filename, "rb"),
|
|
|
+ capture_output=True,
|
|
|
+ check=True,
|
|
|
+ )
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
+ raise BadSignature(
|
|
|
+ f"SSH signature verification failed: {e.stderr.decode('utf-8', errors='replace')}"
|
|
|
+ ) from e
|
|
|
|
|
|
|
|
|
def get_signature_vendor(
|
|
|
@@ -993,5 +1095,54 @@ def get_signature_vendor_for_signature(
|
|
|
return get_signature_vendor(format=format, config=config)
|
|
|
|
|
|
|
|
|
+def get_available_vendors() -> dict[str, list[type[SignatureVendor]]]:
|
|
|
+ """Get all available signature vendors on this system.
|
|
|
+
|
|
|
+ Returns a dictionary mapping signature format names to lists of available
|
|
|
+ vendor classes for each format. Only vendors whose dependencies are available
|
|
|
+ are included.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dictionary mapping format names (e.g. "openpgp", "ssh", "x509") to lists
|
|
|
+ of available vendor classes. If no vendors are available for a format,
|
|
|
+ that format will not be present in the dictionary.
|
|
|
+
|
|
|
+ Example:
|
|
|
+ >>> vendors = get_available_vendors()
|
|
|
+ >>> if "openpgp" in vendors:
|
|
|
+ ... print(f"GPG vendors: {vendors['openpgp']}")
|
|
|
+ >>> if "ssh" in vendors:
|
|
|
+ ... print(f"SSH vendors: {vendors['ssh']}")
|
|
|
+ """
|
|
|
+ available: dict[str, list[type[SignatureVendor]]] = {}
|
|
|
+
|
|
|
+ # Check OpenPGP/GPG vendors
|
|
|
+ openpgp_vendors: list[type[SignatureVendor]] = []
|
|
|
+ if GPGSignatureVendor.available():
|
|
|
+ openpgp_vendors.append(GPGSignatureVendor)
|
|
|
+ if GPGCliSignatureVendor.available():
|
|
|
+ openpgp_vendors.append(GPGCliSignatureVendor)
|
|
|
+ if openpgp_vendors:
|
|
|
+ available[SIGNATURE_FORMAT_OPENPGP] = openpgp_vendors
|
|
|
+
|
|
|
+ # Check X.509 vendors
|
|
|
+ x509_vendors: list[type[SignatureVendor]] = []
|
|
|
+ if X509SignatureVendor.available():
|
|
|
+ x509_vendors.append(X509SignatureVendor)
|
|
|
+ if x509_vendors:
|
|
|
+ available[SIGNATURE_FORMAT_X509] = x509_vendors
|
|
|
+
|
|
|
+ # Check SSH vendors
|
|
|
+ ssh_vendors: list[type[SignatureVendor]] = []
|
|
|
+ if SSHSigSignatureVendor.available():
|
|
|
+ ssh_vendors.append(SSHSigSignatureVendor)
|
|
|
+ if SSHCliSignatureVendor.available():
|
|
|
+ ssh_vendors.append(SSHCliSignatureVendor)
|
|
|
+ if ssh_vendors:
|
|
|
+ available[SIGNATURE_FORMAT_SSH] = ssh_vendors
|
|
|
+
|
|
|
+ return available
|
|
|
+
|
|
|
+
|
|
|
# Default GPG vendor instance
|
|
|
gpg_vendor = GPGSignatureVendor()
|