test_signature.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903
  1. # test_signature.py -- tests for signature.py
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for signature vendors."""
  22. import shutil
  23. import subprocess
  24. import unittest
  25. from dulwich.config import ConfigDict
  26. from dulwich.signature import (
  27. SIGNATURE_FORMAT_OPENPGP,
  28. SIGNATURE_FORMAT_SSH,
  29. SIGNATURE_FORMAT_X509,
  30. GPGCliSignatureVendor,
  31. GPGSignatureVendor,
  32. SSHCliSignatureVendor,
  33. SSHSigSignatureVendor,
  34. X509SignatureVendor,
  35. detect_signature_format,
  36. get_signature_vendor,
  37. get_signature_vendor_for_signature,
  38. )
  39. try:
  40. import gpg
  41. except ImportError:
  42. gpg = None
  43. def get_valid_gpg_key() -> str | None:
  44. """Get a valid (non-revoked, non-expired, can-sign) GPG key from the keyring.
  45. Returns:
  46. A key fingerprint string that can be used for signing, or None if no valid key found.
  47. Raises:
  48. unittest.SkipTest: if gpg module is not available
  49. """
  50. if gpg is None:
  51. raise unittest.SkipTest("gpg module not available")
  52. with gpg.Context() as ctx:
  53. keys = list(ctx.keylist(secret=True))
  54. if not keys:
  55. return None
  56. # Find a non-revoked, non-expired key that can sign
  57. for key in keys:
  58. if not key.revoked and not key.expired and key.can_sign:
  59. return str(key.fpr)
  60. return None
  61. def get_valid_gpg_key_cli() -> str | None:
  62. """Get a valid (non-revoked, non-expired) GPG key fingerprint using CLI.
  63. Returns:
  64. A key fingerprint string, or None if no valid key found.
  65. """
  66. result = subprocess.run(
  67. ["gpg", "--list-secret-keys", "--with-colons"],
  68. capture_output=True,
  69. check=True,
  70. text=True,
  71. )
  72. # Find a valid key (field 2 should be '-' for valid, 'e' for expired, 'r' for revoked)
  73. current_key_valid = False
  74. for line in result.stdout.split("\n"):
  75. if line.startswith("sec:"):
  76. fields = line.split(":")
  77. # Only accept valid keys (field 2 is '-')
  78. current_key_valid = fields[1] == "-"
  79. elif line.startswith("fpr:") and current_key_valid:
  80. return line.split(":")[9]
  81. return None
  82. @unittest.skipIf(gpg is None, "gpg not available")
  83. class GPGSignatureVendorTests(unittest.TestCase):
  84. """Tests for GPGSignatureVendor."""
  85. def test_min_trust_level_from_config(self) -> None:
  86. """Test reading gpg.minTrustLevel from config."""
  87. config = ConfigDict()
  88. config.set((b"gpg",), b"minTrustLevel", b"marginal")
  89. vendor = GPGSignatureVendor.from_config(config=config)
  90. self.assertEqual(vendor.min_trust_level, "marginal")
  91. def test_min_trust_level_default(self) -> None:
  92. """Test default when gpg.minTrustLevel not in config."""
  93. vendor = GPGSignatureVendor()
  94. self.assertIsNone(vendor.min_trust_level)
  95. def test_available(self) -> None:
  96. """Test that available() returns boolean."""
  97. result = GPGSignatureVendor.available()
  98. self.assertIsInstance(result, bool)
  99. def test_sign_and_verify(self) -> None:
  100. """Test basic sign and verify cycle.
  101. Note: This test requires a GPG key to be configured in the test
  102. environment. It may be skipped in environments without GPG setup.
  103. """
  104. vendor = GPGSignatureVendor()
  105. test_data = b"test data to sign"
  106. try:
  107. # Sign the data
  108. signature = vendor.sign(test_data)
  109. self.assertIsInstance(signature, bytes)
  110. self.assertGreater(len(signature), 0)
  111. # Verify the signature
  112. vendor.verify(test_data, signature)
  113. except gpg.errors.GPGMEError as e:
  114. # Skip test if no GPG key is available
  115. self.skipTest(f"GPG key not available: {e}")
  116. def test_verify_invalid_signature(self) -> None:
  117. """Test that verify raises an error for invalid signatures."""
  118. vendor = GPGSignatureVendor()
  119. test_data = b"test data"
  120. invalid_signature = b"this is not a valid signature"
  121. with self.assertRaises(gpg.errors.GPGMEError):
  122. vendor.verify(test_data, invalid_signature)
  123. def test_sign_with_keyid(self) -> None:
  124. """Test signing with a specific key ID.
  125. Note: This test requires a GPG key to be configured in the test
  126. environment. It may be skipped in environments without GPG setup.
  127. """
  128. vendor = GPGSignatureVendor()
  129. test_data = b"test data to sign"
  130. try:
  131. key = get_valid_gpg_key()
  132. if not key:
  133. self.skipTest("No valid GPG keys available for testing")
  134. signature = vendor.sign(test_data, keyid=key)
  135. self.assertIsInstance(signature, bytes)
  136. self.assertGreater(len(signature), 0)
  137. # Verify the signature
  138. vendor.verify(test_data, signature)
  139. except gpg.errors.GPGMEError as e:
  140. self.skipTest(f"GPG key not available: {e}")
  141. class GPGCliSignatureVendorTests(unittest.TestCase):
  142. """Tests for GPGCliSignatureVendor."""
  143. def setUp(self) -> None:
  144. """Check if gpg command is available."""
  145. if shutil.which("gpg") is None:
  146. self.skipTest("gpg command not available")
  147. def test_sign_and_verify(self) -> None:
  148. """Test basic sign and verify cycle using CLI."""
  149. vendor = GPGCliSignatureVendor()
  150. test_data = b"test data to sign"
  151. try:
  152. # Sign the data
  153. signature = vendor.sign(test_data)
  154. self.assertIsInstance(signature, bytes)
  155. self.assertGreater(len(signature), 0)
  156. self.assertTrue(signature.startswith(b"-----BEGIN PGP SIGNATURE-----"))
  157. # Verify the signature
  158. vendor.verify(test_data, signature)
  159. except subprocess.CalledProcessError as e:
  160. # Skip test if no GPG key is available or configured
  161. self.skipTest(f"GPG signing failed: {e}")
  162. def test_verify_invalid_signature(self) -> None:
  163. """Test that verify raises an error for invalid signatures."""
  164. from dulwich.signature import BadSignature
  165. vendor = GPGCliSignatureVendor()
  166. test_data = b"test data"
  167. invalid_signature = b"this is not a valid signature"
  168. with self.assertRaises(BadSignature):
  169. vendor.verify(test_data, invalid_signature)
  170. def test_sign_with_keyid(self) -> None:
  171. """Test signing with a specific key ID using CLI."""
  172. vendor = GPGCliSignatureVendor()
  173. test_data = b"test data to sign"
  174. try:
  175. # Try to get a key from the keyring
  176. result = subprocess.run(
  177. ["gpg", "--list-secret-keys", "--with-colons"],
  178. capture_output=True,
  179. check=True,
  180. text=True,
  181. )
  182. # Parse output to find a key fingerprint
  183. keyid = None
  184. for line in result.stdout.split("\n"):
  185. if line.startswith("fpr:"):
  186. keyid = line.split(":")[9]
  187. break
  188. if not keyid:
  189. self.skipTest("No GPG keys available for testing")
  190. signature = vendor.sign(test_data, keyid=keyid)
  191. self.assertIsInstance(signature, bytes)
  192. self.assertGreater(len(signature), 0)
  193. # Verify the signature
  194. vendor.verify(test_data, signature)
  195. except subprocess.CalledProcessError as e:
  196. self.skipTest(f"GPG key not available: {e}")
  197. def test_verify_with_keyids(self) -> None:
  198. """Test verifying with specific trusted key IDs."""
  199. from dulwich.signature import UntrustedSignature
  200. signer = GPGCliSignatureVendor()
  201. test_data = b"test data to sign"
  202. try:
  203. valid_keyid = get_valid_gpg_key_cli()
  204. if not valid_keyid:
  205. self.skipTest("No valid GPG keys available for testing")
  206. # Sign with the specific key
  207. signature = signer.sign(test_data, keyid=valid_keyid)
  208. # Verify with the correct keyid - should succeed
  209. verifier_trusted = GPGCliSignatureVendor(keyids=[valid_keyid])
  210. verifier_trusted.verify(test_data, signature)
  211. # Verify with a different keyid - should fail
  212. fake_keyid = "0" * 40 # Fake 40-character fingerprint
  213. verifier_untrusted = GPGCliSignatureVendor(keyids=[fake_keyid])
  214. with self.assertRaises(UntrustedSignature):
  215. verifier_untrusted.verify(test_data, signature)
  216. except subprocess.CalledProcessError as e:
  217. self.skipTest(f"GPG key not available: {e}")
  218. def test_custom_gpg_command(self) -> None:
  219. """Test using a custom GPG command path."""
  220. vendor = GPGCliSignatureVendor(gpg_command="gpg")
  221. test_data = b"test data"
  222. try:
  223. signature = vendor.sign(test_data)
  224. self.assertIsInstance(signature, bytes)
  225. except subprocess.CalledProcessError as e:
  226. self.skipTest(f"GPG not available: {e}")
  227. def test_gpg_program_from_config(self) -> None:
  228. """Test reading gpg.program from config."""
  229. # Create a config with gpg.program set
  230. config = ConfigDict()
  231. config.set((b"gpg",), b"program", b"gpg2")
  232. vendor = GPGCliSignatureVendor.from_config(config=config)
  233. self.assertEqual(vendor.gpg_command, "gpg2")
  234. def test_gpg_program_explicit(self) -> None:
  235. """Test that gpg_command parameter works when passed directly."""
  236. vendor = GPGCliSignatureVendor(gpg_command="gpg2")
  237. self.assertEqual(vendor.gpg_command, "gpg2")
  238. def test_gpg_program_default(self) -> None:
  239. """Test default gpg command when no config provided."""
  240. vendor = GPGCliSignatureVendor()
  241. self.assertEqual(vendor.gpg_command, "gpg")
  242. def test_gpg_program_default_when_not_in_config(self) -> None:
  243. """Test default gpg command when config doesn't have gpg.program."""
  244. config = ConfigDict()
  245. vendor = GPGCliSignatureVendor.from_config(config=config)
  246. self.assertEqual(vendor.gpg_command, "gpg")
  247. def test_available(self) -> None:
  248. """Test that available() returns boolean."""
  249. result = GPGCliSignatureVendor.available()
  250. self.assertIsInstance(result, bool)
  251. class X509SignatureVendorTests(unittest.TestCase):
  252. """Tests for X509SignatureVendor."""
  253. def test_gpgsm_program_default(self) -> None:
  254. """Test default gpgsm command is 'gpgsm'."""
  255. vendor = X509SignatureVendor()
  256. self.assertEqual(vendor.gpgsm_command, "gpgsm")
  257. def test_gpgsm_program_from_config(self) -> None:
  258. """Test reading gpg.x509.program from config."""
  259. config = ConfigDict()
  260. config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
  261. vendor = X509SignatureVendor.from_config(config=config)
  262. self.assertEqual(vendor.gpgsm_command, "/usr/local/bin/gpgsm")
  263. def test_gpgsm_program_explicit(self) -> None:
  264. """Test gpgsm_command parameter works when passed directly."""
  265. vendor = X509SignatureVendor(gpgsm_command="/custom/gpgsm")
  266. self.assertEqual(vendor.gpgsm_command, "/custom/gpgsm")
  267. def test_gpgsm_program_default_when_not_in_config(self) -> None:
  268. """Test default when gpg.x509.program not in config."""
  269. config = ConfigDict()
  270. vendor = X509SignatureVendor.from_config(config=config)
  271. self.assertEqual(vendor.gpgsm_command, "gpgsm")
  272. def test_available(self) -> None:
  273. """Test that available() returns boolean."""
  274. result = X509SignatureVendor.available()
  275. self.assertIsInstance(result, bool)
  276. @unittest.skipIf(
  277. shutil.which("gpgsm") is None, "gpgsm command not available in PATH"
  278. )
  279. def test_sign_and_verify(self) -> None:
  280. """Test basic X.509 sign and verify cycle.
  281. Note: This test requires gpgsm and an X.509 certificate to be configured.
  282. It may be skipped in environments without gpgsm/certificate setup.
  283. """
  284. vendor = X509SignatureVendor()
  285. test_data = b"test data to sign"
  286. try:
  287. # Try to sign the data
  288. signature = vendor.sign(test_data)
  289. self.assertIsInstance(signature, bytes)
  290. self.assertGreater(len(signature), 0)
  291. # Verify the signature
  292. vendor.verify(test_data, signature)
  293. except subprocess.CalledProcessError:
  294. # Skip test if no X.509 certificate is available
  295. self.skipTest("No X.509 certificate available for signing")
  296. class GetSignatureVendorTests(unittest.TestCase):
  297. """Tests for get_signature_vendor function."""
  298. def test_default_format(self) -> None:
  299. """Test that default format is openpgp."""
  300. vendor = get_signature_vendor()
  301. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  302. def test_explicit_openpgp_format(self) -> None:
  303. """Test explicitly requesting openpgp format."""
  304. vendor = get_signature_vendor(format="openpgp")
  305. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  306. def test_format_from_config(self) -> None:
  307. """Test reading format from config."""
  308. config = ConfigDict()
  309. config.set((b"gpg",), b"format", b"openpgp")
  310. vendor = get_signature_vendor(config=config)
  311. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  312. def test_format_case_insensitive(self) -> None:
  313. """Test that format is case-insensitive."""
  314. vendor = get_signature_vendor(format="OpenPGP")
  315. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  316. def test_x509_format_supported(self) -> None:
  317. """Test that x509 format is now supported."""
  318. vendor = get_signature_vendor(format="x509")
  319. self.assertIsInstance(vendor, X509SignatureVendor)
  320. def test_ssh_format_supported(self) -> None:
  321. """Test that ssh format is now supported."""
  322. vendor = get_signature_vendor(format="ssh")
  323. # Should be either SSHSigSignatureVendor or SSHCliSignatureVendor
  324. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  325. def test_invalid_format(self) -> None:
  326. """Test that invalid format raises ValueError."""
  327. with self.assertRaises(ValueError) as cm:
  328. get_signature_vendor(format="invalid")
  329. self.assertIn("Unsupported", str(cm.exception))
  330. def test_config_passed_to_vendor(self) -> None:
  331. """Test that config is passed to the vendor."""
  332. config = ConfigDict()
  333. config.set((b"gpg",), b"program", b"gpg2")
  334. vendor = get_signature_vendor(format="openpgp", config=config)
  335. # If CLI vendor is used, check that config was passed
  336. if isinstance(vendor, GPGCliSignatureVendor):
  337. self.assertEqual(vendor.gpg_command, "gpg2")
  338. def test_ssh_format(self) -> None:
  339. """Test requesting SSH format."""
  340. vendor = get_signature_vendor(format="ssh")
  341. # Should be either SSHSigSignatureVendor or SSHCliSignatureVendor
  342. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  343. def test_x509_format(self) -> None:
  344. """Test requesting X.509 format."""
  345. vendor = get_signature_vendor(format="x509")
  346. self.assertIsInstance(vendor, X509SignatureVendor)
  347. class SSHSigSignatureVendorTests(unittest.TestCase):
  348. """Tests for SSHSigSignatureVendor (sshsig package implementation)."""
  349. def setUp(self) -> None:
  350. """Check if sshsig package is available."""
  351. if not SSHSigSignatureVendor.available():
  352. self.skipTest("sshsig package not available")
  353. def test_verify_without_config_raises(self) -> None:
  354. """Test that verify without config or keyids raises UntrustedSignature."""
  355. from dulwich.signature import UntrustedSignature
  356. vendor = SSHSigSignatureVendor()
  357. with self.assertRaises(UntrustedSignature) as cm:
  358. vendor.verify(b"test data", b"fake signature")
  359. self.assertIn("allowedSignersFile", str(cm.exception))
  360. def test_config_parsing(self) -> None:
  361. """Test parsing SSH config options."""
  362. config = ConfigDict()
  363. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/path/to/allowed")
  364. config.set((b"gpg", b"ssh"), b"defaultKeyCommand", b"ssh-add -L")
  365. vendor = SSHSigSignatureVendor.from_config(config=config)
  366. self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
  367. self.assertEqual(vendor.default_key_command, "ssh-add -L")
  368. def test_available(self) -> None:
  369. """Test that available() returns boolean."""
  370. result = SSHSigSignatureVendor.available()
  371. self.assertIsInstance(result, bool)
  372. def test_verify_with_cli_generated_signature(self) -> None:
  373. """Test verifying a signature created by SSH CLI vendor."""
  374. import os
  375. import tempfile
  376. if shutil.which("ssh-keygen") is None:
  377. self.skipTest("ssh-keygen not available")
  378. # Generate a test SSH key and signature using CLI vendor
  379. with tempfile.TemporaryDirectory() as tmpdir:
  380. private_key = os.path.join(tmpdir, "test_key")
  381. public_key = private_key + ".pub"
  382. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  383. # Generate Ed25519 key
  384. subprocess.run(
  385. [
  386. "ssh-keygen",
  387. "-t",
  388. "ed25519",
  389. "-f",
  390. private_key,
  391. "-N",
  392. "",
  393. "-C",
  394. "test@example.com",
  395. ],
  396. capture_output=True,
  397. check=True,
  398. )
  399. # Create allowed_signers file
  400. with open(public_key) as pub:
  401. pub_key_content = pub.read().strip()
  402. with open(allowed_signers, "w") as allowed:
  403. allowed.write(f"* {pub_key_content}\n")
  404. # Sign with CLI vendor
  405. cli_config = ConfigDict()
  406. cli_config.set(
  407. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  408. )
  409. cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
  410. test_data = b"test data for sshsig verification"
  411. signature = cli_vendor.sign(test_data, keyid=private_key)
  412. # Verify with sshsig package vendor
  413. pkg_config = ConfigDict()
  414. pkg_config.set(
  415. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  416. )
  417. pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
  418. # This should succeed
  419. pkg_vendor.verify(test_data, signature)
  420. def test_key_lifetime_validation(self) -> None:
  421. """Test SSH key lifetime validation (valid-after/valid-before).
  422. Note: The current version of the sshsig library does not parse options
  423. from allowed_signers files, so this test verifies the code is in place
  424. but will be skipped until the library adds support.
  425. """
  426. import io
  427. # Check if sshsig library supports parsing options
  428. import sshsig.allowed_signers
  429. test_content = 'valid-after="20260104" test@example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIOMqqnkVzrm0SdG6UOoqKLsabgH5C9okWi0dh2l9GKJl'
  430. f = io.StringIO(test_content)
  431. signers = list(sshsig.allowed_signers.load_allowed_signers_file(f))
  432. if not signers or signers[0].options is None:
  433. self.skipTest(
  434. "sshsig library does not yet support parsing options from allowed_signers files"
  435. )
  436. # If we get here, the library supports options, so we can test
  437. import os
  438. import tempfile
  439. import time
  440. from dulwich.signature import UntrustedSignature
  441. if shutil.which("ssh-keygen") is None:
  442. self.skipTest("ssh-keygen not available")
  443. with tempfile.TemporaryDirectory() as tmpdir:
  444. private_key = os.path.join(tmpdir, "test_key")
  445. public_key = private_key + ".pub"
  446. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  447. # Generate Ed25519 key
  448. subprocess.run(
  449. [
  450. "ssh-keygen",
  451. "-t",
  452. "ed25519",
  453. "-f",
  454. private_key,
  455. "-N",
  456. "",
  457. "-C",
  458. "test@example.com",
  459. ],
  460. capture_output=True,
  461. check=True,
  462. )
  463. # Read public key
  464. with open(public_key) as pub:
  465. pub_key_content = pub.read().strip()
  466. # Test 1: Key with valid-after in the future (should fail)
  467. future_time = int(time.time()) + 86400 # 1 day from now
  468. future_timestamp = time.strftime("%Y%m%d", time.gmtime(future_time))
  469. with open(allowed_signers, "w") as allowed:
  470. allowed.write(
  471. f'valid-after="{future_timestamp}" test@example.com {pub_key_content}\n'
  472. )
  473. cli_config = ConfigDict()
  474. cli_config.set(
  475. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  476. )
  477. cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
  478. test_data = b"test data for lifetime validation"
  479. signature = cli_vendor.sign(test_data, keyid=private_key)
  480. pkg_config = ConfigDict()
  481. pkg_config.set(
  482. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  483. )
  484. pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
  485. # Verification should fail because key is not yet valid
  486. with self.assertRaises(UntrustedSignature) as cm:
  487. pkg_vendor.verify(test_data, signature)
  488. self.assertIn("not yet valid", str(cm.exception))
  489. def test_revocation_checking(self) -> None:
  490. """Test SSH key revocation checking."""
  491. import os
  492. import tempfile
  493. from dulwich.signature import UntrustedSignature
  494. if shutil.which("ssh-keygen") is None:
  495. self.skipTest("ssh-keygen not available")
  496. with tempfile.TemporaryDirectory() as tmpdir:
  497. private_key = os.path.join(tmpdir, "test_key")
  498. public_key = private_key + ".pub"
  499. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  500. revocation_file = os.path.join(tmpdir, "revoked_keys")
  501. # Generate Ed25519 key
  502. subprocess.run(
  503. [
  504. "ssh-keygen",
  505. "-t",
  506. "ed25519",
  507. "-f",
  508. private_key,
  509. "-N",
  510. "",
  511. "-C",
  512. "test@example.com",
  513. ],
  514. capture_output=True,
  515. check=True,
  516. )
  517. # Read public key
  518. with open(public_key) as pub:
  519. pub_key_content = pub.read().strip()
  520. # Create allowed_signers file
  521. with open(allowed_signers, "w") as allowed:
  522. allowed.write(f"* {pub_key_content}\n")
  523. # Sign some data
  524. cli_config = ConfigDict()
  525. cli_config.set(
  526. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  527. )
  528. cli_vendor = SSHCliSignatureVendor.from_config(config=cli_config)
  529. test_data = b"test data for revocation checking"
  530. signature = cli_vendor.sign(test_data, keyid=private_key)
  531. # Test 1: Without revocation file (should succeed)
  532. pkg_config = ConfigDict()
  533. pkg_config.set(
  534. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  535. )
  536. pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
  537. pkg_vendor.verify(test_data, signature)
  538. # Test 2: With revocation file containing this key (should fail)
  539. with open(revocation_file, "w") as revoked:
  540. revoked.write(f"{pub_key_content}\n")
  541. pkg_config.set(
  542. (b"gpg", b"ssh"), b"revocationFile", revocation_file.encode()
  543. )
  544. pkg_vendor = SSHSigSignatureVendor.from_config(config=pkg_config)
  545. with self.assertRaises(UntrustedSignature) as cm:
  546. pkg_vendor.verify(test_data, signature)
  547. self.assertIn("revoked", str(cm.exception))
  548. class SSHCliSignatureVendorTests(unittest.TestCase):
  549. """Tests for SSHCliSignatureVendor."""
  550. def setUp(self) -> None:
  551. """Check if ssh-keygen is available."""
  552. if shutil.which("ssh-keygen") is None:
  553. self.skipTest("ssh-keygen command not available")
  554. def test_ssh_program_from_config(self) -> None:
  555. """Test reading gpg.ssh.program from config."""
  556. config = ConfigDict()
  557. config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
  558. vendor = SSHCliSignatureVendor.from_config(config=config)
  559. self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
  560. def test_ssh_program_explicit(self) -> None:
  561. """Test that ssh_command parameter works when passed directly."""
  562. vendor = SSHCliSignatureVendor(ssh_command="/usr/bin/ssh-keygen")
  563. self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
  564. def test_ssh_program_default(self) -> None:
  565. """Test default ssh-keygen command when no config provided."""
  566. vendor = SSHCliSignatureVendor()
  567. self.assertEqual(vendor.ssh_command, "ssh-keygen")
  568. def test_allowed_signers_from_config(self) -> None:
  569. """Test reading gpg.ssh.allowedSignersFile from config."""
  570. config = ConfigDict()
  571. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
  572. vendor = SSHCliSignatureVendor.from_config(config=config)
  573. self.assertEqual(vendor.allowed_signers_file, "/tmp/allowed_signers")
  574. def test_sign_without_key_raises(self) -> None:
  575. """Test that signing without a key raises ValueError."""
  576. vendor = SSHCliSignatureVendor()
  577. with self.assertRaises(ValueError) as cm:
  578. vendor.sign(b"test data")
  579. self.assertIn("key", str(cm.exception).lower())
  580. def test_verify_without_allowed_signers_raises(self) -> None:
  581. """Test that verify without allowedSignersFile raises UntrustedSignature."""
  582. from dulwich.signature import UntrustedSignature
  583. vendor = SSHCliSignatureVendor()
  584. with self.assertRaises(UntrustedSignature) as cm:
  585. vendor.verify(b"test data", b"fake signature")
  586. self.assertIn("allowedSignersFile", str(cm.exception))
  587. def test_sign_and_verify_with_ssh_key(self) -> None:
  588. """Test sign and verify cycle with SSH key."""
  589. import os
  590. import tempfile
  591. # Generate a test SSH key
  592. with tempfile.TemporaryDirectory() as tmpdir:
  593. private_key = os.path.join(tmpdir, "test_key")
  594. public_key = private_key + ".pub"
  595. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  596. # Generate Ed25519 key (no passphrase)
  597. subprocess.run(
  598. [
  599. "ssh-keygen",
  600. "-t",
  601. "ed25519",
  602. "-f",
  603. private_key,
  604. "-N",
  605. "",
  606. "-C",
  607. "test@example.com",
  608. ],
  609. capture_output=True,
  610. check=True,
  611. )
  612. # Create allowed_signers file
  613. with open(public_key) as pub:
  614. pub_key_content = pub.read().strip()
  615. with open(allowed_signers, "w") as allowed:
  616. allowed.write(f"git {pub_key_content}\n")
  617. # Create vendor with config
  618. config = ConfigDict()
  619. config.set(
  620. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  621. )
  622. vendor = SSHCliSignatureVendor.from_config(config=config)
  623. # Test signing and verification
  624. test_data = b"test data to sign with SSH"
  625. signature = vendor.sign(test_data, keyid=private_key)
  626. self.assertIsInstance(signature, bytes)
  627. self.assertGreater(len(signature), 0)
  628. self.assertTrue(signature.startswith(b"-----BEGIN SSH SIGNATURE-----"))
  629. # Verify the signature
  630. vendor.verify(test_data, signature)
  631. def test_default_key_command(self) -> None:
  632. """Test gpg.ssh.defaultKeyCommand support."""
  633. import os
  634. import tempfile
  635. # Generate a test SSH key
  636. with tempfile.TemporaryDirectory() as tmpdir:
  637. private_key = os.path.join(tmpdir, "test_key")
  638. # Generate Ed25519 key (no passphrase)
  639. subprocess.run(
  640. [
  641. "ssh-keygen",
  642. "-t",
  643. "ed25519",
  644. "-f",
  645. private_key,
  646. "-N",
  647. "",
  648. "-C",
  649. "test@example.com",
  650. ],
  651. capture_output=True,
  652. check=True,
  653. )
  654. # Create config with defaultKeyCommand that echoes the key path
  655. config = ConfigDict()
  656. config.set(
  657. (b"gpg", b"ssh"), b"defaultKeyCommand", f"echo {private_key}".encode()
  658. )
  659. vendor = SSHCliSignatureVendor.from_config(config=config)
  660. test_data = b"test data"
  661. # Sign without providing keyid - should use defaultKeyCommand
  662. signature = vendor.sign(test_data)
  663. self.assertIsInstance(signature, bytes)
  664. self.assertGreater(len(signature), 0)
  665. def test_revocation_file_config(self) -> None:
  666. """Test that revocation file is read from config."""
  667. config = ConfigDict()
  668. config.set((b"gpg", b"ssh"), b"revocationFile", b"/path/to/revoked_keys")
  669. vendor = SSHCliSignatureVendor.from_config(config=config)
  670. self.assertEqual(vendor.revocation_file, "/path/to/revoked_keys")
  671. def test_available(self) -> None:
  672. """Test that available() returns boolean."""
  673. result = SSHCliSignatureVendor.available()
  674. self.assertIsInstance(result, bool)
  675. class DetectSignatureFormatTests(unittest.TestCase):
  676. """Tests for detect_signature_format function."""
  677. def test_detect_ssh_signature(self) -> None:
  678. """Test detecting SSH signature format."""
  679. ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
  680. self.assertEqual(detect_signature_format(ssh_sig), SIGNATURE_FORMAT_SSH)
  681. def test_detect_pgp_signature(self) -> None:
  682. """Test detecting PGP signature format."""
  683. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  684. self.assertEqual(detect_signature_format(pgp_sig), SIGNATURE_FORMAT_OPENPGP)
  685. def test_detect_x509_signature_pkcs7(self) -> None:
  686. """Test detecting X.509 PKCS7 signature format."""
  687. x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
  688. self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
  689. def test_detect_x509_signature_signed_message(self) -> None:
  690. """Test detecting X.509 signed message format."""
  691. x509_sig = b"-----BEGIN SIGNED MESSAGE-----\nfoo\n-----END SIGNED MESSAGE-----"
  692. self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
  693. def test_unknown_signature_format(self) -> None:
  694. """Test that unknown format raises ValueError."""
  695. with self.assertRaises(ValueError) as cm:
  696. detect_signature_format(b"not a signature")
  697. self.assertIn("Unable to detect", str(cm.exception))
  698. class GetSignatureVendorForSignatureTests(unittest.TestCase):
  699. """Tests for get_signature_vendor_for_signature function."""
  700. def test_get_vendor_for_ssh_signature(self) -> None:
  701. """Test getting vendor for SSH signature."""
  702. ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
  703. vendor = get_signature_vendor_for_signature(ssh_sig)
  704. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  705. def test_get_vendor_for_pgp_signature(self) -> None:
  706. """Test getting vendor for PGP signature."""
  707. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  708. vendor = get_signature_vendor_for_signature(pgp_sig)
  709. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  710. def test_get_vendor_for_x509_signature(self) -> None:
  711. """Test getting vendor for X.509 signature."""
  712. x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
  713. vendor = get_signature_vendor_for_signature(x509_sig)
  714. self.assertIsInstance(vendor, X509SignatureVendor)
  715. def test_get_vendor_with_config(self) -> None:
  716. """Test that config is passed to vendor."""
  717. config = ConfigDict()
  718. config.set((b"gpg",), b"program", b"gpg2")
  719. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  720. vendor = get_signature_vendor_for_signature(pgp_sig, config=config)
  721. # If CLI vendor is used, check config was passed
  722. if isinstance(vendor, GPGCliSignatureVendor):
  723. self.assertEqual(vendor.gpg_command, "gpg2")