浏览代码

Add fuzz target & `EnhancedFuzzedDataProvider` test utility

`EnhancedFuzzedDataProvider` extends `atheris.FuzzedDataProvider` to
abstract some common usecases into DRY method calls.

`fuzz_bundle.py` tests the `Bundle` related functionality using fuzzer
provided data. This test is based on the unit test of the same
functionality (i.e., `test_bundle.py`.
David Lakin 10 月之前
父节点
当前提交
d37edaa791
共有 3 个文件被更改,包括 126 次插入0 次删除
  1. 2 0
      fuzzing/dictionaries/fuzz_bundle.dict
  2. 72 0
      fuzzing/fuzz-targets/fuzz_bundle.py
  3. 52 0
      fuzzing/fuzz-targets/test_utils.py

+ 2 - 0
fuzzing/dictionaries/fuzz_bundle.dict

@@ -0,0 +1,2 @@
+"# v2 git bundle"
+"# v3 git bundle"

+ 72 - 0
fuzzing/fuzz-targets/fuzz_bundle.py

@@ -0,0 +1,72 @@
+import sys
+from io import BytesIO
+
+import atheris
+
+with atheris.instrument_imports():
+    # We instrument `test_utils` as well, so it doesn't block coverage analysis in Fuzz Introspector:
+    from test_utils import EnhancedFuzzedDataProvider, is_expected_exception
+    from dulwich.bundle import Bundle, read_bundle, write_bundle
+    from dulwich.pack import PackData, write_pack_objects
+
+
+def TestOneInput(data):
+    fdp = EnhancedFuzzedDataProvider(data)
+    bundle = Bundle()
+    bundle.version = fdp.PickValueInList([2, 3])
+    bundle.references = {fdp.ConsumeRandomString(): fdp.ConsumeBytes(20)}
+    bundle.prerequisites = [(fdp.ConsumeBytes(20), fdp.ConsumeRandomBytes())]
+    bundle.capabilities = {
+        fdp.ConsumeRandomString(): fdp.ConsumeRandomString(),
+    }
+
+    b = BytesIO()
+    write_pack_objects(b.write, [])
+    b.seek(0)
+    bundle.pack_data = PackData.from_file(b)
+
+    # Test __repr__ method
+    _ = repr(bundle)
+
+    try:
+        bundle_file = BytesIO()
+        write_bundle(bundle_file, bundle)
+    except (AttributeError, UnicodeEncodeError) as e:
+        expected_exceptions = [
+            "'bytes' object has no attribute 'encode'",
+            "surrogates not allowed",
+        ]
+        if is_expected_exception(expected_exceptions, e):
+            return
+        else:
+            raise e
+
+    bundle_file.seek(0)
+    _ = read_bundle(bundle_file)
+
+    # Test __eq__ method
+    # Create a different bundle for inequality testing _after_ read/write tests.
+    # The read/write tests may have consumed all the `data` via the `fdp` "Consume" methods, so we build the second
+    # bundle _after_ so those tests can execute even before the fuzzing engine begins providing large enough inputs to
+    # populate the second bundle's fields.
+    other_bundle = Bundle()
+    other_bundle.version = bundle.version
+    other_bundle.references = {fdp.ConsumeRandomString(): fdp.ConsumeBytes(20)}
+    other_bundle.prerequisites = [(fdp.ConsumeBytes(20), fdp.ConsumeRandomBytes())]
+    other_bundle.capabilities = {
+        fdp.ConsumeRandomString(): fdp.ConsumeRandomString(),
+    }
+    b2 = BytesIO()
+    write_pack_objects(b2.write, [])
+    b2.seek(0)
+    other_bundle.pack_data = PackData.from_file(b2)
+    _ = bundle != other_bundle
+
+
+def main():
+    atheris.Setup(sys.argv, TestOneInput)
+    atheris.Fuzz()
+
+
+if __name__ == "__main__":
+    main()

+ 52 - 0
fuzzing/fuzz-targets/test_utils.py

@@ -18,3 +18,55 @@ def is_expected_exception(
         if error in str(exception):
             return True
     return False
+
+
+class EnhancedFuzzedDataProvider(atheris.FuzzedDataProvider):  # pragma: no cover
+    """Extends atheris.FuzzedDataProvider to offer additional methods to make fuzz testing slightly more DRY."""
+
+    def __init__(self, data):
+        """Initializes the EnhancedFuzzedDataProvider with fuzzing data from the argument provided to TestOneInput.
+
+        Args:
+            data (bytes): The binary data used for fuzzing.
+        """
+        super().__init__(data)
+
+    def ConsumeRemainingBytes(self) -> bytes:
+        """Consume the remaining bytes in the bytes container.
+
+        Returns:
+          bytes: Zero or more bytes.
+        """
+        return self.ConsumeBytes(self.remaining_bytes())
+
+    def ConsumeRandomBytes(self, max_length=None) -> bytes:
+        """Consume a random count of bytes from the bytes container.
+
+        Args:
+          max_length (int, optional): The maximum length of the string. Defaults to the number of remaining bytes.
+
+        Returns:
+          bytes: Zero or more bytes.
+        """
+        if max_length is None:
+            max_length = self.remaining_bytes()
+        else:
+            max_length = min(max_length, self.remaining_bytes())
+
+        return self.ConsumeBytes(self.ConsumeIntInRange(0, max_length))
+
+    def ConsumeRandomString(self, max_length=None) -> str:
+        """Consume bytes to produce a Unicode string.
+
+        Args:
+          max_length (int, optional): The maximum length of the string. Defaults to the number of remaining bytes.
+
+        Returns:
+         str: A Unicode string.
+        """
+        if max_length is None:
+            max_length = self.remaining_bytes()
+        else:
+            max_length = min(max_length, self.remaining_bytes())
+
+        return self.ConsumeUnicode(self.ConsumeIntInRange(0, max_length))