bisect.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. # bisect.py -- Git bisect algorithm implementation
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Git bisect implementation."""
  21. __all__ = ["BisectState"]
  22. import os
  23. from collections.abc import Sequence, Set
  24. from dulwich.object_store import peel_sha
  25. from dulwich.objects import Commit, ObjectID
  26. from dulwich.refs import HEADREF, Ref
  27. from dulwich.repo import Repo
  28. class BisectState:
  29. """Manages the state of a bisect session."""
  30. def __init__(self, repo: Repo) -> None:
  31. """Initialize BisectState.
  32. Args:
  33. repo: Repository to perform bisect on
  34. """
  35. self.repo = repo
  36. self._bisect_dir = os.path.join(repo.controldir(), "BISECT_START")
  37. @property
  38. def is_active(self) -> bool:
  39. """Check if a bisect session is active."""
  40. return os.path.exists(self._bisect_dir)
  41. def start(
  42. self,
  43. bad: ObjectID | None = None,
  44. good: Sequence[ObjectID] | None = None,
  45. paths: Sequence[bytes] | None = None,
  46. no_checkout: bool = False,
  47. term_bad: str = "bad",
  48. term_good: str = "good",
  49. ) -> None:
  50. """Start a new bisect session.
  51. Args:
  52. bad: The bad commit SHA (defaults to HEAD)
  53. good: List of good commit SHAs
  54. paths: Optional paths to limit bisect to
  55. no_checkout: If True, don't checkout commits during bisect
  56. term_bad: Term to use for bad commits (default: "bad")
  57. term_good: Term to use for good commits (default: "good")
  58. """
  59. if self.is_active:
  60. raise ValueError("Bisect session already in progress")
  61. # Create bisect state directory
  62. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  63. os.makedirs(bisect_refs_dir, exist_ok=True)
  64. # Store current branch/commit
  65. try:
  66. ref_chain, sha = self.repo.refs.follow(HEADREF)
  67. if sha is None:
  68. # No HEAD exists
  69. raise ValueError("Cannot start bisect: repository has no HEAD")
  70. # Use the first non-HEAD ref in the chain, or the SHA itself
  71. current_branch: Ref | ObjectID
  72. if len(ref_chain) > 1:
  73. current_branch = ref_chain[1] # The actual branch ref
  74. else:
  75. current_branch = sha # Detached HEAD
  76. except KeyError:
  77. # Detached HEAD
  78. try:
  79. current_branch = self.repo.head()
  80. except KeyError:
  81. # No HEAD exists - can't start bisect
  82. raise ValueError("Cannot start bisect: repository has no HEAD")
  83. # Write BISECT_START
  84. with open(self._bisect_dir, "wb") as f:
  85. f.write(current_branch)
  86. # Write BISECT_TERMS
  87. terms_file = os.path.join(self.repo.controldir(), "BISECT_TERMS")
  88. with open(terms_file, "w") as f:
  89. f.write(f"{term_bad}\n{term_good}\n")
  90. # Write BISECT_NAMES (paths)
  91. names_file = os.path.join(self.repo.controldir(), "BISECT_NAMES")
  92. with open(names_file, "w") as f:
  93. if paths:
  94. f.write(
  95. "\n".join(path.decode("utf-8", "replace") for path in paths) + "\n"
  96. )
  97. else:
  98. f.write("\n")
  99. # Initialize BISECT_LOG
  100. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  101. with open(log_file, "w") as f:
  102. f.write("git bisect start\n")
  103. f.write("# status: waiting for both good and bad commits\n")
  104. # Mark bad commit if provided
  105. if bad is not None:
  106. self.mark_bad(bad)
  107. # Mark good commits if provided
  108. if good:
  109. for g in good:
  110. self.mark_good(g)
  111. def mark_bad(self, rev: ObjectID | None = None) -> ObjectID | None:
  112. """Mark a commit as bad.
  113. Args:
  114. rev: Commit SHA to mark as bad (defaults to HEAD)
  115. Returns:
  116. The SHA of the next commit to test, or None if bisect is complete
  117. """
  118. if not self.is_active:
  119. raise ValueError("No bisect session in progress")
  120. if rev is None:
  121. rev = self.repo.head()
  122. else:
  123. rev = peel_sha(self.repo.object_store, rev)[1].id
  124. # Write bad ref
  125. bad_ref_path = os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
  126. with open(bad_ref_path, "wb") as f:
  127. f.write(rev + b"\n")
  128. # Update log
  129. self._append_to_log(
  130. f"# bad: [{rev.decode('ascii')}] {self._get_commit_subject(rev)}"
  131. )
  132. self._append_to_log(f"git bisect bad {rev.decode('ascii')}")
  133. return self._find_next_commit()
  134. def mark_good(self, rev: ObjectID | None = None) -> ObjectID | None:
  135. """Mark a commit as good.
  136. Args:
  137. rev: Commit SHA to mark as good (defaults to HEAD)
  138. Returns:
  139. The SHA of the next commit to test, or None if bisect is complete
  140. """
  141. if not self.is_active:
  142. raise ValueError("No bisect session in progress")
  143. if rev is None:
  144. rev = self.repo.head()
  145. else:
  146. rev = peel_sha(self.repo.object_store, rev)[1].id
  147. # Write good ref
  148. good_ref_path = os.path.join(
  149. self.repo.controldir(), "refs", "bisect", f"good-{rev.decode('ascii')}"
  150. )
  151. with open(good_ref_path, "wb") as f:
  152. f.write(rev + b"\n")
  153. # Update log
  154. self._append_to_log(
  155. f"# good: [{rev.decode('ascii')}] {self._get_commit_subject(rev)}"
  156. )
  157. self._append_to_log(f"git bisect good {rev.decode('ascii')}")
  158. return self._find_next_commit()
  159. def skip(self, revs: Sequence[ObjectID] | None = None) -> ObjectID | None:
  160. """Skip one or more commits.
  161. Args:
  162. revs: List of commits to skip (defaults to [HEAD])
  163. Returns:
  164. The SHA of the next commit to test, or None if bisect is complete
  165. """
  166. if not self.is_active:
  167. raise ValueError("No bisect session in progress")
  168. if revs is None:
  169. revs = [self.repo.head()]
  170. for rev in revs:
  171. rev = peel_sha(self.repo.object_store, rev)[1].id
  172. skip_ref_path = os.path.join(
  173. self.repo.controldir(), "refs", "bisect", f"skip-{rev.decode('ascii')}"
  174. )
  175. with open(skip_ref_path, "wb") as f:
  176. f.write(rev + b"\n")
  177. self._append_to_log(f"git bisect skip {rev.decode('ascii')}")
  178. return self._find_next_commit()
  179. def reset(self, commit: ObjectID | None = None) -> None:
  180. """Reset bisect state and return to original branch/commit.
  181. Args:
  182. commit: Optional commit to reset to (defaults to original branch/commit)
  183. """
  184. if not self.is_active:
  185. raise ValueError("No bisect session in progress")
  186. # Read original branch/commit
  187. with open(self._bisect_dir, "rb") as f:
  188. original = f.read().strip()
  189. # Clean up bisect files
  190. for filename in [
  191. "BISECT_START",
  192. "BISECT_TERMS",
  193. "BISECT_NAMES",
  194. "BISECT_LOG",
  195. "BISECT_EXPECTED_REV",
  196. "BISECT_ANCESTORS_OK",
  197. ]:
  198. filepath = os.path.join(self.repo.controldir(), filename)
  199. if os.path.exists(filepath):
  200. os.remove(filepath)
  201. # Clean up refs/bisect directory
  202. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  203. if os.path.exists(bisect_refs_dir):
  204. for filename in os.listdir(bisect_refs_dir):
  205. os.remove(os.path.join(bisect_refs_dir, filename))
  206. os.rmdir(bisect_refs_dir)
  207. # Reset to target commit/branch
  208. if commit is None:
  209. if original.startswith(b"refs/"):
  210. # It's a branch reference - need to create a symbolic ref
  211. self.repo.refs.set_symbolic_ref(HEADREF, Ref(original))
  212. else:
  213. # It's a commit SHA
  214. self.repo.refs[HEADREF] = ObjectID(original)
  215. else:
  216. commit = peel_sha(self.repo.object_store, commit)[1].id
  217. self.repo.refs[HEADREF] = commit
  218. def get_log(self) -> str:
  219. """Get the bisect log."""
  220. if not self.is_active:
  221. raise ValueError("No bisect session in progress")
  222. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  223. with open(log_file) as f:
  224. return f.read()
  225. def replay(self, log_content: str) -> None:
  226. """Replay a bisect log.
  227. Args:
  228. log_content: The bisect log content to replay
  229. """
  230. # Parse and execute commands from log
  231. for line in log_content.splitlines():
  232. line = line.strip()
  233. if line.startswith("#") or not line:
  234. continue
  235. parts = line.split()
  236. if len(parts) < 3 or parts[0] != "git" or parts[1] != "bisect":
  237. continue
  238. cmd = parts[2]
  239. args = parts[3:] if len(parts) > 3 else []
  240. if cmd == "start":
  241. self.start()
  242. elif cmd == "bad":
  243. rev = ObjectID(args[0].encode("ascii")) if args else None
  244. self.mark_bad(rev)
  245. elif cmd == "good":
  246. rev = ObjectID(args[0].encode("ascii")) if args else None
  247. self.mark_good(rev)
  248. elif cmd == "skip":
  249. revs = [ObjectID(arg.encode("ascii")) for arg in args] if args else None
  250. self.skip(revs)
  251. def _find_next_commit(self) -> ObjectID | None:
  252. """Find the next commit to test using binary search.
  253. Returns:
  254. The SHA of the next commit to test, or None if bisect is complete
  255. """
  256. # Get bad commit
  257. bad_ref_path = os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
  258. if not os.path.exists(bad_ref_path):
  259. self._append_to_log("# status: waiting for both good and bad commits")
  260. return None
  261. with open(bad_ref_path, "rb") as f:
  262. bad_sha = ObjectID(f.read().strip())
  263. # Get all good commits
  264. good_shas: list[ObjectID] = []
  265. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  266. for filename in os.listdir(bisect_refs_dir):
  267. if filename.startswith("good-"):
  268. with open(os.path.join(bisect_refs_dir, filename), "rb") as f:
  269. good_shas.append(ObjectID(f.read().strip()))
  270. if not good_shas:
  271. self._append_to_log(
  272. "# status: waiting for good commit(s), bad commit known"
  273. )
  274. return None
  275. # Get skip commits
  276. skip_shas: set[ObjectID] = set()
  277. for filename in os.listdir(bisect_refs_dir):
  278. if filename.startswith("skip-"):
  279. with open(os.path.join(bisect_refs_dir, filename), "rb") as f:
  280. skip_shas.add(ObjectID(f.read().strip()))
  281. # Find commits between good and bad
  282. candidates = self._find_bisect_candidates(bad_sha, good_shas, skip_shas)
  283. if not candidates:
  284. # Bisect complete - the first bad commit is found
  285. self._append_to_log(
  286. f"# first bad commit: [{bad_sha.decode('ascii')}] "
  287. f"{self._get_commit_subject(bad_sha)}"
  288. )
  289. return None
  290. # Find midpoint
  291. mid_idx = len(candidates) // 2
  292. next_commit = candidates[mid_idx]
  293. # Write BISECT_EXPECTED_REV
  294. expected_file = os.path.join(self.repo.controldir(), "BISECT_EXPECTED_REV")
  295. with open(expected_file, "wb") as f:
  296. f.write(next_commit + b"\n")
  297. # Update status in log
  298. steps_remaining = self._estimate_steps(len(candidates))
  299. self._append_to_log(
  300. f"Bisecting: {len(candidates) - 1} revisions left to test after this "
  301. f"(roughly {steps_remaining} steps)"
  302. )
  303. self._append_to_log(
  304. f"[{next_commit.decode('ascii')}] {self._get_commit_subject(next_commit)}"
  305. )
  306. return next_commit
  307. def _find_bisect_candidates(
  308. self, bad_sha: ObjectID, good_shas: Sequence[ObjectID], skip_shas: Set[ObjectID]
  309. ) -> list[ObjectID]:
  310. """Find all commits between good and bad commits.
  311. Args:
  312. bad_sha: The bad commit SHA
  313. good_shas: List of good commit SHAs
  314. skip_shas: Set of commits to skip
  315. Returns:
  316. List of candidate commit SHAs in topological order
  317. """
  318. # Use git's graph walking to find commits
  319. # This is a simplified version - a full implementation would need
  320. # to handle merge commits properly
  321. candidates: list[ObjectID] = []
  322. visited: set[ObjectID] = set(good_shas)
  323. queue: list[ObjectID] = [bad_sha]
  324. while queue:
  325. sha = queue.pop(0)
  326. if sha in visited or sha in skip_shas:
  327. continue
  328. visited.add(sha)
  329. commit = self.repo.object_store[sha]
  330. # Don't include good commits
  331. if sha not in good_shas:
  332. candidates.append(sha)
  333. # Add parents to queue
  334. if isinstance(commit, Commit):
  335. for parent in commit.parents:
  336. if parent not in visited:
  337. queue.append(parent)
  338. # Remove the bad commit itself
  339. if bad_sha in candidates:
  340. candidates.remove(bad_sha)
  341. return candidates
  342. def _get_commit_subject(self, sha: ObjectID) -> str:
  343. """Get the subject line of a commit message."""
  344. obj = self.repo.object_store[sha]
  345. if isinstance(obj, Commit):
  346. message = obj.message.decode("utf-8", errors="replace")
  347. lines = message.split("\n")
  348. return lines[0] if lines else ""
  349. return ""
  350. def _append_to_log(self, line: str) -> None:
  351. """Append a line to the bisect log."""
  352. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  353. with open(log_file, "a") as f:
  354. f.write(line + "\n")
  355. def _estimate_steps(self, num_candidates: int) -> int:
  356. """Estimate the number of steps remaining in bisect."""
  357. if num_candidates <= 1:
  358. return 0
  359. steps = 0
  360. while num_candidates > 1:
  361. num_candidates //= 2
  362. steps += 1
  363. return steps