فهرست منبع

Add initial object store fuzz target

Based off of the example code in the object store tutorial,
`fuzz_object_store.py` uses a `MemoryRepo` to improve efficency of the
test by avoiding disk IO.

A `ConsumeRandomInt` method was added to `EnhancedFuzzedDataProvider` to
make getting an `int` easier to do when we don't care about the upper
and lower bounds of the value in the test.
David Lakin 10 ماه پیش
والد
کامیت
0922d2225d
2فایلهای تغییر یافته به همراه76 افزوده شده و 0 حذف شده
  1. 62 0
      fuzzing/fuzz-targets/fuzz_object_store.py
  2. 14 0
      fuzzing/fuzz-targets/test_utils.py

+ 62 - 0
fuzzing/fuzz-targets/fuzz_object_store.py

@@ -0,0 +1,62 @@
+import sys
+import stat
+
+import atheris
+
+with atheris.instrument_imports():
+    # We instrument `test_utils` as well, so it doesn't block coverage analysis in Fuzz Introspector:
+    from test_utils import EnhancedFuzzedDataProvider, is_expected_exception
+    from dulwich.objects import Blob, Tree, Commit, S_IFGITLINK
+    from dulwich.errors import ObjectFormatException
+    from dulwich.repo import (
+        MemoryRepo,
+        InvalidUserIdentity,
+    )
+
+
+def TestOneInput(data):
+    fdp = EnhancedFuzzedDataProvider(data)
+    repo = MemoryRepo()
+    blob = Blob.from_string(fdp.ConsumeRandomBytes())
+    tree = Tree()
+    tree.add(
+        fdp.ConsumeRandomBytes(),
+        fdp.PickValueInList([stat.S_IFREG, stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK]),
+        blob.id,
+    )
+    commit = Commit()
+    commit.tree = tree.id
+    commit.author = fdp.ConsumeRandomBytes()
+    commit.committer = fdp.ConsumeRandomBytes()
+    commit.commit_time = fdp.ConsumeRandomInt()
+    commit.commit_timezone = fdp.ConsumeRandomInt()
+    commit.author_time = fdp.ConsumeRandomInt()
+    commit.author_timezone = fdp.ConsumeRandomInt()
+    commit.message = fdp.ConsumeRandomBytes()
+
+    object_store = repo.object_store
+
+    try:
+        object_store.add_object(blob)
+        object_store.add_object(tree)
+        object_store.add_object(commit)
+    except (InvalidUserIdentity, ObjectFormatException):
+        return -1
+    except ValueError as e:
+        expected_exceptions = [
+            "subsection not found",
+            "Unable to handle non-minute offset",
+        ]
+        if is_expected_exception(expected_exceptions, e):
+            return -1
+        else:
+            raise e
+
+
+def main():
+    atheris.Setup(sys.argv, TestOneInput)
+    atheris.Fuzz()
+
+
+if __name__ == "__main__":
+    main()

+ 14 - 0
fuzzing/fuzz-targets/test_utils.py

@@ -2,6 +2,7 @@ import atheris  # pragma: no cover
 from typing import List  # pragma: no cover
 
 
+@atheris.instrument_func
 def is_expected_exception(
     error_message_list: List[str], exception: Exception
 ):  # pragma: no cover
@@ -70,3 +71,16 @@ class EnhancedFuzzedDataProvider(atheris.FuzzedDataProvider):  # pragma: no cove
             max_length = min(max_length, self.remaining_bytes())
 
         return self.ConsumeUnicode(self.ConsumeIntInRange(0, max_length))
+
+    def ConsumeRandomInt(self, minimum=0, maximum=1234567890) -> int:
+        """Consume bytes to produce an integer.
+
+        Args:
+          minimum (int, optional): The minimum value of the integer. Defaults to 0.
+          maximum (int, optional): The maximum value of the integer. Defaults to 1234567890.
+
+        Returns:
+          int: An integer.
+        """
+
+        return self.ConsumeIntInRange(minimum, maximum)