reflog.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # reflog.py -- Parsing and writing reflog files
  2. # Copyright (C) 2015 Jelmer Vernooij and others.
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Utilities for reading and generating reflogs."""
  22. import collections
  23. from collections.abc import Generator
  24. from typing import BinaryIO, Optional, Union
  25. from .objects import ZERO_SHA, format_timezone, parse_timezone
  26. Entry = collections.namedtuple(
  27. "Entry",
  28. ["old_sha", "new_sha", "committer", "timestamp", "timezone", "message"],
  29. )
  30. def format_reflog_line(
  31. old_sha: Optional[bytes],
  32. new_sha: bytes,
  33. committer: bytes,
  34. timestamp: Union[int, float],
  35. timezone: int,
  36. message: bytes,
  37. ) -> bytes:
  38. """Generate a single reflog line.
  39. Args:
  40. old_sha: Old Commit SHA
  41. new_sha: New Commit SHA
  42. committer: Committer name and e-mail
  43. timestamp: Timestamp
  44. timezone: Timezone
  45. message: Message
  46. """
  47. if old_sha is None:
  48. old_sha = ZERO_SHA
  49. return (
  50. old_sha
  51. + b" "
  52. + new_sha
  53. + b" "
  54. + committer
  55. + b" "
  56. + str(int(timestamp)).encode("ascii")
  57. + b" "
  58. + format_timezone(timezone)
  59. + b"\t"
  60. + message
  61. )
  62. def parse_reflog_line(line: bytes) -> Entry:
  63. """Parse a reflog line.
  64. Args:
  65. line: Line to parse
  66. Returns: Tuple of (old_sha, new_sha, committer, timestamp, timezone,
  67. message)
  68. """
  69. (begin, message) = line.split(b"\t", 1)
  70. (old_sha, new_sha, rest) = begin.split(b" ", 2)
  71. (committer, timestamp_str, timezone_str) = rest.rsplit(b" ", 2)
  72. return Entry(
  73. old_sha,
  74. new_sha,
  75. committer,
  76. int(timestamp_str),
  77. parse_timezone(timezone_str)[0],
  78. message,
  79. )
  80. def read_reflog(f: BinaryIO) -> Generator[Entry, None, None]:
  81. """Read reflog.
  82. Args:
  83. f: File-like object
  84. Returns: Iterator over Entry objects
  85. """
  86. for line in f:
  87. yield parse_reflog_line(line)
  88. def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
  89. """Drop the specified reflog entry.
  90. Args:
  91. f: File-like object
  92. index: Reflog entry index (in Git reflog reverse 0-indexed order)
  93. rewrite: If a reflog entry's predecessor is removed, set its
  94. old SHA to the new SHA of the entry that now precedes it
  95. """
  96. if index < 0:
  97. raise ValueError(f"Invalid reflog index {index}")
  98. log = []
  99. offset = f.tell()
  100. for line in f:
  101. log.append((offset, parse_reflog_line(line)))
  102. offset = f.tell()
  103. inverse_index = len(log) - index - 1
  104. write_offset = log[inverse_index][0]
  105. f.seek(write_offset)
  106. if index == 0:
  107. f.truncate()
  108. return
  109. del log[inverse_index]
  110. if rewrite and index > 0 and log:
  111. if inverse_index == 0:
  112. previous_new = ZERO_SHA
  113. else:
  114. previous_new = log[inverse_index - 1][1].new_sha
  115. offset, entry = log[inverse_index]
  116. log[inverse_index] = (
  117. offset,
  118. Entry(
  119. previous_new,
  120. entry.new_sha,
  121. entry.committer,
  122. entry.timestamp,
  123. entry.timezone,
  124. entry.message,
  125. ),
  126. )
  127. for _, entry in log[inverse_index:]:
  128. f.write(
  129. format_reflog_line(
  130. entry.old_sha,
  131. entry.new_sha,
  132. entry.committer,
  133. entry.timestamp,
  134. entry.timezone,
  135. entry.message,
  136. )
  137. )
  138. f.truncate()