test_reftable.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. """Tests for the reftable refs storage format."""
  2. import tempfile
  3. import unittest
  4. from io import BytesIO
  5. from dulwich.reftable import (
  6. REF_VALUE_REF,
  7. REF_VALUE_SYMREF,
  8. RefBlock,
  9. RefRecord,
  10. ReftableReader,
  11. ReftableRefsContainer,
  12. ReftableWriter,
  13. decode_varint_from_stream,
  14. encode_varint,
  15. )
  16. class TestVarintEncoding(unittest.TestCase):
  17. """Test varint encoding/decoding."""
  18. def test_encode_decode_small(self):
  19. """Test encoding/decoding small integers."""
  20. for i in range(128):
  21. encoded = encode_varint(i)
  22. stream = BytesIO(encoded)
  23. decoded = decode_varint_from_stream(stream)
  24. self.assertEqual(i, decoded)
  25. def test_encode_decode_large(self):
  26. """Test encoding/decoding larger integers."""
  27. test_values = [127, 128, 255, 256, 16383, 16384, 65535, 65536, 1048576]
  28. for i in test_values:
  29. encoded = encode_varint(i)
  30. stream = BytesIO(encoded)
  31. decoded = decode_varint_from_stream(stream)
  32. self.assertEqual(i, decoded)
  33. class TestRefRecord(unittest.TestCase):
  34. """Test ref record encoding/decoding."""
  35. def test_encode_decode_ref(self):
  36. """Test encoding/decoding a direct ref."""
  37. sha = b"a" * 40
  38. ref = RefRecord(b"refs/heads/master", REF_VALUE_REF, sha)
  39. encoded = ref.encode()
  40. stream = BytesIO(encoded)
  41. decoded_ref, refname = RefRecord.decode(stream)
  42. self.assertEqual(ref.refname, decoded_ref.refname)
  43. self.assertEqual(ref.value_type, decoded_ref.value_type)
  44. self.assertEqual(ref.value, decoded_ref.value)
  45. self.assertEqual(b"refs/heads/master", refname)
  46. def test_encode_decode_symref(self):
  47. """Test encoding/decoding a symbolic ref."""
  48. target = b"refs/heads/main"
  49. ref = RefRecord(b"HEAD", REF_VALUE_SYMREF, target)
  50. encoded = ref.encode()
  51. stream = BytesIO(encoded)
  52. decoded_ref, refname = RefRecord.decode(stream)
  53. self.assertEqual(ref.refname, decoded_ref.refname)
  54. self.assertEqual(ref.value_type, decoded_ref.value_type)
  55. self.assertEqual(ref.value, decoded_ref.value)
  56. self.assertEqual(b"HEAD", refname)
  57. def test_prefix_compression(self):
  58. """Test prefix compression in ref encoding."""
  59. ref2 = RefRecord(b"refs/heads/main", REF_VALUE_REF, b"b" * 40)
  60. # Encode ref2 with ref1 as prefix
  61. encoded = ref2.encode(b"refs/heads/master")
  62. # Should be shorter due to common prefix
  63. encoded_no_prefix = ref2.encode()
  64. self.assertLess(len(encoded), len(encoded_no_prefix))
  65. # Decode should still work
  66. stream = BytesIO(encoded)
  67. decoded_ref, refname = RefRecord.decode(stream, b"refs/heads/master")
  68. self.assertEqual(b"refs/heads/main", refname)
  69. class TestRefBlock(unittest.TestCase):
  70. """Test ref block encoding/decoding."""
  71. def test_encode_decode_block(self):
  72. """Test encoding/decoding a ref block."""
  73. block = RefBlock()
  74. block.add_ref(b"refs/heads/master", REF_VALUE_REF, b"a" * 40)
  75. block.add_ref(b"refs/heads/main", REF_VALUE_REF, b"b" * 40)
  76. block.add_ref(b"HEAD", REF_VALUE_SYMREF, b"refs/heads/master")
  77. encoded = block.encode()
  78. decoded_block = RefBlock.decode(encoded)
  79. self.assertEqual(len(block.refs), len(decoded_block.refs))
  80. # Check that refs are sorted
  81. ref_names = [ref.refname for ref in decoded_block.refs]
  82. self.assertEqual(ref_names, sorted(ref_names))
  83. class TestReftableIO(unittest.TestCase):
  84. """Test reftable file I/O."""
  85. def test_write_read_reftable(self):
  86. """Test writing and reading a reftable file."""
  87. with tempfile.NamedTemporaryFile() as f:
  88. # Write reftable
  89. writer = ReftableWriter(f)
  90. writer.add_ref(b"refs/heads/master", b"a" * 40)
  91. writer.add_ref(b"refs/heads/main", b"b" * 40)
  92. writer.add_symbolic_ref(b"HEAD", b"refs/heads/master")
  93. writer.write()
  94. # Read reftable
  95. f.seek(0)
  96. reader = ReftableReader(f)
  97. # Check refs
  98. master_ref = reader.get_ref(b"refs/heads/master")
  99. self.assertIsNotNone(master_ref)
  100. self.assertEqual(master_ref[0], REF_VALUE_REF)
  101. self.assertEqual(master_ref[1], b"a" * 40)
  102. main_ref = reader.get_ref(b"refs/heads/main")
  103. self.assertIsNotNone(main_ref)
  104. self.assertEqual(main_ref[0], REF_VALUE_REF)
  105. self.assertEqual(main_ref[1], b"b" * 40)
  106. head_ref = reader.get_ref(b"HEAD")
  107. self.assertIsNotNone(head_ref)
  108. self.assertEqual(head_ref[0], REF_VALUE_SYMREF)
  109. self.assertEqual(head_ref[1], b"refs/heads/master")
  110. class TestReftableRefsContainer(unittest.TestCase):
  111. """Test ReftableRefsContainer functionality."""
  112. def setUp(self):
  113. """Set up test environment."""
  114. self.test_dir = tempfile.mkdtemp()
  115. self.container = ReftableRefsContainer(self.test_dir)
  116. def tearDown(self):
  117. """Clean up test environment."""
  118. import shutil
  119. shutil.rmtree(self.test_dir)
  120. def test_empty_container(self):
  121. """Test operations on empty container."""
  122. self.assertEqual(set(), self.container.allkeys())
  123. with self.assertRaises(KeyError):
  124. self.container.read_loose_ref(b"refs/heads/master")
  125. def test_add_ref(self):
  126. """Test adding a reference."""
  127. sha = b"a" * 40
  128. success = self.container.add_if_new(b"refs/heads/master", sha)
  129. self.assertTrue(success)
  130. # Check ref exists
  131. self.assertIn(b"refs/heads/master", self.container.allkeys())
  132. self.assertEqual(sha, self.container.read_loose_ref(b"refs/heads/master"))
  133. # Adding again should fail
  134. success = self.container.add_if_new(b"refs/heads/master", b"b" * 40)
  135. self.assertFalse(success)
  136. def test_set_ref(self):
  137. """Test setting a reference."""
  138. sha1 = b"a" * 40
  139. sha2 = b"b" * 40
  140. # Set initial ref
  141. success = self.container.set_if_equals(b"refs/heads/master", None, sha1)
  142. self.assertTrue(success)
  143. # Update ref
  144. success = self.container.set_if_equals(b"refs/heads/master", sha1, sha2)
  145. self.assertTrue(success)
  146. self.assertEqual(sha2, self.container.read_loose_ref(b"refs/heads/master"))
  147. # Try to update with wrong old value
  148. success = self.container.set_if_equals(b"refs/heads/master", sha1, b"c" * 40)
  149. self.assertFalse(success)
  150. self.assertEqual(sha2, self.container.read_loose_ref(b"refs/heads/master"))
  151. def test_remove_ref(self):
  152. """Test removing a reference."""
  153. sha = b"a" * 40
  154. # Add ref
  155. self.container.add_if_new(b"refs/heads/master", sha)
  156. self.assertIn(b"refs/heads/master", self.container.allkeys())
  157. # Remove ref
  158. success = self.container.remove_if_equals(b"refs/heads/master", sha)
  159. self.assertTrue(success)
  160. self.assertNotIn(b"refs/heads/master", self.container.allkeys())
  161. # Try to remove again
  162. success = self.container.remove_if_equals(b"refs/heads/master", sha)
  163. self.assertFalse(success)
  164. def test_symbolic_ref(self):
  165. """Test symbolic references."""
  166. # Add target ref
  167. sha = b"a" * 40
  168. self.container.add_if_new(b"refs/heads/master", sha)
  169. # Add symbolic ref
  170. self.container.set_symbolic_ref(b"HEAD", b"refs/heads/master")
  171. # Check symbolic ref is stored correctly
  172. self.assertEqual(
  173. b"ref: refs/heads/master", self.container.read_loose_ref(b"HEAD")
  174. )
  175. self.assertIn(b"HEAD", self.container.allkeys())
  176. # Check that following the symref works
  177. self.assertEqual(sha, self.container[b"HEAD"])
  178. def test_packed_refs(self):
  179. """Test packed refs functionality."""
  180. refs = {
  181. b"refs/heads/master": b"a" * 40,
  182. b"refs/heads/main": b"b" * 40,
  183. b"refs/tags/v1.0": b"c" * 40,
  184. }
  185. # Add packed refs
  186. self.container.add_packed_refs(refs)
  187. # Check refs exist
  188. for refname, sha in refs.items():
  189. self.assertIn(refname, self.container.allkeys())
  190. self.assertEqual(sha, self.container.read_loose_ref(refname))
  191. # Get packed refs
  192. packed = self.container.get_packed_refs()
  193. for refname, sha in refs.items():
  194. self.assertEqual(sha, packed[refname])
  195. def test_multiple_table_files(self):
  196. """Test with multiple reftable files."""
  197. # Add some refs
  198. self.container.add_if_new(b"refs/heads/master", b"a" * 40)
  199. self.container.add_if_new(b"refs/heads/main", b"b" * 40)
  200. # Update a ref (creates new table file)
  201. self.container.set_if_equals(b"refs/heads/master", b"a" * 40, b"c" * 40)
  202. # Check updated ref
  203. self.assertEqual(b"c" * 40, self.container.read_loose_ref(b"refs/heads/master"))
  204. self.assertEqual(b"b" * 40, self.container.read_loose_ref(b"refs/heads/main"))
  205. # Should have multiple table files
  206. table_files = self.container._get_table_files()
  207. self.assertGreaterEqual(len(table_files), 2)
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()