|
|
@@ -32,7 +32,6 @@ from dulwich.signature import (
|
|
|
SIGNATURE_FORMAT_X509,
|
|
|
GPGCliSignatureVendor,
|
|
|
GPGSignatureVendor,
|
|
|
- SignatureVendor,
|
|
|
SSHCliSignatureVendor,
|
|
|
SSHSigSignatureVendor,
|
|
|
X509SignatureVendor,
|
|
|
@@ -98,22 +97,6 @@ def get_valid_gpg_key_cli() -> str | None:
|
|
|
return None
|
|
|
|
|
|
|
|
|
-class SignatureVendorTests(unittest.TestCase):
|
|
|
- """Tests for SignatureVendor base class."""
|
|
|
-
|
|
|
- def test_sign_not_implemented(self) -> None:
|
|
|
- """Test that sign raises NotImplementedError."""
|
|
|
- vendor = SignatureVendor()
|
|
|
- with self.assertRaises(NotImplementedError):
|
|
|
- vendor.sign(b"test data")
|
|
|
-
|
|
|
- def test_verify_not_implemented(self) -> None:
|
|
|
- """Test that verify raises NotImplementedError."""
|
|
|
- vendor = SignatureVendor()
|
|
|
- with self.assertRaises(NotImplementedError):
|
|
|
- vendor.verify(b"test data", b"fake signature")
|
|
|
-
|
|
|
-
|
|
|
@unittest.skipIf(gpg is None, "gpg not available")
|
|
|
class GPGSignatureVendorTests(unittest.TestCase):
|
|
|
"""Tests for GPGSignatureVendor."""
|
|
|
@@ -123,7 +106,7 @@ class GPGSignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg",), b"minTrustLevel", b"marginal")
|
|
|
|
|
|
- vendor = GPGSignatureVendor(config=config)
|
|
|
+ vendor = GPGSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.min_trust_level, "marginal")
|
|
|
|
|
|
def test_min_trust_level_default(self) -> None:
|
|
|
@@ -264,7 +247,7 @@ class GPGCliSignatureVendorTests(unittest.TestCase):
|
|
|
"""Test verifying with specific trusted key IDs."""
|
|
|
from dulwich.signature import UntrustedSignature
|
|
|
|
|
|
- vendor = GPGCliSignatureVendor()
|
|
|
+ signer = GPGCliSignatureVendor()
|
|
|
test_data = b"test data to sign"
|
|
|
|
|
|
try:
|
|
|
@@ -273,15 +256,17 @@ class GPGCliSignatureVendorTests(unittest.TestCase):
|
|
|
self.skipTest("No valid GPG keys available for testing")
|
|
|
|
|
|
# Sign with the specific key
|
|
|
- signature = vendor.sign(test_data, keyid=valid_keyid)
|
|
|
+ signature = signer.sign(test_data, keyid=valid_keyid)
|
|
|
|
|
|
# Verify with the correct keyid - should succeed
|
|
|
- vendor.verify(test_data, signature, keyids=[valid_keyid])
|
|
|
+ verifier_trusted = GPGCliSignatureVendor(keyids=[valid_keyid])
|
|
|
+ verifier_trusted.verify(test_data, signature)
|
|
|
|
|
|
# Verify with a different keyid - should fail
|
|
|
fake_keyid = "0" * 40 # Fake 40-character fingerprint
|
|
|
+ verifier_untrusted = GPGCliSignatureVendor(keyids=[fake_keyid])
|
|
|
with self.assertRaises(UntrustedSignature):
|
|
|
- vendor.verify(test_data, signature, keyids=[fake_keyid])
|
|
|
+ verifier_untrusted.verify(test_data, signature)
|
|
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
|
self.skipTest(f"GPG key not available: {e}")
|
|
|
@@ -303,16 +288,13 @@ class GPGCliSignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg",), b"program", b"gpg2")
|
|
|
|
|
|
- vendor = GPGCliSignatureVendor(config=config)
|
|
|
+ vendor = GPGCliSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.gpg_command, "gpg2")
|
|
|
|
|
|
- def test_gpg_program_override(self) -> None:
|
|
|
- """Test that gpg_command parameter overrides config."""
|
|
|
- config = ConfigDict()
|
|
|
- config.set((b"gpg",), b"program", b"gpg2")
|
|
|
-
|
|
|
- vendor = GPGCliSignatureVendor(config=config, gpg_command="gpg")
|
|
|
- self.assertEqual(vendor.gpg_command, "gpg")
|
|
|
+ def test_gpg_program_explicit(self) -> None:
|
|
|
+ """Test that gpg_command parameter works when passed directly."""
|
|
|
+ vendor = GPGCliSignatureVendor(gpg_command="gpg2")
|
|
|
+ self.assertEqual(vendor.gpg_command, "gpg2")
|
|
|
|
|
|
def test_gpg_program_default(self) -> None:
|
|
|
"""Test default gpg command when no config provided."""
|
|
|
@@ -322,7 +304,7 @@ class GPGCliSignatureVendorTests(unittest.TestCase):
|
|
|
def test_gpg_program_default_when_not_in_config(self) -> None:
|
|
|
"""Test default gpg command when config doesn't have gpg.program."""
|
|
|
config = ConfigDict()
|
|
|
- vendor = GPGCliSignatureVendor(config=config)
|
|
|
+ vendor = GPGCliSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.gpg_command, "gpg")
|
|
|
|
|
|
def test_available(self) -> None:
|
|
|
@@ -344,21 +326,18 @@ class X509SignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
|
|
|
|
|
|
- vendor = X509SignatureVendor(config=config)
|
|
|
+ vendor = X509SignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.gpgsm_command, "/usr/local/bin/gpgsm")
|
|
|
|
|
|
- def test_gpgsm_program_override(self) -> None:
|
|
|
- """Test gpgsm_command parameter overrides config."""
|
|
|
- config = ConfigDict()
|
|
|
- config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
|
|
|
-
|
|
|
- vendor = X509SignatureVendor(config=config, gpgsm_command="/custom/gpgsm")
|
|
|
+ def test_gpgsm_program_explicit(self) -> None:
|
|
|
+ """Test gpgsm_command parameter works when passed directly."""
|
|
|
+ vendor = X509SignatureVendor(gpgsm_command="/custom/gpgsm")
|
|
|
self.assertEqual(vendor.gpgsm_command, "/custom/gpgsm")
|
|
|
|
|
|
def test_gpgsm_program_default_when_not_in_config(self) -> None:
|
|
|
"""Test default when gpg.x509.program not in config."""
|
|
|
config = ConfigDict()
|
|
|
- vendor = X509SignatureVendor(config=config)
|
|
|
+ vendor = X509SignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.gpgsm_command, "gpgsm")
|
|
|
|
|
|
def test_available(self) -> None:
|
|
|
@@ -459,13 +438,6 @@ class GetSignatureVendorTests(unittest.TestCase):
|
|
|
class SSHSigSignatureVendorTests(unittest.TestCase):
|
|
|
"""Tests for SSHSigSignatureVendor (sshsig package implementation)."""
|
|
|
|
|
|
- def test_sign_not_supported(self) -> None:
|
|
|
- """Test that sign raises NotImplementedError with helpful message."""
|
|
|
- vendor = SSHSigSignatureVendor()
|
|
|
- with self.assertRaises(NotImplementedError) as cm:
|
|
|
- vendor.sign(b"test data", keyid="dummy")
|
|
|
- self.assertIn("SSHCliSignatureVendor", str(cm.exception))
|
|
|
-
|
|
|
def test_verify_without_config_raises(self) -> None:
|
|
|
"""Test that verify without config or keyids raises UntrustedSignature."""
|
|
|
from dulwich.signature import UntrustedSignature
|
|
|
@@ -481,7 +453,7 @@ class SSHSigSignatureVendorTests(unittest.TestCase):
|
|
|
config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/path/to/allowed")
|
|
|
config.set((b"gpg", b"ssh"), b"defaultKeyCommand", b"ssh-add -L")
|
|
|
|
|
|
- vendor = SSHSigSignatureVendor(config=config)
|
|
|
+ vendor = SSHSigSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
|
|
|
self.assertEqual(vendor.default_key_command, "ssh-add -L")
|
|
|
|
|
|
@@ -532,7 +504,7 @@ class SSHSigSignatureVendorTests(unittest.TestCase):
|
|
|
cli_config.set(
|
|
|
(b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
)
|
|
|
- cli_vendor = SSHCliSignatureVendor(config=cli_config)
|
|
|
+ cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
|
|
|
|
|
|
test_data = b"test data for sshsig verification"
|
|
|
signature = cli_vendor.sign(test_data, keyid=private_key)
|
|
|
@@ -542,11 +514,168 @@ class SSHSigSignatureVendorTests(unittest.TestCase):
|
|
|
pkg_config.set(
|
|
|
(b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
)
|
|
|
- pkg_vendor = SSHSigSignatureVendor(config=pkg_config)
|
|
|
+ pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
|
|
|
|
|
|
# This should succeed
|
|
|
pkg_vendor.verify(test_data, signature)
|
|
|
|
|
|
+ def test_key_lifetime_validation(self) -> None:
|
|
|
+ """Test SSH key lifetime validation (valid-after/valid-before).
|
|
|
+
|
|
|
+ Note: The current version of the sshsig library does not parse options
|
|
|
+ from allowed_signers files, so this test verifies the code is in place
|
|
|
+ but will be skipped until the library adds support.
|
|
|
+ """
|
|
|
+ import io
|
|
|
+
|
|
|
+ # Check if sshsig library supports parsing options
|
|
|
+ import sshsig.allowed_signers
|
|
|
+
|
|
|
+ test_content = 'valid-after="20260104" test@example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIOMqqnkVzrm0SdG6UOoqKLsabgH5C9okWi0dh2l9GKJl'
|
|
|
+ f = io.StringIO(test_content)
|
|
|
+ signers = list(sshsig.allowed_signers.load_allowed_signers_file(f))
|
|
|
+
|
|
|
+ if not signers or signers[0].options is None:
|
|
|
+ self.skipTest(
|
|
|
+ "sshsig library does not yet support parsing options from allowed_signers files"
|
|
|
+ )
|
|
|
+
|
|
|
+ # If we get here, the library supports options, so we can test
|
|
|
+ import os
|
|
|
+ import tempfile
|
|
|
+ import time
|
|
|
+
|
|
|
+ from dulwich.signature import UntrustedSignature
|
|
|
+
|
|
|
+ if shutil.which("ssh-keygen") is None:
|
|
|
+ self.skipTest("ssh-keygen not available")
|
|
|
+
|
|
|
+ with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
+ private_key = os.path.join(tmpdir, "test_key")
|
|
|
+ public_key = private_key + ".pub"
|
|
|
+ allowed_signers = os.path.join(tmpdir, "allowed_signers")
|
|
|
+
|
|
|
+ # Generate Ed25519 key
|
|
|
+ subprocess.run(
|
|
|
+ [
|
|
|
+ "ssh-keygen",
|
|
|
+ "-t",
|
|
|
+ "ed25519",
|
|
|
+ "-f",
|
|
|
+ private_key,
|
|
|
+ "-N",
|
|
|
+ "",
|
|
|
+ "-C",
|
|
|
+ "test@example.com",
|
|
|
+ ],
|
|
|
+ capture_output=True,
|
|
|
+ check=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Read public key
|
|
|
+ with open(public_key) as pub:
|
|
|
+ pub_key_content = pub.read().strip()
|
|
|
+
|
|
|
+ # Test 1: Key with valid-after in the future (should fail)
|
|
|
+ future_time = int(time.time()) + 86400 # 1 day from now
|
|
|
+ future_timestamp = time.strftime("%Y%m%d", time.gmtime(future_time))
|
|
|
+ with open(allowed_signers, "w") as allowed:
|
|
|
+ allowed.write(
|
|
|
+ f'valid-after="{future_timestamp}" test@example.com {pub_key_content}\n'
|
|
|
+ )
|
|
|
+
|
|
|
+ cli_config = ConfigDict()
|
|
|
+ cli_config.set(
|
|
|
+ (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
+ )
|
|
|
+ cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
|
|
|
+
|
|
|
+ test_data = b"test data for lifetime validation"
|
|
|
+ signature = cli_vendor.sign(test_data, keyid=private_key)
|
|
|
+
|
|
|
+ pkg_config = ConfigDict()
|
|
|
+ pkg_config.set(
|
|
|
+ (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
+ )
|
|
|
+ pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
|
|
|
+
|
|
|
+ # Verification should fail because key is not yet valid
|
|
|
+ with self.assertRaises(UntrustedSignature) as cm:
|
|
|
+ pkg_vendor.verify(test_data, signature)
|
|
|
+ self.assertIn("not yet valid", str(cm.exception))
|
|
|
+
|
|
|
+ def test_revocation_checking(self) -> None:
|
|
|
+ """Test SSH key revocation checking."""
|
|
|
+ import os
|
|
|
+ import tempfile
|
|
|
+
|
|
|
+ from dulwich.signature import UntrustedSignature
|
|
|
+
|
|
|
+ if shutil.which("ssh-keygen") is None:
|
|
|
+ self.skipTest("ssh-keygen not available")
|
|
|
+
|
|
|
+ with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
+ private_key = os.path.join(tmpdir, "test_key")
|
|
|
+ public_key = private_key + ".pub"
|
|
|
+ allowed_signers = os.path.join(tmpdir, "allowed_signers")
|
|
|
+ revocation_file = os.path.join(tmpdir, "revoked_keys")
|
|
|
+
|
|
|
+ # Generate Ed25519 key
|
|
|
+ subprocess.run(
|
|
|
+ [
|
|
|
+ "ssh-keygen",
|
|
|
+ "-t",
|
|
|
+ "ed25519",
|
|
|
+ "-f",
|
|
|
+ private_key,
|
|
|
+ "-N",
|
|
|
+ "",
|
|
|
+ "-C",
|
|
|
+ "test@example.com",
|
|
|
+ ],
|
|
|
+ capture_output=True,
|
|
|
+ check=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Read public key
|
|
|
+ with open(public_key) as pub:
|
|
|
+ pub_key_content = pub.read().strip()
|
|
|
+
|
|
|
+ # Create allowed_signers file
|
|
|
+ with open(allowed_signers, "w") as allowed:
|
|
|
+ allowed.write(f"* {pub_key_content}\n")
|
|
|
+
|
|
|
+ # Sign some data
|
|
|
+ cli_config = ConfigDict()
|
|
|
+ cli_config.set(
|
|
|
+ (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
+ )
|
|
|
+ cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
|
|
|
+
|
|
|
+ test_data = b"test data for revocation checking"
|
|
|
+ signature = cli_vendor.sign(test_data, keyid=private_key)
|
|
|
+
|
|
|
+ # Test 1: Without revocation file (should succeed)
|
|
|
+ pkg_config = ConfigDict()
|
|
|
+ pkg_config.set(
|
|
|
+ (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
+ )
|
|
|
+ pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
|
|
|
+ pkg_vendor.verify(test_data, signature)
|
|
|
+
|
|
|
+ # Test 2: With revocation file containing this key (should fail)
|
|
|
+ with open(revocation_file, "w") as revoked:
|
|
|
+ revoked.write(f"{pub_key_content}\n")
|
|
|
+
|
|
|
+ pkg_config.set(
|
|
|
+ (b"gpg", b"ssh"), b"revocationFile", revocation_file.encode()
|
|
|
+ )
|
|
|
+ pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
|
|
|
+
|
|
|
+ with self.assertRaises(UntrustedSignature) as cm:
|
|
|
+ pkg_vendor.verify(test_data, signature)
|
|
|
+ self.assertIn("revoked", str(cm.exception))
|
|
|
+
|
|
|
|
|
|
class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
"""Tests for SSHCliSignatureVendor."""
|
|
|
@@ -561,16 +690,13 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
|
|
|
|
|
|
- vendor = SSHCliSignatureVendor(config=config)
|
|
|
+ vendor = SSHCliSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
|
|
|
|
|
|
- def test_ssh_program_override(self) -> None:
|
|
|
- """Test that ssh_command parameter overrides config."""
|
|
|
- config = ConfigDict()
|
|
|
- config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
|
|
|
-
|
|
|
- vendor = SSHCliSignatureVendor(config=config, ssh_command="ssh-keygen")
|
|
|
- self.assertEqual(vendor.ssh_command, "ssh-keygen")
|
|
|
+ def test_ssh_program_explicit(self) -> None:
|
|
|
+ """Test that ssh_command parameter works when passed directly."""
|
|
|
+ vendor = SSHCliSignatureVendor(ssh_command="/usr/bin/ssh-keygen")
|
|
|
+ self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
|
|
|
|
|
|
def test_ssh_program_default(self) -> None:
|
|
|
"""Test default ssh-keygen command when no config provided."""
|
|
|
@@ -582,7 +708,7 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
|
|
|
|
|
|
- vendor = SSHCliSignatureVendor(config=config)
|
|
|
+ vendor = SSHCliSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.allowed_signers_file, "/tmp/allowed_signers")
|
|
|
|
|
|
def test_sign_without_key_raises(self) -> None:
|
|
|
@@ -641,7 +767,7 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
(b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
|
|
|
)
|
|
|
|
|
|
- vendor = SSHCliSignatureVendor(config=config)
|
|
|
+ vendor = SSHCliSignatureVendor.from_config(config=config)
|
|
|
|
|
|
# Test signing and verification
|
|
|
test_data = b"test data to sign with SSH"
|
|
|
@@ -686,7 +812,7 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
(b"gpg", b"ssh"), b"defaultKeyCommand", f"echo {private_key}".encode()
|
|
|
)
|
|
|
|
|
|
- vendor = SSHCliSignatureVendor(config=config)
|
|
|
+ vendor = SSHCliSignatureVendor.from_config(config=config)
|
|
|
test_data = b"test data"
|
|
|
|
|
|
# Sign without providing keyid - should use defaultKeyCommand
|
|
|
@@ -699,7 +825,7 @@ class SSHCliSignatureVendorTests(unittest.TestCase):
|
|
|
config = ConfigDict()
|
|
|
config.set((b"gpg", b"ssh"), b"revocationFile", b"/path/to/revoked_keys")
|
|
|
|
|
|
- vendor = SSHCliSignatureVendor(config=config)
|
|
|
+ vendor = SSHCliSignatureVendor.from_config(config=config)
|
|
|
self.assertEqual(vendor.revocation_file, "/path/to/revoked_keys")
|
|
|
|
|
|
def test_available(self) -> None:
|