test_signature.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772
  1. # test_signature.py -- tests for signature.py
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for signature vendors."""
  22. import shutil
  23. import subprocess
  24. import unittest
  25. from dulwich.config import ConfigDict
  26. from dulwich.signature import (
  27. SIGNATURE_FORMAT_OPENPGP,
  28. SIGNATURE_FORMAT_SSH,
  29. SIGNATURE_FORMAT_X509,
  30. GPGCliSignatureVendor,
  31. GPGSignatureVendor,
  32. SignatureVendor,
  33. SSHCliSignatureVendor,
  34. SSHSigSignatureVendor,
  35. X509SignatureVendor,
  36. detect_signature_format,
  37. get_signature_vendor,
  38. get_signature_vendor_for_signature,
  39. )
  40. try:
  41. import gpg
  42. except ImportError:
  43. gpg = None
  def get_valid_gpg_key() -> str | None:
      """Pick a usable signing key from the keyring via the ``gpg`` bindings.

      The secret keys reported by ``gpg.Context().keylist(secret=True)`` are
      scanned in order and the first one that is not revoked, not expired and
      has ``can_sign`` set is selected.

      Returns:
        The fingerprint of the selected key as a string, or None when no
        secret key satisfies those conditions.

      Raises:
        unittest.SkipTest: if the gpg module could not be imported
      """
      if gpg is None:
          raise unittest.SkipTest("gpg module not available")

      with gpg.Context() as ctx:
          secret_keys = list(ctx.keylist(secret=True))
          # Lazily walk the keys so only the first match has its fingerprint
          # converted; an empty keyring simply falls through to None.
          usable_fingerprints = (
              str(candidate.fpr)
              for candidate in secret_keys
              if not (candidate.revoked or candidate.expired) and candidate.can_sign
          )
          return next(usable_fingerprints, None)
  def get_valid_gpg_key_cli() -> str | None:
      """Get the fingerprint of a usable secret GPG key using the gpg CLI.

      Runs ``gpg --list-secret-keys --with-colons`` and returns the fingerprint
      from the first ``fpr`` record that follows a ``sec`` record whose
      validity field does not mark the key as unusable.

      Returns:
        A key fingerprint string, or None if no usable key was found.

      Raises:
        subprocess.CalledProcessError: if gpg exits with a non-zero status
      """
      result = subprocess.run(
          ["gpg", "--list-secret-keys", "--with-colons"],
          capture_output=True,
          check=True,
          text=True,
      )

      # Field 2 of a "sec" record is the key validity. Only these values mean
      # the key cannot be used: e(xpired), r(evoked), i(nvalid), d(isabled),
      # n(ot valid). Everything else ('-', 'u', 'f', 'm', 'q', 'o' or an empty
      # field) describes a usable key, so it must not be rejected; a key with
      # ultimate ownertrust is reported as 'u', not '-'.
      unusable_validity = frozenset({"e", "r", "i", "d", "n"})

      current_key_valid = False
      for line in result.stdout.splitlines():
          if line.startswith("sec:"):
              fields = line.split(":")
              current_key_valid = fields[1] not in unusable_validity
          elif line.startswith("fpr:") and current_key_valid:
              fields = line.split(":")
              # Field 10 carries the fingerprint; ignore truncated records
              # instead of failing with IndexError.
              if len(fields) > 9:
                  return fields[9]
      return None
  class SignatureVendorTests(unittest.TestCase):
      """Checks on the SignatureVendor base class."""

      def test_sign_not_implemented(self) -> None:
          """sign() on the base class raises NotImplementedError."""
          base_vendor = SignatureVendor()
          self.assertRaises(NotImplementedError, base_vendor.sign, b"test data")

      def test_verify_not_implemented(self) -> None:
          """verify() on the base class raises NotImplementedError."""
          base_vendor = SignatureVendor()
          self.assertRaises(
              NotImplementedError,
              base_vendor.verify,
              b"test data",
              b"fake signature",
          )
  @unittest.skipIf(gpg is None, "gpg not available")
  class GPGSignatureVendorTests(unittest.TestCase):
      """Exercise GPGSignatureVendor; skipped when ``gpg`` failed to import."""

      def test_min_trust_level_from_config(self) -> None:
          """gpg.minTrustLevel from the config is exposed as min_trust_level."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"minTrustLevel", b"marginal")
          self.assertEqual(
              GPGSignatureVendor(config=cfg).min_trust_level, "marginal"
          )

      def test_min_trust_level_default(self) -> None:
          """Without a config, min_trust_level is None."""
          self.assertIsNone(GPGSignatureVendor().min_trust_level)

      def test_available(self) -> None:
          """available() yields a bool."""
          self.assertIsInstance(GPGSignatureVendor.available(), bool)

      def test_sign_and_verify(self) -> None:
          """Sign a payload and verify the resulting signature.

          The test is reported as skipped when signing or verifying raises
          GPGMEError, which is how a missing GPG key is treated here.
          """
          signer = GPGSignatureVendor()
          payload = b"test data to sign"
          try:
              sig = signer.sign(payload)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              # Round-trip: the fresh signature must verify.
              signer.verify(payload, sig)
          except gpg.errors.GPGMEError as e:
              self.skipTest(f"GPG key not available: {e}")

      def test_verify_invalid_signature(self) -> None:
          """verify() raises GPGMEError when handed bytes that are no signature."""
          checker = GPGSignatureVendor()
          self.assertRaises(
              gpg.errors.GPGMEError,
              checker.verify,
              b"test data",
              b"this is not a valid signature",
          )

      def test_sign_with_keyid(self) -> None:
          """Sign with an explicitly chosen key, then verify.

          Skipped when get_valid_gpg_key() finds no key or when a GPGMEError
          is raised along the way.
          """
          signer = GPGSignatureVendor()
          payload = b"test data to sign"
          try:
              fingerprint = get_valid_gpg_key()
              if not fingerprint:
                  self.skipTest("No valid GPG keys available for testing")
              sig = signer.sign(payload, keyid=fingerprint)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              # Round-trip: the fresh signature must verify.
              signer.verify(payload, sig)
          except gpg.errors.GPGMEError as e:
              self.skipTest(f"GPG key not available: {e}")
  class GPGCliSignatureVendorTests(unittest.TestCase):
      """Exercise GPGCliSignatureVendor."""

      def setUp(self) -> None:
          """Skip each test of this class when no ``gpg`` executable is found."""
          gpg_path = shutil.which("gpg")
          if gpg_path is None:
              self.skipTest("gpg command not available")

      def test_sign_and_verify(self) -> None:
          """Sign a payload through the CLI vendor and verify the result.

          Skipped when a CalledProcessError is raised, which is how a missing
          or unconfigured key is treated here.
          """
          cli = GPGCliSignatureVendor()
          payload = b"test data to sign"
          try:
              sig = cli.sign(payload)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              self.assertTrue(sig.startswith(b"-----BEGIN PGP SIGNATURE-----"))
              # Round-trip: the fresh signature must verify.
              cli.verify(payload, sig)
          except subprocess.CalledProcessError as e:
              self.skipTest(f"GPG signing failed: {e}")

      def test_verify_invalid_signature(self) -> None:
          """verify() raises BadSignature for bytes that are no signature."""
          from dulwich.signature import BadSignature

          cli = GPGCliSignatureVendor()
          self.assertRaises(
              BadSignature,
              cli.verify,
              b"test data",
              b"this is not a valid signature",
          )

      def test_sign_with_keyid(self) -> None:
          """Sign with the first fingerprint gpg lists, then verify.

          Skipped when gpg lists no fingerprint or a CalledProcessError is
          raised.
          """
          cli = GPGCliSignatureVendor()
          payload = b"test data to sign"
          try:
              listing = subprocess.run(
                  ["gpg", "--list-secret-keys", "--with-colons"],
                  capture_output=True,
                  check=True,
                  text=True,
              )
              # The first "fpr" record wins; field 10 holds the fingerprint.
              fingerprint = next(
                  (
                      record.split(":")[9]
                      for record in listing.stdout.split("\n")
                      if record.startswith("fpr:")
                  ),
                  None,
              )
              if not fingerprint:
                  self.skipTest("No GPG keys available for testing")
              sig = cli.sign(payload, keyid=fingerprint)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              # Round-trip: the fresh signature must verify.
              cli.verify(payload, sig)
          except subprocess.CalledProcessError as e:
              self.skipTest(f"GPG key not available: {e}")

      def test_verify_with_keyids(self) -> None:
          """verify() honours the ``keyids`` list of trusted keys.

          The signing key itself is accepted, while an all-zero 40 character
          fingerprint leads to UntrustedSignature.
          """
          from dulwich.signature import UntrustedSignature

          cli = GPGCliSignatureVendor()
          payload = b"test data to sign"
          try:
              signing_key = get_valid_gpg_key_cli()
              if not signing_key:
                  self.skipTest("No valid GPG keys available for testing")
              sig = cli.sign(payload, keyid=signing_key)

              # Trusted: the key that produced the signature.
              cli.verify(payload, sig, keyids=[signing_key])

              # Not trusted: a fingerprint that cannot match the signer.
              bogus_key = "0" * 40
              with self.assertRaises(UntrustedSignature):
                  cli.verify(payload, sig, keyids=[bogus_key])
          except subprocess.CalledProcessError as e:
              self.skipTest(f"GPG key not available: {e}")

      def test_custom_gpg_command(self) -> None:
          """Signing works when gpg_command is passed explicitly."""
          cli = GPGCliSignatureVendor(gpg_command="gpg")
          try:
              sig = cli.sign(b"test data")
              self.assertIsInstance(sig, bytes)
          except subprocess.CalledProcessError as e:
              self.skipTest(f"GPG not available: {e}")

      def test_gpg_program_from_config(self) -> None:
          """gpg.program from the config becomes gpg_command."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"program", b"gpg2")
          self.assertEqual(GPGCliSignatureVendor(config=cfg).gpg_command, "gpg2")

      def test_gpg_program_override(self) -> None:
          """An explicit gpg_command wins over gpg.program from the config."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"program", b"gpg2")
          cli = GPGCliSignatureVendor(config=cfg, gpg_command="gpg")
          self.assertEqual(cli.gpg_command, "gpg")

      def test_gpg_program_default(self) -> None:
          """With no config at all, gpg_command is "gpg"."""
          self.assertEqual(GPGCliSignatureVendor().gpg_command, "gpg")

      def test_gpg_program_default_when_not_in_config(self) -> None:
          """With a config lacking gpg.program, gpg_command is "gpg"."""
          empty_cfg = ConfigDict()
          self.assertEqual(
              GPGCliSignatureVendor(config=empty_cfg).gpg_command, "gpg"
          )

      def test_available(self) -> None:
          """available() yields a bool."""
          self.assertIsInstance(GPGCliSignatureVendor.available(), bool)
  class X509SignatureVendorTests(unittest.TestCase):
      """Exercise X509SignatureVendor."""

      def test_gpgsm_program_default(self) -> None:
          """With no config at all, gpgsm_command is "gpgsm"."""
          self.assertEqual(X509SignatureVendor().gpgsm_command, "gpgsm")

      def test_gpgsm_program_from_config(self) -> None:
          """gpg.x509.program from the config becomes gpgsm_command."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
          self.assertEqual(
              X509SignatureVendor(config=cfg).gpgsm_command, "/usr/local/bin/gpgsm"
          )

      def test_gpgsm_program_override(self) -> None:
          """An explicit gpgsm_command wins over gpg.x509.program."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
          x509 = X509SignatureVendor(config=cfg, gpgsm_command="/custom/gpgsm")
          self.assertEqual(x509.gpgsm_command, "/custom/gpgsm")

      def test_gpgsm_program_default_when_not_in_config(self) -> None:
          """With a config lacking gpg.x509.program, gpgsm_command is "gpgsm"."""
          empty_cfg = ConfigDict()
          self.assertEqual(
              X509SignatureVendor(config=empty_cfg).gpgsm_command, "gpgsm"
          )

      def test_available(self) -> None:
          """available() yields a bool."""
          self.assertIsInstance(X509SignatureVendor.available(), bool)

      @unittest.skipIf(
          shutil.which("gpgsm") is None, "gpgsm command not available in PATH"
      )
      def test_sign_and_verify(self) -> None:
          """Sign a payload with the X.509 vendor and verify the result.

          Skipped when a CalledProcessError is raised, which is how a missing
          X.509 certificate is treated here.
          """
          x509 = X509SignatureVendor()
          payload = b"test data to sign"
          try:
              sig = x509.sign(payload)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              # Round-trip: the fresh signature must verify.
              x509.verify(payload, sig)
          except subprocess.CalledProcessError:
              self.skipTest("No X.509 certificate available for signing")
  class GetSignatureVendorTests(unittest.TestCase):
      """Checks for the get_signature_vendor() function."""

      # Vendor classes accepted as the result for the OpenPGP format.
      _OPENPGP_VENDORS = (GPGSignatureVendor, GPGCliSignatureVendor)
      # Vendor classes accepted as the result for the SSH format.
      _SSH_VENDORS = (SSHSigSignatureVendor, SSHCliSignatureVendor)

      def test_default_format(self) -> None:
          """Calling without arguments yields an OpenPGP vendor."""
          self.assertIsInstance(get_signature_vendor(), self._OPENPGP_VENDORS)

      def test_explicit_openpgp_format(self) -> None:
          """format="openpgp" yields an OpenPGP vendor."""
          chosen = get_signature_vendor(format="openpgp")
          self.assertIsInstance(chosen, self._OPENPGP_VENDORS)

      def test_format_from_config(self) -> None:
          """gpg.format = openpgp in the config yields an OpenPGP vendor."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"format", b"openpgp")
          chosen = get_signature_vendor(config=cfg)
          self.assertIsInstance(chosen, self._OPENPGP_VENDORS)

      def test_format_case_insensitive(self) -> None:
          """The mixed-case spelling "OpenPGP" is accepted as well."""
          chosen = get_signature_vendor(format="OpenPGP")
          self.assertIsInstance(chosen, self._OPENPGP_VENDORS)

      def test_x509_format_supported(self) -> None:
          """format="x509" yields an X509SignatureVendor."""
          self.assertIsInstance(
              get_signature_vendor(format="x509"), X509SignatureVendor
          )

      def test_ssh_format_supported(self) -> None:
          """format="ssh" yields one of the two SSH vendors."""
          self.assertIsInstance(get_signature_vendor(format="ssh"), self._SSH_VENDORS)

      def test_invalid_format(self) -> None:
          """An unknown format raises ValueError mentioning "Unsupported"."""
          with self.assertRaises(ValueError) as caught:
              get_signature_vendor(format="invalid")
          self.assertIn("Unsupported", str(caught.exception))

      def test_config_passed_to_vendor(self) -> None:
          """The config handed to the function reaches the created vendor."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"program", b"gpg2")
          chosen = get_signature_vendor(format="openpgp", config=cfg)
          # gpg_command is only checked on the CLI vendor.
          if not isinstance(chosen, GPGCliSignatureVendor):
              return
          self.assertEqual(chosen.gpg_command, "gpg2")

      def test_ssh_format(self) -> None:
          """Requesting the SSH format yields one of the two SSH vendors."""
          chosen = get_signature_vendor(format="ssh")
          self.assertIsInstance(chosen, self._SSH_VENDORS)

      def test_x509_format(self) -> None:
          """Requesting the X.509 format yields an X509SignatureVendor."""
          chosen = get_signature_vendor(format="x509")
          self.assertIsInstance(chosen, X509SignatureVendor)
  class SSHSigSignatureVendorTests(unittest.TestCase):
      """Exercise SSHSigSignatureVendor (sshsig package implementation)."""

      def test_sign_not_supported(self) -> None:
          """sign() raises NotImplementedError naming SSHCliSignatureVendor."""
          pkg_vendor = SSHSigSignatureVendor()
          with self.assertRaises(NotImplementedError) as caught:
              pkg_vendor.sign(b"test data", keyid="dummy")
          self.assertIn("SSHCliSignatureVendor", str(caught.exception))

      def test_verify_without_config_raises(self) -> None:
          """verify() without config or keyids raises UntrustedSignature.

          The error text is expected to mention allowedSignersFile.
          """
          from dulwich.signature import UntrustedSignature

          pkg_vendor = SSHSigSignatureVendor()
          with self.assertRaises(UntrustedSignature) as caught:
              pkg_vendor.verify(b"test data", b"fake signature")
          self.assertIn("allowedSignersFile", str(caught.exception))

      def test_config_parsing(self) -> None:
          """gpg.ssh.* options from the config are exposed as attributes."""
          cfg = ConfigDict()
          ssh_section = (b"gpg", b"ssh")
          cfg.set(ssh_section, b"allowedSignersFile", b"/path/to/allowed")
          cfg.set(ssh_section, b"defaultKeyCommand", b"ssh-add -L")
          pkg_vendor = SSHSigSignatureVendor(config=cfg)
          self.assertEqual(pkg_vendor.allowed_signers_file, "/path/to/allowed")
          self.assertEqual(pkg_vendor.default_key_command, "ssh-add -L")

      def test_available(self) -> None:
          """available() yields a bool."""
          self.assertIsInstance(SSHSigSignatureVendor.available(), bool)

      def test_verify_with_cli_generated_signature(self) -> None:
          """A signature made by SSHCliSignatureVendor verifies with this vendor.

          A throw-away Ed25519 key and an allowed-signers file are created in a
          temporary directory. Skipped when ``ssh-keygen`` is not found.
          """
          import os
          import tempfile

          if shutil.which("ssh-keygen") is None:
              self.skipTest("ssh-keygen not available")

          with tempfile.TemporaryDirectory() as tmpdir:
              private_key = os.path.join(tmpdir, "test_key")
              allowed_signers = os.path.join(tmpdir, "allowed_signers")

              # Ed25519 key pair with an empty passphrase.
              keygen_argv = [
                  "ssh-keygen",
                  "-t",
                  "ed25519",
                  "-f",
                  private_key,
                  "-N",
                  "",
                  "-C",
                  "test@example.com",
              ]
              subprocess.run(keygen_argv, capture_output=True, check=True)

              # Allowed-signers entry with "*" as the principal pattern.
              with open(private_key + ".pub") as pub:
                  pub_key_content = pub.read().strip()
              with open(allowed_signers, "w") as allowed:
                  allowed.write(f"* {pub_key_content}\n")

              def signers_config() -> ConfigDict:
                  # Each vendor gets its own config object carrying the same
                  # allowedSignersFile setting.
                  cfg = ConfigDict()
                  cfg.set(
                      (b"gpg", b"ssh"),
                      b"allowedSignersFile",
                      allowed_signers.encode(),
                  )
                  return cfg

              # Sign with the CLI vendor ...
              cli_vendor = SSHCliSignatureVendor(config=signers_config())
              payload = b"test data for sshsig verification"
              sig = cli_vendor.sign(payload, keyid=private_key)

              # ... and verify with the sshsig package vendor; must not raise.
              pkg_vendor = SSHSigSignatureVendor(config=signers_config())
              pkg_vendor.verify(payload, sig)
  class SSHCliSignatureVendorTests(unittest.TestCase):
      """Exercise SSHCliSignatureVendor."""

      def setUp(self) -> None:
          """Skip each test of this class when ``ssh-keygen`` is not found."""
          keygen_path = shutil.which("ssh-keygen")
          if keygen_path is None:
              self.skipTest("ssh-keygen command not available")

      @staticmethod
      def _generate_ed25519_key(private_key: str) -> None:
          """Create an Ed25519 key pair (empty passphrase) at ``private_key``."""
          subprocess.run(
              [
                  "ssh-keygen",
                  "-t",
                  "ed25519",
                  "-f",
                  private_key,
                  "-N",
                  "",
                  "-C",
                  "test@example.com",
              ],
              capture_output=True,
              check=True,
          )

      def test_ssh_program_from_config(self) -> None:
          """gpg.ssh.program from the config becomes ssh_command."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
          self.assertEqual(
              SSHCliSignatureVendor(config=cfg).ssh_command, "/usr/bin/ssh-keygen"
          )

      def test_ssh_program_override(self) -> None:
          """An explicit ssh_command wins over gpg.ssh.program."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
          cli = SSHCliSignatureVendor(config=cfg, ssh_command="ssh-keygen")
          self.assertEqual(cli.ssh_command, "ssh-keygen")

      def test_ssh_program_default(self) -> None:
          """With no config at all, ssh_command is "ssh-keygen"."""
          self.assertEqual(SSHCliSignatureVendor().ssh_command, "ssh-keygen")

      def test_allowed_signers_from_config(self) -> None:
          """gpg.ssh.allowedSignersFile becomes allowed_signers_file."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
          self.assertEqual(
              SSHCliSignatureVendor(config=cfg).allowed_signers_file,
              "/tmp/allowed_signers",
          )

      def test_sign_without_key_raises(self) -> None:
          """sign() without a key raises ValueError whose text mentions "key"."""
          cli = SSHCliSignatureVendor()
          with self.assertRaises(ValueError) as caught:
              cli.sign(b"test data")
          self.assertIn("key", str(caught.exception).lower())

      def test_verify_without_allowed_signers_raises(self) -> None:
          """verify() without allowedSignersFile raises UntrustedSignature.

          The error text is expected to mention allowedSignersFile.
          """
          from dulwich.signature import UntrustedSignature

          cli = SSHCliSignatureVendor()
          with self.assertRaises(UntrustedSignature) as caught:
              cli.verify(b"test data", b"fake signature")
          self.assertIn("allowedSignersFile", str(caught.exception))

      def test_sign_and_verify_with_ssh_key(self) -> None:
          """Sign with a freshly generated SSH key and verify the result.

          The key pair and an allowed-signers file live in a temporary
          directory for the duration of the test.
          """
          import os
          import tempfile

          with tempfile.TemporaryDirectory() as tmpdir:
              private_key = os.path.join(tmpdir, "test_key")
              allowed_signers = os.path.join(tmpdir, "allowed_signers")
              self._generate_ed25519_key(private_key)

              # Allowed-signers entry with "git" as the principal.
              with open(private_key + ".pub") as pub:
                  pub_key_content = pub.read().strip()
              with open(allowed_signers, "w") as allowed:
                  allowed.write(f"git {pub_key_content}\n")

              cfg = ConfigDict()
              cfg.set(
                  (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
              )
              cli = SSHCliSignatureVendor(config=cfg)

              payload = b"test data to sign with SSH"
              sig = cli.sign(payload, keyid=private_key)
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)
              self.assertTrue(sig.startswith(b"-----BEGIN SSH SIGNATURE-----"))

              # Round-trip: the fresh signature must verify.
              cli.verify(payload, sig)

      def test_default_key_command(self) -> None:
          """sign() without keyid works when gpg.ssh.defaultKeyCommand is set.

          The configured command is an ``echo`` of the generated key's path.
          """
          import os
          import tempfile

          with tempfile.TemporaryDirectory() as tmpdir:
              private_key = os.path.join(tmpdir, "test_key")
              self._generate_ed25519_key(private_key)

              cfg = ConfigDict()
              cfg.set(
                  (b"gpg", b"ssh"),
                  b"defaultKeyCommand",
                  f"echo {private_key}".encode(),
              )
              cli = SSHCliSignatureVendor(config=cfg)

              # No keyid argument is passed to sign() here.
              sig = cli.sign(b"test data")
              self.assertIsInstance(sig, bytes)
              self.assertGreater(len(sig), 0)

      def test_revocation_file_config(self) -> None:
          """gpg.ssh.revocationFile becomes revocation_file."""
          cfg = ConfigDict()
          cfg.set((b"gpg", b"ssh"), b"revocationFile", b"/path/to/revoked_keys")
          self.assertEqual(
              SSHCliSignatureVendor(config=cfg).revocation_file,
              "/path/to/revoked_keys",
          )

      def test_available(self) -> None:
          """available() yields a bool."""
          self.assertIsInstance(SSHCliSignatureVendor.available(), bool)
  class DetectSignatureFormatTests(unittest.TestCase):
      """Checks for the detect_signature_format() function."""

      def test_detect_ssh_signature(self) -> None:
          """An SSH SIGNATURE armor block maps to SIGNATURE_FORMAT_SSH."""
          detected = detect_signature_format(
              b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
          )
          self.assertEqual(detected, SIGNATURE_FORMAT_SSH)

      def test_detect_pgp_signature(self) -> None:
          """A PGP SIGNATURE armor block maps to SIGNATURE_FORMAT_OPENPGP."""
          detected = detect_signature_format(
              b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
          )
          self.assertEqual(detected, SIGNATURE_FORMAT_OPENPGP)

      def test_detect_x509_signature_pkcs7(self) -> None:
          """A PKCS7 armor block maps to SIGNATURE_FORMAT_X509."""
          detected = detect_signature_format(
              b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
          )
          self.assertEqual(detected, SIGNATURE_FORMAT_X509)

      def test_detect_x509_signature_signed_message(self) -> None:
          """A SIGNED MESSAGE armor block maps to SIGNATURE_FORMAT_X509."""
          detected = detect_signature_format(
              b"-----BEGIN SIGNED MESSAGE-----\nfoo\n-----END SIGNED MESSAGE-----"
          )
          self.assertEqual(detected, SIGNATURE_FORMAT_X509)

      def test_unknown_signature_format(self) -> None:
          """Unrecognised bytes raise ValueError mentioning "Unable to detect"."""
          with self.assertRaises(ValueError) as caught:
              detect_signature_format(b"not a signature")
          self.assertIn("Unable to detect", str(caught.exception))
  class GetSignatureVendorForSignatureTests(unittest.TestCase):
      """Checks for the get_signature_vendor_for_signature() function."""

      def test_get_vendor_for_ssh_signature(self) -> None:
          """An SSH signature yields one of the two SSH vendors."""
          chosen = get_signature_vendor_for_signature(
              b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
          )
          self.assertIsInstance(
              chosen, (SSHSigSignatureVendor, SSHCliSignatureVendor)
          )

      def test_get_vendor_for_pgp_signature(self) -> None:
          """A PGP signature yields one of the two GPG vendors."""
          chosen = get_signature_vendor_for_signature(
              b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
          )
          self.assertIsInstance(chosen, (GPGSignatureVendor, GPGCliSignatureVendor))

      def test_get_vendor_for_x509_signature(self) -> None:
          """A PKCS7 signature yields an X509SignatureVendor."""
          chosen = get_signature_vendor_for_signature(
              b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
          )
          self.assertIsInstance(chosen, X509SignatureVendor)

      def test_get_vendor_with_config(self) -> None:
          """The config handed to the function reaches the created vendor."""
          cfg = ConfigDict()
          cfg.set((b"gpg",), b"program", b"gpg2")
          chosen = get_signature_vendor_for_signature(
              b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----",
              config=cfg,
          )
          # gpg_command is only checked on the CLI vendor.
          if not isinstance(chosen, GPGCliSignatureVendor):
              return
          self.assertEqual(chosen.gpg_command, "gpg2")