fuzz_bundle.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import sys
  2. from io import BytesIO
  3. import atheris
  4. with atheris.instrument_imports():
  5. # We instrument `test_utils` as well, so it doesn't block coverage analysis in Fuzz Introspector:
  6. from test_utils import EnhancedFuzzedDataProvider, is_expected_exception
  7. from dulwich.bundle import Bundle, read_bundle, write_bundle
  8. from dulwich.pack import PackData, write_pack_objects
  9. def TestOneInput(data):
  10. fdp = EnhancedFuzzedDataProvider(data)
  11. bundle = Bundle()
  12. bundle.version = fdp.PickValueInList([2, 3])
  13. bundle.references = {fdp.ConsumeRandomString(): fdp.ConsumeBytes(20)}
  14. bundle.prerequisites = [(fdp.ConsumeBytes(20), fdp.ConsumeRandomBytes())]
  15. bundle.capabilities = {
  16. fdp.ConsumeRandomString(): fdp.ConsumeRandomString(),
  17. }
  18. b = BytesIO()
  19. write_pack_objects(b.write, [])
  20. b.seek(0)
  21. bundle.pack_data = PackData.from_file(b)
  22. # Test __repr__ method
  23. _ = repr(bundle)
  24. try:
  25. bundle_file = BytesIO()
  26. write_bundle(bundle_file, bundle)
  27. except (AttributeError, UnicodeEncodeError) as e:
  28. expected_exceptions = [
  29. "'bytes' object has no attribute 'encode'",
  30. "surrogates not allowed",
  31. ]
  32. if is_expected_exception(expected_exceptions, e):
  33. return
  34. else:
  35. raise e
  36. bundle_file.seek(0)
  37. _ = read_bundle(bundle_file)
  38. # Test __eq__ method
  39. # Create a different bundle for inequality testing _after_ read/write tests.
  40. # The read/write tests may have consumed all the `data` via the `fdp` "Consume" methods, so we build the second
  41. # bundle _after_ so those tests can execute even before the fuzzing engine begins providing large enough inputs to
  42. # populate the second bundle's fields.
  43. other_bundle = Bundle()
  44. other_bundle.version = bundle.version
  45. other_bundle.references = {fdp.ConsumeRandomString(): fdp.ConsumeBytes(20)}
  46. other_bundle.prerequisites = [(fdp.ConsumeBytes(20), fdp.ConsumeRandomBytes())]
  47. other_bundle.capabilities = {
  48. fdp.ConsumeRandomString(): fdp.ConsumeRandomString(),
  49. }
  50. b2 = BytesIO()
  51. write_pack_objects(b2.write, [])
  52. b2.seek(0)
  53. other_bundle.pack_data = PackData.from_file(b2)
  54. _ = bundle != other_bundle
  55. def main():
  56. atheris.Setup(sys.argv, TestOneInput)
  57. atheris.Fuzz()
  58. if __name__ == "__main__":
  59. main()