test_signature.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. # test_signature.py -- tests for signature.py
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for signature vendors."""
  22. import shutil
  23. import subprocess
  24. import unittest
  25. from dulwich.config import ConfigDict
  26. from dulwich.signature import (
  27. SIGNATURE_FORMAT_OPENPGP,
  28. SIGNATURE_FORMAT_SSH,
  29. SIGNATURE_FORMAT_X509,
  30. GPGCliSignatureVendor,
  31. GPGSignatureVendor,
  32. SignatureVendor,
  33. SSHCliSignatureVendor,
  34. SSHSigSignatureVendor,
  35. X509SignatureVendor,
  36. detect_signature_format,
  37. get_signature_vendor,
  38. get_signature_vendor_for_signature,
  39. )
  40. try:
  41. import gpg
  42. except ImportError:
  43. gpg = None
  44. class SignatureVendorTests(unittest.TestCase):
  45. """Tests for SignatureVendor base class."""
  46. def test_sign_not_implemented(self) -> None:
  47. """Test that sign raises NotImplementedError."""
  48. vendor = SignatureVendor()
  49. with self.assertRaises(NotImplementedError):
  50. vendor.sign(b"test data")
  51. def test_verify_not_implemented(self) -> None:
  52. """Test that verify raises NotImplementedError."""
  53. vendor = SignatureVendor()
  54. with self.assertRaises(NotImplementedError):
  55. vendor.verify(b"test data", b"fake signature")
  56. @unittest.skipIf(gpg is None, "gpg not available")
  57. class GPGSignatureVendorTests(unittest.TestCase):
  58. """Tests for GPGSignatureVendor."""
  59. def test_min_trust_level_from_config(self) -> None:
  60. """Test reading gpg.minTrustLevel from config."""
  61. config = ConfigDict()
  62. config.set((b"gpg",), b"minTrustLevel", b"marginal")
  63. vendor = GPGSignatureVendor(config=config)
  64. self.assertEqual(vendor.min_trust_level, "marginal")
  65. def test_min_trust_level_default(self) -> None:
  66. """Test default when gpg.minTrustLevel not in config."""
  67. vendor = GPGSignatureVendor()
  68. self.assertIsNone(vendor.min_trust_level)
  69. def test_available(self) -> None:
  70. """Test that available() returns boolean."""
  71. result = GPGSignatureVendor.available()
  72. self.assertIsInstance(result, bool)
  73. def test_sign_and_verify(self) -> None:
  74. """Test basic sign and verify cycle.
  75. Note: This test requires a GPG key to be configured in the test
  76. environment. It may be skipped in environments without GPG setup.
  77. """
  78. vendor = GPGSignatureVendor()
  79. test_data = b"test data to sign"
  80. try:
  81. # Sign the data
  82. signature = vendor.sign(test_data)
  83. self.assertIsInstance(signature, bytes)
  84. self.assertGreater(len(signature), 0)
  85. # Verify the signature
  86. vendor.verify(test_data, signature)
  87. except gpg.errors.GPGMEError as e:
  88. # Skip test if no GPG key is available
  89. self.skipTest(f"GPG key not available: {e}")
  90. def test_verify_invalid_signature(self) -> None:
  91. """Test that verify raises an error for invalid signatures."""
  92. vendor = GPGSignatureVendor()
  93. test_data = b"test data"
  94. invalid_signature = b"this is not a valid signature"
  95. with self.assertRaises(gpg.errors.GPGMEError):
  96. vendor.verify(test_data, invalid_signature)
  97. def test_sign_with_keyid(self) -> None:
  98. """Test signing with a specific key ID.
  99. Note: This test requires a GPG key to be configured in the test
  100. environment. It may be skipped in environments without GPG setup.
  101. """
  102. vendor = GPGSignatureVendor()
  103. test_data = b"test data to sign"
  104. try:
  105. # Try to get a key from the keyring
  106. with gpg.Context() as ctx:
  107. keys = list(ctx.keylist(secret=True))
  108. if not keys:
  109. self.skipTest("No GPG keys available for testing")
  110. key = keys[0]
  111. signature = vendor.sign(test_data, keyid=key.fpr)
  112. self.assertIsInstance(signature, bytes)
  113. self.assertGreater(len(signature), 0)
  114. # Verify the signature
  115. vendor.verify(test_data, signature)
  116. except gpg.errors.GPGMEError as e:
  117. self.skipTest(f"GPG key not available: {e}")
  118. class GPGCliSignatureVendorTests(unittest.TestCase):
  119. """Tests for GPGCliSignatureVendor."""
  120. def setUp(self) -> None:
  121. """Check if gpg command is available."""
  122. if shutil.which("gpg") is None:
  123. self.skipTest("gpg command not available")
  124. def test_sign_and_verify(self) -> None:
  125. """Test basic sign and verify cycle using CLI."""
  126. vendor = GPGCliSignatureVendor()
  127. test_data = b"test data to sign"
  128. try:
  129. # Sign the data
  130. signature = vendor.sign(test_data)
  131. self.assertIsInstance(signature, bytes)
  132. self.assertGreater(len(signature), 0)
  133. self.assertTrue(signature.startswith(b"-----BEGIN PGP SIGNATURE-----"))
  134. # Verify the signature
  135. vendor.verify(test_data, signature)
  136. except subprocess.CalledProcessError as e:
  137. # Skip test if no GPG key is available or configured
  138. self.skipTest(f"GPG signing failed: {e}")
  139. def test_verify_invalid_signature(self) -> None:
  140. """Test that verify raises an error for invalid signatures."""
  141. vendor = GPGCliSignatureVendor()
  142. test_data = b"test data"
  143. invalid_signature = b"this is not a valid signature"
  144. with self.assertRaises(subprocess.CalledProcessError):
  145. vendor.verify(test_data, invalid_signature)
  146. def test_sign_with_keyid(self) -> None:
  147. """Test signing with a specific key ID using CLI."""
  148. vendor = GPGCliSignatureVendor()
  149. test_data = b"test data to sign"
  150. try:
  151. # Try to get a key from the keyring
  152. result = subprocess.run(
  153. ["gpg", "--list-secret-keys", "--with-colons"],
  154. capture_output=True,
  155. check=True,
  156. text=True,
  157. )
  158. # Parse output to find a key fingerprint
  159. keyid = None
  160. for line in result.stdout.split("\n"):
  161. if line.startswith("fpr:"):
  162. keyid = line.split(":")[9]
  163. break
  164. if not keyid:
  165. self.skipTest("No GPG keys available for testing")
  166. signature = vendor.sign(test_data, keyid=keyid)
  167. self.assertIsInstance(signature, bytes)
  168. self.assertGreater(len(signature), 0)
  169. # Verify the signature
  170. vendor.verify(test_data, signature)
  171. except subprocess.CalledProcessError as e:
  172. self.skipTest(f"GPG key not available: {e}")
  173. def test_verify_with_keyids(self) -> None:
  174. """Test verifying with specific trusted key IDs."""
  175. vendor = GPGCliSignatureVendor()
  176. test_data = b"test data to sign"
  177. try:
  178. # Sign without specifying a key (use default)
  179. signature = vendor.sign(test_data)
  180. # Get the primary key fingerprint from the keyring
  181. result = subprocess.run(
  182. ["gpg", "--list-secret-keys", "--with-colons"],
  183. capture_output=True,
  184. check=True,
  185. text=True,
  186. )
  187. primary_keyid = None
  188. for line in result.stdout.split("\n"):
  189. if line.startswith("fpr:"):
  190. primary_keyid = line.split(":")[9]
  191. break
  192. if not primary_keyid:
  193. self.skipTest("No GPG keys available for testing")
  194. # Verify with the correct primary keyid - should succeed
  195. # (GPG shows primary key fingerprint even if signed by subkey)
  196. vendor.verify(test_data, signature, keyids=[primary_keyid])
  197. # Verify with a different keyid - should fail
  198. fake_keyid = "0" * 40 # Fake 40-character fingerprint
  199. with self.assertRaises(ValueError):
  200. vendor.verify(test_data, signature, keyids=[fake_keyid])
  201. except subprocess.CalledProcessError as e:
  202. self.skipTest(f"GPG key not available: {e}")
  203. def test_custom_gpg_command(self) -> None:
  204. """Test using a custom GPG command path."""
  205. vendor = GPGCliSignatureVendor(gpg_command="gpg")
  206. test_data = b"test data"
  207. try:
  208. signature = vendor.sign(test_data)
  209. self.assertIsInstance(signature, bytes)
  210. except subprocess.CalledProcessError as e:
  211. self.skipTest(f"GPG not available: {e}")
  212. def test_gpg_program_from_config(self) -> None:
  213. """Test reading gpg.program from config."""
  214. # Create a config with gpg.program set
  215. config = ConfigDict()
  216. config.set((b"gpg",), b"program", b"gpg2")
  217. vendor = GPGCliSignatureVendor(config=config)
  218. self.assertEqual(vendor.gpg_command, "gpg2")
  219. def test_gpg_program_override(self) -> None:
  220. """Test that gpg_command parameter overrides config."""
  221. config = ConfigDict()
  222. config.set((b"gpg",), b"program", b"gpg2")
  223. vendor = GPGCliSignatureVendor(config=config, gpg_command="gpg")
  224. self.assertEqual(vendor.gpg_command, "gpg")
  225. def test_gpg_program_default(self) -> None:
  226. """Test default gpg command when no config provided."""
  227. vendor = GPGCliSignatureVendor()
  228. self.assertEqual(vendor.gpg_command, "gpg")
  229. def test_gpg_program_default_when_not_in_config(self) -> None:
  230. """Test default gpg command when config doesn't have gpg.program."""
  231. config = ConfigDict()
  232. vendor = GPGCliSignatureVendor(config=config)
  233. self.assertEqual(vendor.gpg_command, "gpg")
  234. def test_available(self) -> None:
  235. """Test that available() returns boolean."""
  236. result = GPGCliSignatureVendor.available()
  237. self.assertIsInstance(result, bool)
  238. class X509SignatureVendorTests(unittest.TestCase):
  239. """Tests for X509SignatureVendor."""
  240. def test_gpgsm_program_default(self) -> None:
  241. """Test default gpgsm command is 'gpgsm'."""
  242. vendor = X509SignatureVendor()
  243. self.assertEqual(vendor.gpgsm_command, "gpgsm")
  244. def test_gpgsm_program_from_config(self) -> None:
  245. """Test reading gpg.x509.program from config."""
  246. config = ConfigDict()
  247. config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
  248. vendor = X509SignatureVendor(config=config)
  249. self.assertEqual(vendor.gpgsm_command, "/usr/local/bin/gpgsm")
  250. def test_gpgsm_program_override(self) -> None:
  251. """Test gpgsm_command parameter overrides config."""
  252. config = ConfigDict()
  253. config.set((b"gpg", b"x509"), b"program", b"/usr/local/bin/gpgsm")
  254. vendor = X509SignatureVendor(config=config, gpgsm_command="/custom/gpgsm")
  255. self.assertEqual(vendor.gpgsm_command, "/custom/gpgsm")
  256. def test_gpgsm_program_default_when_not_in_config(self) -> None:
  257. """Test default when gpg.x509.program not in config."""
  258. config = ConfigDict()
  259. vendor = X509SignatureVendor(config=config)
  260. self.assertEqual(vendor.gpgsm_command, "gpgsm")
  261. def test_available(self) -> None:
  262. """Test that available() returns boolean."""
  263. result = X509SignatureVendor.available()
  264. self.assertIsInstance(result, bool)
  265. @unittest.skipIf(
  266. shutil.which("gpgsm") is None, "gpgsm command not available in PATH"
  267. )
  268. def test_sign_and_verify(self) -> None:
  269. """Test basic X.509 sign and verify cycle.
  270. Note: This test requires gpgsm and an X.509 certificate to be configured.
  271. It may be skipped in environments without gpgsm/certificate setup.
  272. """
  273. vendor = X509SignatureVendor()
  274. test_data = b"test data to sign"
  275. try:
  276. # Try to sign the data
  277. signature = vendor.sign(test_data)
  278. self.assertIsInstance(signature, bytes)
  279. self.assertGreater(len(signature), 0)
  280. # Verify the signature
  281. vendor.verify(test_data, signature)
  282. except subprocess.CalledProcessError:
  283. # Skip test if no X.509 certificate is available
  284. self.skipTest("No X.509 certificate available for signing")
  285. class GetSignatureVendorTests(unittest.TestCase):
  286. """Tests for get_signature_vendor function."""
  287. def test_default_format(self) -> None:
  288. """Test that default format is openpgp."""
  289. vendor = get_signature_vendor()
  290. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  291. def test_explicit_openpgp_format(self) -> None:
  292. """Test explicitly requesting openpgp format."""
  293. vendor = get_signature_vendor(format="openpgp")
  294. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  295. def test_format_from_config(self) -> None:
  296. """Test reading format from config."""
  297. config = ConfigDict()
  298. config.set((b"gpg",), b"format", b"openpgp")
  299. vendor = get_signature_vendor(config=config)
  300. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  301. def test_format_case_insensitive(self) -> None:
  302. """Test that format is case-insensitive."""
  303. vendor = get_signature_vendor(format="OpenPGP")
  304. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  305. def test_x509_format_supported(self) -> None:
  306. """Test that x509 format is now supported."""
  307. vendor = get_signature_vendor(format="x509")
  308. self.assertIsInstance(vendor, X509SignatureVendor)
  309. def test_ssh_format_supported(self) -> None:
  310. """Test that ssh format is now supported."""
  311. vendor = get_signature_vendor(format="ssh")
  312. # Should be either SSHSigSignatureVendor or SSHCliSignatureVendor
  313. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  314. def test_invalid_format(self) -> None:
  315. """Test that invalid format raises ValueError."""
  316. with self.assertRaises(ValueError) as cm:
  317. get_signature_vendor(format="invalid")
  318. self.assertIn("Unsupported", str(cm.exception))
  319. def test_config_passed_to_vendor(self) -> None:
  320. """Test that config is passed to the vendor."""
  321. config = ConfigDict()
  322. config.set((b"gpg",), b"program", b"gpg2")
  323. vendor = get_signature_vendor(format="openpgp", config=config)
  324. # If CLI vendor is used, check that config was passed
  325. if isinstance(vendor, GPGCliSignatureVendor):
  326. self.assertEqual(vendor.gpg_command, "gpg2")
  327. def test_ssh_format(self) -> None:
  328. """Test requesting SSH format."""
  329. vendor = get_signature_vendor(format="ssh")
  330. # Should be either SSHSigSignatureVendor or SSHCliSignatureVendor
  331. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  332. def test_x509_format(self) -> None:
  333. """Test requesting X.509 format."""
  334. vendor = get_signature_vendor(format="x509")
  335. self.assertIsInstance(vendor, X509SignatureVendor)
  336. class SSHSigSignatureVendorTests(unittest.TestCase):
  337. """Tests for SSHSigSignatureVendor (sshsig package implementation)."""
  338. def test_sign_not_supported(self) -> None:
  339. """Test that sign raises NotImplementedError with helpful message."""
  340. vendor = SSHSigSignatureVendor()
  341. with self.assertRaises(NotImplementedError) as cm:
  342. vendor.sign(b"test data", keyid="dummy")
  343. self.assertIn("SSHCliSignatureVendor", str(cm.exception))
  344. def test_verify_without_config_raises(self) -> None:
  345. """Test that verify without config or keyids raises ValueError."""
  346. vendor = SSHSigSignatureVendor()
  347. with self.assertRaises(ValueError) as cm:
  348. vendor.verify(b"test data", b"fake signature")
  349. self.assertIn("allowedSignersFile", str(cm.exception))
  350. def test_config_parsing(self) -> None:
  351. """Test parsing SSH config options."""
  352. config = ConfigDict()
  353. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/path/to/allowed")
  354. config.set((b"gpg", b"ssh"), b"defaultKeyCommand", b"ssh-add -L")
  355. vendor = SSHSigSignatureVendor(config=config)
  356. self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
  357. self.assertEqual(vendor.default_key_command, "ssh-add -L")
  358. def test_available(self) -> None:
  359. """Test that available() returns boolean."""
  360. result = SSHSigSignatureVendor.available()
  361. self.assertIsInstance(result, bool)
  362. def test_verify_with_cli_generated_signature(self) -> None:
  363. """Test verifying a signature created by SSH CLI vendor."""
  364. import os
  365. import tempfile
  366. if shutil.which("ssh-keygen") is None:
  367. self.skipTest("ssh-keygen not available")
  368. # Generate a test SSH key and signature using CLI vendor
  369. with tempfile.TemporaryDirectory() as tmpdir:
  370. private_key = os.path.join(tmpdir, "test_key")
  371. public_key = private_key + ".pub"
  372. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  373. # Generate Ed25519 key
  374. subprocess.run(
  375. [
  376. "ssh-keygen",
  377. "-t",
  378. "ed25519",
  379. "-f",
  380. private_key,
  381. "-N",
  382. "",
  383. "-C",
  384. "test@example.com",
  385. ],
  386. capture_output=True,
  387. check=True,
  388. )
  389. # Create allowed_signers file
  390. with open(public_key) as pub:
  391. pub_key_content = pub.read().strip()
  392. with open(allowed_signers, "w") as allowed:
  393. allowed.write(f"* {pub_key_content}\n")
  394. # Sign with CLI vendor
  395. cli_config = ConfigDict()
  396. cli_config.set(
  397. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  398. )
  399. cli_vendor = SSHCliSignatureVendor(config=cli_config)
  400. test_data = b"test data for sshsig verification"
  401. signature = cli_vendor.sign(test_data, keyid=private_key)
  402. # Verify with sshsig package vendor
  403. pkg_config = ConfigDict()
  404. pkg_config.set(
  405. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  406. )
  407. pkg_vendor = SSHSigSignatureVendor(config=pkg_config)
  408. # This should succeed
  409. pkg_vendor.verify(test_data, signature)
  410. class SSHCliSignatureVendorTests(unittest.TestCase):
  411. """Tests for SSHCliSignatureVendor."""
  412. def setUp(self) -> None:
  413. """Check if ssh-keygen is available."""
  414. if shutil.which("ssh-keygen") is None:
  415. self.skipTest("ssh-keygen command not available")
  416. def test_ssh_program_from_config(self) -> None:
  417. """Test reading gpg.ssh.program from config."""
  418. config = ConfigDict()
  419. config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
  420. vendor = SSHCliSignatureVendor(config=config)
  421. self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
  422. def test_ssh_program_override(self) -> None:
  423. """Test that ssh_command parameter overrides config."""
  424. config = ConfigDict()
  425. config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
  426. vendor = SSHCliSignatureVendor(config=config, ssh_command="ssh-keygen")
  427. self.assertEqual(vendor.ssh_command, "ssh-keygen")
  428. def test_ssh_program_default(self) -> None:
  429. """Test default ssh-keygen command when no config provided."""
  430. vendor = SSHCliSignatureVendor()
  431. self.assertEqual(vendor.ssh_command, "ssh-keygen")
  432. def test_allowed_signers_from_config(self) -> None:
  433. """Test reading gpg.ssh.allowedSignersFile from config."""
  434. config = ConfigDict()
  435. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
  436. vendor = SSHCliSignatureVendor(config=config)
  437. self.assertEqual(vendor.allowed_signers_file, "/tmp/allowed_signers")
  438. def test_sign_without_key_raises(self) -> None:
  439. """Test that signing without a key raises ValueError."""
  440. vendor = SSHCliSignatureVendor()
  441. with self.assertRaises(ValueError) as cm:
  442. vendor.sign(b"test data")
  443. self.assertIn("key", str(cm.exception).lower())
  444. def test_verify_without_allowed_signers_raises(self) -> None:
  445. """Test that verify without allowedSignersFile raises ValueError."""
  446. vendor = SSHCliSignatureVendor()
  447. with self.assertRaises(ValueError) as cm:
  448. vendor.verify(b"test data", b"fake signature")
  449. self.assertIn("allowedSignersFile", str(cm.exception))
  450. def test_sign_and_verify_with_ssh_key(self) -> None:
  451. """Test sign and verify cycle with SSH key."""
  452. import os
  453. import tempfile
  454. # Generate a test SSH key
  455. with tempfile.TemporaryDirectory() as tmpdir:
  456. private_key = os.path.join(tmpdir, "test_key")
  457. public_key = private_key + ".pub"
  458. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  459. # Generate Ed25519 key (no passphrase)
  460. subprocess.run(
  461. [
  462. "ssh-keygen",
  463. "-t",
  464. "ed25519",
  465. "-f",
  466. private_key,
  467. "-N",
  468. "",
  469. "-C",
  470. "test@example.com",
  471. ],
  472. capture_output=True,
  473. check=True,
  474. )
  475. # Create allowed_signers file
  476. with open(public_key) as pub:
  477. pub_key_content = pub.read().strip()
  478. with open(allowed_signers, "w") as allowed:
  479. allowed.write(f"git {pub_key_content}\n")
  480. # Create vendor with config
  481. config = ConfigDict()
  482. config.set(
  483. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  484. )
  485. vendor = SSHCliSignatureVendor(config=config)
  486. # Test signing and verification
  487. test_data = b"test data to sign with SSH"
  488. signature = vendor.sign(test_data, keyid=private_key)
  489. self.assertIsInstance(signature, bytes)
  490. self.assertGreater(len(signature), 0)
  491. self.assertTrue(signature.startswith(b"-----BEGIN SSH SIGNATURE-----"))
  492. # Verify the signature
  493. vendor.verify(test_data, signature)
  494. def test_default_key_command(self) -> None:
  495. """Test gpg.ssh.defaultKeyCommand support."""
  496. import os
  497. import tempfile
  498. # Generate a test SSH key
  499. with tempfile.TemporaryDirectory() as tmpdir:
  500. private_key = os.path.join(tmpdir, "test_key")
  501. # Generate Ed25519 key (no passphrase)
  502. subprocess.run(
  503. [
  504. "ssh-keygen",
  505. "-t",
  506. "ed25519",
  507. "-f",
  508. private_key,
  509. "-N",
  510. "",
  511. "-C",
  512. "test@example.com",
  513. ],
  514. capture_output=True,
  515. check=True,
  516. )
  517. # Create config with defaultKeyCommand that echoes the key path
  518. config = ConfigDict()
  519. config.set(
  520. (b"gpg", b"ssh"), b"defaultKeyCommand", f"echo {private_key}".encode()
  521. )
  522. vendor = SSHCliSignatureVendor(config=config)
  523. test_data = b"test data"
  524. # Sign without providing keyid - should use defaultKeyCommand
  525. signature = vendor.sign(test_data)
  526. self.assertIsInstance(signature, bytes)
  527. self.assertGreater(len(signature), 0)
  528. def test_revocation_file_config(self) -> None:
  529. """Test that revocation file is read from config."""
  530. config = ConfigDict()
  531. config.set((b"gpg", b"ssh"), b"revocationFile", b"/path/to/revoked_keys")
  532. vendor = SSHCliSignatureVendor(config=config)
  533. self.assertEqual(vendor.revocation_file, "/path/to/revoked_keys")
  534. def test_available(self) -> None:
  535. """Test that available() returns boolean."""
  536. result = SSHCliSignatureVendor.available()
  537. self.assertIsInstance(result, bool)
  538. class DetectSignatureFormatTests(unittest.TestCase):
  539. """Tests for detect_signature_format function."""
  540. def test_detect_ssh_signature(self) -> None:
  541. """Test detecting SSH signature format."""
  542. ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
  543. self.assertEqual(detect_signature_format(ssh_sig), SIGNATURE_FORMAT_SSH)
  544. def test_detect_pgp_signature(self) -> None:
  545. """Test detecting PGP signature format."""
  546. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  547. self.assertEqual(detect_signature_format(pgp_sig), SIGNATURE_FORMAT_OPENPGP)
  548. def test_detect_x509_signature_pkcs7(self) -> None:
  549. """Test detecting X.509 PKCS7 signature format."""
  550. x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
  551. self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
  552. def test_detect_x509_signature_signed_message(self) -> None:
  553. """Test detecting X.509 signed message format."""
  554. x509_sig = b"-----BEGIN SIGNED MESSAGE-----\nfoo\n-----END SIGNED MESSAGE-----"
  555. self.assertEqual(detect_signature_format(x509_sig), SIGNATURE_FORMAT_X509)
  556. def test_unknown_signature_format(self) -> None:
  557. """Test that unknown format raises ValueError."""
  558. with self.assertRaises(ValueError) as cm:
  559. detect_signature_format(b"not a signature")
  560. self.assertIn("Unable to detect", str(cm.exception))
  561. class GetSignatureVendorForSignatureTests(unittest.TestCase):
  562. """Tests for get_signature_vendor_for_signature function."""
  563. def test_get_vendor_for_ssh_signature(self) -> None:
  564. """Test getting vendor for SSH signature."""
  565. ssh_sig = b"-----BEGIN SSH SIGNATURE-----\nfoo\n-----END SSH SIGNATURE-----"
  566. vendor = get_signature_vendor_for_signature(ssh_sig)
  567. self.assertIsInstance(vendor, (SSHSigSignatureVendor, SSHCliSignatureVendor))
  568. def test_get_vendor_for_pgp_signature(self) -> None:
  569. """Test getting vendor for PGP signature."""
  570. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  571. vendor = get_signature_vendor_for_signature(pgp_sig)
  572. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  573. def test_get_vendor_for_x509_signature(self) -> None:
  574. """Test getting vendor for X.509 signature."""
  575. x509_sig = b"-----BEGIN PKCS7-----\nfoo\n-----END PKCS7-----"
  576. vendor = get_signature_vendor_for_signature(x509_sig)
  577. self.assertIsInstance(vendor, X509SignatureVendor)
  578. def test_get_vendor_with_config(self) -> None:
  579. """Test that config is passed to vendor."""
  580. config = ConfigDict()
  581. config.set((b"gpg",), b"program", b"gpg2")
  582. pgp_sig = b"-----BEGIN PGP SIGNATURE-----\nfoo\n-----END PGP SIGNATURE-----"
  583. vendor = get_signature_vendor_for_signature(pgp_sig, config=config)
  584. # If CLI vendor is used, check config was passed
  585. if isinstance(vendor, GPGCliSignatureVendor):
  586. self.assertEqual(vendor.gpg_command, "gpg2")