test_varint.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """Tests for variable-width integer encoding/decoding."""
  2. import unittest
  3. from io import BytesIO
  4. from dulwich.varint import (
  5. decode_varint,
  6. decode_varint_from_stream,
  7. encode_varint,
  8. )
  9. class TestVarint(unittest.TestCase):
  10. """Test variable-width integer encoding and decoding."""
  11. def test_encode_decode_basic(self):
  12. """Test basic varint encoding/decoding."""
  13. test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]
  14. for value in test_values:
  15. encoded = encode_varint(value)
  16. decoded, new_offset = decode_varint(encoded, 0)
  17. self.assertEqual(value, decoded, f"Failed for value {value}")
  18. self.assertEqual(
  19. len(encoded), new_offset, f"Offset mismatch for value {value}"
  20. )
  21. def test_encode_decode_stream(self):
  22. """Test varint encoding/decoding with streams."""
  23. test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]
  24. for value in test_values:
  25. encoded = encode_varint(value)
  26. stream = BytesIO(encoded)
  27. decoded = decode_varint_from_stream(stream)
  28. self.assertEqual(
  29. value, decoded, f"Failed for stream decode of value {value}"
  30. )
  31. def test_multiple_varints(self):
  32. """Test encoding/decoding multiple varints in sequence."""
  33. values = [42, 127, 128, 1000, 0]
  34. # Encode all values
  35. encoded_data = b""
  36. for value in values:
  37. encoded_data += encode_varint(value)
  38. # Decode all values using byte offset
  39. offset = 0
  40. decoded_values = []
  41. for _ in values:
  42. decoded, offset = decode_varint(encoded_data, offset)
  43. decoded_values.append(decoded)
  44. self.assertEqual(values, decoded_values)
  45. # Decode all values using stream
  46. stream = BytesIO(encoded_data)
  47. decoded_values_stream = []
  48. for _ in values:
  49. decoded = decode_varint_from_stream(stream)
  50. self.assertIsNotNone(decoded)
  51. decoded_values_stream.append(decoded)
  52. self.assertEqual(values, decoded_values_stream)
  53. def test_stream_end_of_data(self):
  54. """Test that stream decode returns None at end of data."""
  55. stream = BytesIO(b"")
  56. result = decode_varint_from_stream(stream)
  57. self.assertIsNone(result)
  58. # Test with some data followed by end
  59. stream = BytesIO(encode_varint(42))
  60. result1 = decode_varint_from_stream(stream)
  61. self.assertEqual(42, result1)
  62. result2 = decode_varint_from_stream(stream)
  63. self.assertIsNone(result2)
  64. def test_specific_encoding_values(self):
  65. """Test specific encoding patterns."""
  66. # Single byte values (0-127)
  67. for i in range(128):
  68. encoded = encode_varint(i)
  69. self.assertEqual(1, len(encoded))
  70. self.assertEqual(i, encoded[0])
  71. # Two byte values (128-16383)
  72. encoded_128 = encode_varint(128)
  73. self.assertEqual(b"\x80\x01", encoded_128)
  74. encoded_255 = encode_varint(255)
  75. self.assertEqual(b"\xff\x01", encoded_255)
  76. encoded_256 = encode_varint(256)
  77. self.assertEqual(b"\x80\x02", encoded_256)
  78. def test_large_values(self):
  79. """Test encoding/decoding of large values."""
  80. large_values = [
  81. (1 << 7) - 1, # 127
  82. (1 << 7), # 128
  83. (1 << 14) - 1, # 16383
  84. (1 << 14), # 16384
  85. (1 << 21) - 1, # 2097151
  86. (1 << 21), # 2097152
  87. (1 << 28) - 1, # 268435455
  88. (1 << 28), # 268435456
  89. ]
  90. for value in large_values:
  91. encoded = encode_varint(value)
  92. decoded, _ = decode_varint(encoded, 0)
  93. self.assertEqual(value, decoded, f"Failed for large value {value}")
  94. if __name__ == "__main__":
  95. unittest.main()