annotate.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # annotate.py -- Annotate files with last changed revision
  2. # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; either version 2
  7. # or (at your option) a later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Annotate file contents indicating when they were last changed.
  19. Annotated lines are represented as tuples with last modified revision SHA1
  20. and contents.
  21. Please note that this is a very naive annotate implementation. It works,
  22. but its speed could be improved - in particular because it uses
  23. Python's difflib.
  24. """
  25. __all__ = ["annotate_lines", "update_lines"]
  26. import difflib
  27. from collections.abc import Sequence
  28. from typing import TYPE_CHECKING
  29. from dulwich.objects import Blob
  30. from dulwich.walk import (
  31. ORDER_DATE,
  32. Walker,
  33. )
  34. if TYPE_CHECKING:
  35. from dulwich.diff_tree import TreeChange
  36. from dulwich.object_store import BaseObjectStore
  37. from dulwich.objects import Commit, ObjectID, TreeEntry
# Walk over ancestry graph breadth-first
# When checking each revision, find lines that according to
# difflib.SequenceMatcher are common between versions.
# Any lines that are not in common were introduced by the newer revision.
# If there were no lines kept from the older version, stop going deeper in the
# graph.
  44. def update_lines(
  45. annotated_lines: Sequence[tuple[tuple["Commit", "TreeEntry"], bytes]],
  46. new_history_data: tuple["Commit", "TreeEntry"],
  47. new_blob: "Blob",
  48. ) -> list[tuple[tuple["Commit", "TreeEntry"], bytes]]:
  49. """Update annotation lines with old blob lines."""
  50. ret: list[tuple[tuple[Commit, TreeEntry], bytes]] = []
  51. new_lines = new_blob.splitlines()
  52. matcher = difflib.SequenceMatcher(
  53. a=[line for (h, line) in annotated_lines], b=new_lines
  54. )
  55. for tag, i1, i2, j1, j2 in matcher.get_opcodes():
  56. if tag == "equal":
  57. ret.extend(annotated_lines[i1:i2])
  58. elif tag in ("insert", "replace"):
  59. ret.extend([(new_history_data, line) for line in new_lines[j1:j2]])
  60. elif tag == "delete":
  61. pass # don't care
  62. else:
  63. raise RuntimeError(f"Unknown tag {tag} returned in diff")
  64. return ret
  65. def annotate_lines(
  66. store: "BaseObjectStore",
  67. commit_id: "ObjectID",
  68. path: bytes,
  69. order: str = ORDER_DATE,
  70. lines: Sequence[tuple[tuple["Commit", "TreeEntry"], bytes]] | None = None,
  71. follow: bool = True,
  72. ) -> list[tuple[tuple["Commit", "TreeEntry"], bytes]]:
  73. """Annotate the lines of a blob.
  74. :param store: Object store to retrieve objects from
  75. :param commit_id: Commit id in which to annotate path
  76. :param path: Path to annotate
  77. :param order: Order in which to process history (defaults to ORDER_DATE)
  78. :param lines: Initial lines to compare to (defaults to specified)
  79. :param follow: Whether to follow changes across renames/copies
  80. :return: List of (commit, line) entries where
  81. commit is the oldest commit that changed a line
  82. """
  83. walker = Walker(
  84. store, include=[commit_id], paths=[path], order=order, follow=follow
  85. )
  86. revs: list[tuple[Commit, TreeEntry]] = []
  87. for log_entry in walker:
  88. for tree_change in log_entry.changes():
  89. changes: list[TreeChange]
  90. if isinstance(tree_change, list):
  91. changes = tree_change
  92. else:
  93. changes = [tree_change]
  94. for change in changes:
  95. if change.new is not None and change.new.path == path:
  96. if change.old is not None and change.old.path is not None:
  97. path = change.old.path
  98. revs.append((log_entry.commit, change.new))
  99. break
  100. lines_annotated: list[tuple[tuple[Commit, TreeEntry], bytes]] = []
  101. for commit, entry in reversed(revs):
  102. assert entry.sha is not None
  103. blob_obj = store[entry.sha]
  104. assert isinstance(blob_obj, Blob)
  105. lines_annotated = update_lines(lines_annotated, (commit, entry), blob_obj)
  106. return lines_annotated