test_signature.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. # test_signature.py -- tests for signature.py
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for signature vendors."""
  22. import shutil
  23. import subprocess
  24. import unittest
  25. from dulwich.config import ConfigDict
  26. from dulwich.signature import (
  27. GPGCliSignatureVendor,
  28. GPGSignatureVendor,
  29. SignatureVendor,
  30. SSHCliSignatureVendor,
  31. SSHSignatureVendor,
  32. get_signature_vendor,
  33. )
  34. try:
  35. import gpg
  36. except ImportError:
  37. gpg = None
  38. class SignatureVendorTests(unittest.TestCase):
  39. """Tests for SignatureVendor base class."""
  40. def test_sign_not_implemented(self) -> None:
  41. """Test that sign raises NotImplementedError."""
  42. vendor = SignatureVendor()
  43. with self.assertRaises(NotImplementedError):
  44. vendor.sign(b"test data")
  45. def test_verify_not_implemented(self) -> None:
  46. """Test that verify raises NotImplementedError."""
  47. vendor = SignatureVendor()
  48. with self.assertRaises(NotImplementedError):
  49. vendor.verify(b"test data", b"fake signature")
  50. @unittest.skipIf(gpg is None, "gpg not available")
  51. class GPGSignatureVendorTests(unittest.TestCase):
  52. """Tests for GPGSignatureVendor."""
  53. def test_min_trust_level_from_config(self) -> None:
  54. """Test reading gpg.minTrustLevel from config."""
  55. config = ConfigDict()
  56. config.set((b"gpg",), b"minTrustLevel", b"marginal")
  57. vendor = GPGSignatureVendor(config=config)
  58. self.assertEqual(vendor.min_trust_level, "marginal")
  59. def test_min_trust_level_default(self) -> None:
  60. """Test default when gpg.minTrustLevel not in config."""
  61. vendor = GPGSignatureVendor()
  62. self.assertIsNone(vendor.min_trust_level)
  63. def test_sign_and_verify(self) -> None:
  64. """Test basic sign and verify cycle.
  65. Note: This test requires a GPG key to be configured in the test
  66. environment. It may be skipped in environments without GPG setup.
  67. """
  68. vendor = GPGSignatureVendor()
  69. test_data = b"test data to sign"
  70. try:
  71. # Sign the data
  72. signature = vendor.sign(test_data)
  73. self.assertIsInstance(signature, bytes)
  74. self.assertGreater(len(signature), 0)
  75. # Verify the signature
  76. vendor.verify(test_data, signature)
  77. except gpg.errors.GPGMEError as e:
  78. # Skip test if no GPG key is available
  79. self.skipTest(f"GPG key not available: {e}")
  80. def test_verify_invalid_signature(self) -> None:
  81. """Test that verify raises an error for invalid signatures."""
  82. vendor = GPGSignatureVendor()
  83. test_data = b"test data"
  84. invalid_signature = b"this is not a valid signature"
  85. with self.assertRaises(gpg.errors.GPGMEError):
  86. vendor.verify(test_data, invalid_signature)
  87. def test_sign_with_keyid(self) -> None:
  88. """Test signing with a specific key ID.
  89. Note: This test requires a GPG key to be configured in the test
  90. environment. It may be skipped in environments without GPG setup.
  91. """
  92. vendor = GPGSignatureVendor()
  93. test_data = b"test data to sign"
  94. try:
  95. # Try to get a key from the keyring
  96. with gpg.Context() as ctx:
  97. keys = list(ctx.keylist(secret=True))
  98. if not keys:
  99. self.skipTest("No GPG keys available for testing")
  100. key = keys[0]
  101. signature = vendor.sign(test_data, keyid=key.fpr)
  102. self.assertIsInstance(signature, bytes)
  103. self.assertGreater(len(signature), 0)
  104. # Verify the signature
  105. vendor.verify(test_data, signature)
  106. except gpg.errors.GPGMEError as e:
  107. self.skipTest(f"GPG key not available: {e}")
  108. class GPGCliSignatureVendorTests(unittest.TestCase):
  109. """Tests for GPGCliSignatureVendor."""
  110. def setUp(self) -> None:
  111. """Check if gpg command is available."""
  112. if shutil.which("gpg") is None:
  113. self.skipTest("gpg command not available")
  114. def test_sign_and_verify(self) -> None:
  115. """Test basic sign and verify cycle using CLI."""
  116. vendor = GPGCliSignatureVendor()
  117. test_data = b"test data to sign"
  118. try:
  119. # Sign the data
  120. signature = vendor.sign(test_data)
  121. self.assertIsInstance(signature, bytes)
  122. self.assertGreater(len(signature), 0)
  123. self.assertTrue(signature.startswith(b"-----BEGIN PGP SIGNATURE-----"))
  124. # Verify the signature
  125. vendor.verify(test_data, signature)
  126. except subprocess.CalledProcessError as e:
  127. # Skip test if no GPG key is available or configured
  128. self.skipTest(f"GPG signing failed: {e}")
  129. def test_verify_invalid_signature(self) -> None:
  130. """Test that verify raises an error for invalid signatures."""
  131. vendor = GPGCliSignatureVendor()
  132. test_data = b"test data"
  133. invalid_signature = b"this is not a valid signature"
  134. with self.assertRaises(subprocess.CalledProcessError):
  135. vendor.verify(test_data, invalid_signature)
  136. def test_sign_with_keyid(self) -> None:
  137. """Test signing with a specific key ID using CLI."""
  138. vendor = GPGCliSignatureVendor()
  139. test_data = b"test data to sign"
  140. try:
  141. # Try to get a key from the keyring
  142. result = subprocess.run(
  143. ["gpg", "--list-secret-keys", "--with-colons"],
  144. capture_output=True,
  145. check=True,
  146. text=True,
  147. )
  148. # Parse output to find a key fingerprint
  149. keyid = None
  150. for line in result.stdout.split("\n"):
  151. if line.startswith("fpr:"):
  152. keyid = line.split(":")[9]
  153. break
  154. if not keyid:
  155. self.skipTest("No GPG keys available for testing")
  156. signature = vendor.sign(test_data, keyid=keyid)
  157. self.assertIsInstance(signature, bytes)
  158. self.assertGreater(len(signature), 0)
  159. # Verify the signature
  160. vendor.verify(test_data, signature)
  161. except subprocess.CalledProcessError as e:
  162. self.skipTest(f"GPG key not available: {e}")
  163. def test_verify_with_keyids(self) -> None:
  164. """Test verifying with specific trusted key IDs."""
  165. vendor = GPGCliSignatureVendor()
  166. test_data = b"test data to sign"
  167. try:
  168. # Sign without specifying a key (use default)
  169. signature = vendor.sign(test_data)
  170. # Get the primary key fingerprint from the keyring
  171. result = subprocess.run(
  172. ["gpg", "--list-secret-keys", "--with-colons"],
  173. capture_output=True,
  174. check=True,
  175. text=True,
  176. )
  177. primary_keyid = None
  178. for line in result.stdout.split("\n"):
  179. if line.startswith("fpr:"):
  180. primary_keyid = line.split(":")[9]
  181. break
  182. if not primary_keyid:
  183. self.skipTest("No GPG keys available for testing")
  184. # Verify with the correct primary keyid - should succeed
  185. # (GPG shows primary key fingerprint even if signed by subkey)
  186. vendor.verify(test_data, signature, keyids=[primary_keyid])
  187. # Verify with a different keyid - should fail
  188. fake_keyid = "0" * 40 # Fake 40-character fingerprint
  189. with self.assertRaises(ValueError):
  190. vendor.verify(test_data, signature, keyids=[fake_keyid])
  191. except subprocess.CalledProcessError as e:
  192. self.skipTest(f"GPG key not available: {e}")
  193. def test_custom_gpg_command(self) -> None:
  194. """Test using a custom GPG command path."""
  195. vendor = GPGCliSignatureVendor(gpg_command="gpg")
  196. test_data = b"test data"
  197. try:
  198. signature = vendor.sign(test_data)
  199. self.assertIsInstance(signature, bytes)
  200. except subprocess.CalledProcessError as e:
  201. self.skipTest(f"GPG not available: {e}")
  202. def test_gpg_program_from_config(self) -> None:
  203. """Test reading gpg.program from config."""
  204. # Create a config with gpg.program set
  205. config = ConfigDict()
  206. config.set((b"gpg",), b"program", b"gpg2")
  207. vendor = GPGCliSignatureVendor(config=config)
  208. self.assertEqual(vendor.gpg_command, "gpg2")
  209. def test_gpg_program_override(self) -> None:
  210. """Test that gpg_command parameter overrides config."""
  211. config = ConfigDict()
  212. config.set((b"gpg",), b"program", b"gpg2")
  213. vendor = GPGCliSignatureVendor(config=config, gpg_command="gpg")
  214. self.assertEqual(vendor.gpg_command, "gpg")
  215. def test_gpg_program_default(self) -> None:
  216. """Test default gpg command when no config provided."""
  217. vendor = GPGCliSignatureVendor()
  218. self.assertEqual(vendor.gpg_command, "gpg")
  219. def test_gpg_program_default_when_not_in_config(self) -> None:
  220. """Test default gpg command when config doesn't have gpg.program."""
  221. config = ConfigDict()
  222. vendor = GPGCliSignatureVendor(config=config)
  223. self.assertEqual(vendor.gpg_command, "gpg")
  224. class GetSignatureVendorTests(unittest.TestCase):
  225. """Tests for get_signature_vendor function."""
  226. def test_default_format(self) -> None:
  227. """Test that default format is openpgp."""
  228. vendor = get_signature_vendor()
  229. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  230. def test_explicit_openpgp_format(self) -> None:
  231. """Test explicitly requesting openpgp format."""
  232. vendor = get_signature_vendor(format="openpgp")
  233. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  234. def test_format_from_config(self) -> None:
  235. """Test reading format from config."""
  236. config = ConfigDict()
  237. config.set((b"gpg",), b"format", b"openpgp")
  238. vendor = get_signature_vendor(config=config)
  239. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  240. def test_format_case_insensitive(self) -> None:
  241. """Test that format is case-insensitive."""
  242. vendor = get_signature_vendor(format="OpenPGP")
  243. self.assertIsInstance(vendor, (GPGSignatureVendor, GPGCliSignatureVendor))
  244. def test_x509_not_supported(self) -> None:
  245. """Test that x509 format raises ValueError."""
  246. with self.assertRaises(ValueError) as cm:
  247. get_signature_vendor(format="x509")
  248. self.assertIn("X.509", str(cm.exception))
  249. def test_ssh_format_supported(self) -> None:
  250. """Test that ssh format is now supported."""
  251. vendor = get_signature_vendor(format="ssh")
  252. # Should be either SSHSignatureVendor or SSHCliSignatureVendor
  253. self.assertIsInstance(vendor, (SSHSignatureVendor, SSHCliSignatureVendor))
  254. def test_invalid_format(self) -> None:
  255. """Test that invalid format raises ValueError."""
  256. with self.assertRaises(ValueError) as cm:
  257. get_signature_vendor(format="invalid")
  258. self.assertIn("Unsupported", str(cm.exception))
  259. def test_config_passed_to_vendor(self) -> None:
  260. """Test that config is passed to the vendor."""
  261. config = ConfigDict()
  262. config.set((b"gpg",), b"program", b"gpg2")
  263. vendor = get_signature_vendor(format="openpgp", config=config)
  264. # If CLI vendor is used, check that config was passed
  265. if isinstance(vendor, GPGCliSignatureVendor):
  266. self.assertEqual(vendor.gpg_command, "gpg2")
  267. def test_ssh_format(self) -> None:
  268. """Test requesting SSH format."""
  269. vendor = get_signature_vendor(format="ssh")
  270. # Should be either SSHSignatureVendor or SSHCliSignatureVendor
  271. self.assertIsInstance(vendor, (SSHSignatureVendor, SSHCliSignatureVendor))
  272. class SSHSignatureVendorTests(unittest.TestCase):
  273. """Tests for SSHSignatureVendor (sshsig package implementation)."""
  274. def test_sign_not_supported(self) -> None:
  275. """Test that sign raises NotImplementedError with helpful message."""
  276. vendor = SSHSignatureVendor()
  277. with self.assertRaises(NotImplementedError) as cm:
  278. vendor.sign(b"test data", keyid="dummy")
  279. self.assertIn("SSHCliSignatureVendor", str(cm.exception))
  280. def test_verify_without_config_raises(self) -> None:
  281. """Test that verify without config or keyids raises ValueError."""
  282. vendor = SSHSignatureVendor()
  283. with self.assertRaises(ValueError) as cm:
  284. vendor.verify(b"test data", b"fake signature")
  285. self.assertIn("allowedSignersFile", str(cm.exception))
  286. def test_config_parsing(self) -> None:
  287. """Test parsing SSH config options."""
  288. config = ConfigDict()
  289. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/path/to/allowed")
  290. config.set((b"gpg", b"ssh"), b"defaultKeyCommand", b"ssh-add -L")
  291. vendor = SSHSignatureVendor(config=config)
  292. self.assertEqual(vendor.allowed_signers_file, "/path/to/allowed")
  293. self.assertEqual(vendor.default_key_command, "ssh-add -L")
  294. def test_verify_with_cli_generated_signature(self) -> None:
  295. """Test verifying a signature created by SSH CLI vendor."""
  296. import os
  297. import tempfile
  298. if shutil.which("ssh-keygen") is None:
  299. self.skipTest("ssh-keygen not available")
  300. # Generate a test SSH key and signature using CLI vendor
  301. with tempfile.TemporaryDirectory() as tmpdir:
  302. private_key = os.path.join(tmpdir, "test_key")
  303. public_key = private_key + ".pub"
  304. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  305. # Generate Ed25519 key
  306. subprocess.run(
  307. [
  308. "ssh-keygen",
  309. "-t",
  310. "ed25519",
  311. "-f",
  312. private_key,
  313. "-N",
  314. "",
  315. "-C",
  316. "test@example.com",
  317. ],
  318. capture_output=True,
  319. check=True,
  320. )
  321. # Create allowed_signers file
  322. with open(public_key) as pub:
  323. pub_key_content = pub.read().strip()
  324. with open(allowed_signers, "w") as allowed:
  325. allowed.write(f"* {pub_key_content}\n")
  326. # Sign with CLI vendor
  327. cli_config = ConfigDict()
  328. cli_config.set(
  329. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  330. )
  331. cli_vendor = SSHCliSignatureVendor(config=cli_config)
  332. test_data = b"test data for sshsig verification"
  333. signature = cli_vendor.sign(test_data, keyid=private_key)
  334. # Verify with sshsig package vendor
  335. pkg_config = ConfigDict()
  336. pkg_config.set(
  337. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  338. )
  339. pkg_vendor = SSHSignatureVendor(config=pkg_config)
  340. # This should succeed
  341. pkg_vendor.verify(test_data, signature)
  342. class SSHCliSignatureVendorTests(unittest.TestCase):
  343. """Tests for SSHCliSignatureVendor."""
  344. def setUp(self) -> None:
  345. """Check if ssh-keygen is available."""
  346. if shutil.which("ssh-keygen") is None:
  347. self.skipTest("ssh-keygen command not available")
  348. def test_ssh_program_from_config(self) -> None:
  349. """Test reading gpg.ssh.program from config."""
  350. config = ConfigDict()
  351. config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
  352. vendor = SSHCliSignatureVendor(config=config)
  353. self.assertEqual(vendor.ssh_command, "/usr/bin/ssh-keygen")
  354. def test_ssh_program_override(self) -> None:
  355. """Test that ssh_command parameter overrides config."""
  356. config = ConfigDict()
  357. config.set((b"gpg", b"ssh"), b"program", b"/usr/bin/ssh-keygen")
  358. vendor = SSHCliSignatureVendor(config=config, ssh_command="ssh-keygen")
  359. self.assertEqual(vendor.ssh_command, "ssh-keygen")
  360. def test_ssh_program_default(self) -> None:
  361. """Test default ssh-keygen command when no config provided."""
  362. vendor = SSHCliSignatureVendor()
  363. self.assertEqual(vendor.ssh_command, "ssh-keygen")
  364. def test_allowed_signers_from_config(self) -> None:
  365. """Test reading gpg.ssh.allowedSignersFile from config."""
  366. config = ConfigDict()
  367. config.set((b"gpg", b"ssh"), b"allowedSignersFile", b"/tmp/allowed_signers")
  368. vendor = SSHCliSignatureVendor(config=config)
  369. self.assertEqual(vendor.allowed_signers_file, "/tmp/allowed_signers")
  370. def test_sign_without_key_raises(self) -> None:
  371. """Test that signing without a key raises ValueError."""
  372. vendor = SSHCliSignatureVendor()
  373. with self.assertRaises(ValueError) as cm:
  374. vendor.sign(b"test data")
  375. self.assertIn("key", str(cm.exception).lower())
  376. def test_verify_without_allowed_signers_raises(self) -> None:
  377. """Test that verify without allowedSignersFile raises ValueError."""
  378. vendor = SSHCliSignatureVendor()
  379. with self.assertRaises(ValueError) as cm:
  380. vendor.verify(b"test data", b"fake signature")
  381. self.assertIn("allowedSignersFile", str(cm.exception))
  382. def test_sign_and_verify_with_ssh_key(self) -> None:
  383. """Test sign and verify cycle with SSH key."""
  384. import os
  385. import tempfile
  386. # Generate a test SSH key
  387. with tempfile.TemporaryDirectory() as tmpdir:
  388. private_key = os.path.join(tmpdir, "test_key")
  389. public_key = private_key + ".pub"
  390. allowed_signers = os.path.join(tmpdir, "allowed_signers")
  391. # Generate Ed25519 key (no passphrase)
  392. subprocess.run(
  393. [
  394. "ssh-keygen",
  395. "-t",
  396. "ed25519",
  397. "-f",
  398. private_key,
  399. "-N",
  400. "",
  401. "-C",
  402. "test@example.com",
  403. ],
  404. capture_output=True,
  405. check=True,
  406. )
  407. # Create allowed_signers file
  408. with open(public_key) as pub:
  409. pub_key_content = pub.read().strip()
  410. with open(allowed_signers, "w") as allowed:
  411. allowed.write(f"git {pub_key_content}\n")
  412. # Create vendor with config
  413. config = ConfigDict()
  414. config.set(
  415. (b"gpg", b"ssh"), b"allowedSignersFile", allowed_signers.encode()
  416. )
  417. vendor = SSHCliSignatureVendor(config=config)
  418. # Test signing and verification
  419. test_data = b"test data to sign with SSH"
  420. signature = vendor.sign(test_data, keyid=private_key)
  421. self.assertIsInstance(signature, bytes)
  422. self.assertGreater(len(signature), 0)
  423. self.assertTrue(signature.startswith(b"-----BEGIN SSH SIGNATURE-----"))
  424. # Verify the signature
  425. vendor.verify(test_data, signature)