# bisect.py -- Git bisect algorithm implementation
# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Git bisect implementation.

The only public name is :class:`BisectState`, which keeps the state of a
bisect session in files under the repository control directory and picks
the next commit to test.
"""

# Names exported by ``from <this module> import *``.
__all__ = ["BisectState"]
import os
from collections import deque
from collections.abc import Sequence, Set

from dulwich.object_store import peel_sha
from dulwich.objects import Commit, ObjectID
from dulwich.refs import HEADREF, Ref
from dulwich.repo import Repo
  29. class BisectState:
  30. """Manages the state of a bisect session."""
  31. def __init__(self, repo: Repo) -> None:
  32. """Initialize BisectState.
  33. Args:
  34. repo: Repository to perform bisect on
  35. """
  36. self.repo = repo
  37. self._bisect_dir = os.path.join(repo.controldir(), "BISECT_START")
  38. @property
  39. def is_active(self) -> bool:
  40. """Check if a bisect session is active."""
  41. return os.path.exists(self._bisect_dir)
  42. def start(
  43. self,
  44. bad: ObjectID | None = None,
  45. good: Sequence[ObjectID] | None = None,
  46. paths: Sequence[bytes] | None = None,
  47. no_checkout: bool = False,
  48. term_bad: str = "bad",
  49. term_good: str = "good",
  50. ) -> None:
  51. """Start a new bisect session.
  52. Args:
  53. bad: The bad commit SHA (defaults to HEAD)
  54. good: List of good commit SHAs
  55. paths: Optional paths to limit bisect to
  56. no_checkout: If True, don't checkout commits during bisect
  57. term_bad: Term to use for bad commits (default: "bad")
  58. term_good: Term to use for good commits (default: "good")
  59. """
  60. if self.is_active:
  61. raise ValueError("Bisect session already in progress")
  62. # Create bisect state directory
  63. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  64. os.makedirs(bisect_refs_dir, exist_ok=True)
  65. # Store current branch/commit
  66. try:
  67. ref_chain, sha = self.repo.refs.follow(HEADREF)
  68. if sha is None:
  69. # No HEAD exists
  70. raise ValueError("Cannot start bisect: repository has no HEAD")
  71. # Use the first non-HEAD ref in the chain, or the SHA itself
  72. current_branch: Ref | ObjectID
  73. if len(ref_chain) > 1:
  74. current_branch = ref_chain[1] # The actual branch ref
  75. else:
  76. current_branch = sha # Detached HEAD
  77. except KeyError:
  78. # Detached HEAD
  79. try:
  80. current_branch = self.repo.head()
  81. except KeyError:
  82. # No HEAD exists - can't start bisect
  83. raise ValueError("Cannot start bisect: repository has no HEAD")
  84. # Write BISECT_START
  85. with open(self._bisect_dir, "wb") as f:
  86. f.write(current_branch)
  87. # Write BISECT_TERMS
  88. terms_file = os.path.join(self.repo.controldir(), "BISECT_TERMS")
  89. with open(terms_file, "w") as f:
  90. f.write(f"{term_bad}\n{term_good}\n")
  91. # Write BISECT_NAMES (paths)
  92. names_file = os.path.join(self.repo.controldir(), "BISECT_NAMES")
  93. with open(names_file, "w") as f:
  94. if paths:
  95. f.write(
  96. "\n".join(path.decode("utf-8", "replace") for path in paths) + "\n"
  97. )
  98. else:
  99. f.write("\n")
  100. # Initialize BISECT_LOG
  101. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  102. with open(log_file, "w") as f:
  103. f.write("git bisect start\n")
  104. f.write("# status: waiting for both good and bad commits\n")
  105. # Mark bad commit if provided
  106. if bad is not None:
  107. self.mark_bad(bad)
  108. # Mark good commits if provided
  109. if good:
  110. for g in good:
  111. self.mark_good(g)
  112. def mark_bad(self, rev: ObjectID | None = None) -> ObjectID | None:
  113. """Mark a commit as bad.
  114. Args:
  115. rev: Commit SHA to mark as bad (defaults to HEAD)
  116. Returns:
  117. The SHA of the next commit to test, or None if bisect is complete
  118. """
  119. if not self.is_active:
  120. raise ValueError("No bisect session in progress")
  121. if rev is None:
  122. rev = self.repo.head()
  123. else:
  124. rev = peel_sha(self.repo.object_store, rev)[1].id
  125. # Write bad ref
  126. bad_ref_path = os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
  127. with open(bad_ref_path, "wb") as f:
  128. f.write(rev + b"\n")
  129. # Update log
  130. self._append_to_log(
  131. f"# bad: [{rev.decode('ascii')}] {self._get_commit_subject(rev)}"
  132. )
  133. self._append_to_log(f"git bisect bad {rev.decode('ascii')}")
  134. return self._find_next_commit()
  135. def mark_good(self, rev: ObjectID | None = None) -> ObjectID | None:
  136. """Mark a commit as good.
  137. Args:
  138. rev: Commit SHA to mark as good (defaults to HEAD)
  139. Returns:
  140. The SHA of the next commit to test, or None if bisect is complete
  141. """
  142. if not self.is_active:
  143. raise ValueError("No bisect session in progress")
  144. if rev is None:
  145. rev = self.repo.head()
  146. else:
  147. rev = peel_sha(self.repo.object_store, rev)[1].id
  148. # Write good ref
  149. good_ref_path = os.path.join(
  150. self.repo.controldir(), "refs", "bisect", f"good-{rev.decode('ascii')}"
  151. )
  152. with open(good_ref_path, "wb") as f:
  153. f.write(rev + b"\n")
  154. # Update log
  155. self._append_to_log(
  156. f"# good: [{rev.decode('ascii')}] {self._get_commit_subject(rev)}"
  157. )
  158. self._append_to_log(f"git bisect good {rev.decode('ascii')}")
  159. return self._find_next_commit()
  160. def skip(self, revs: Sequence[ObjectID] | None = None) -> ObjectID | None:
  161. """Skip one or more commits.
  162. Args:
  163. revs: List of commits to skip (defaults to [HEAD])
  164. Returns:
  165. The SHA of the next commit to test, or None if bisect is complete
  166. """
  167. if not self.is_active:
  168. raise ValueError("No bisect session in progress")
  169. if revs is None:
  170. revs = [self.repo.head()]
  171. for rev in revs:
  172. rev = peel_sha(self.repo.object_store, rev)[1].id
  173. skip_ref_path = os.path.join(
  174. self.repo.controldir(), "refs", "bisect", f"skip-{rev.decode('ascii')}"
  175. )
  176. with open(skip_ref_path, "wb") as f:
  177. f.write(rev + b"\n")
  178. self._append_to_log(f"git bisect skip {rev.decode('ascii')}")
  179. return self._find_next_commit()
  180. def reset(self, commit: ObjectID | None = None) -> None:
  181. """Reset bisect state and return to original branch/commit.
  182. Args:
  183. commit: Optional commit to reset to (defaults to original branch/commit)
  184. """
  185. if not self.is_active:
  186. raise ValueError("No bisect session in progress")
  187. # Read original branch/commit
  188. with open(self._bisect_dir, "rb") as f:
  189. original = f.read().strip()
  190. # Clean up bisect files
  191. for filename in [
  192. "BISECT_START",
  193. "BISECT_TERMS",
  194. "BISECT_NAMES",
  195. "BISECT_LOG",
  196. "BISECT_EXPECTED_REV",
  197. "BISECT_ANCESTORS_OK",
  198. ]:
  199. filepath = os.path.join(self.repo.controldir(), filename)
  200. if os.path.exists(filepath):
  201. os.remove(filepath)
  202. # Clean up refs/bisect directory
  203. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  204. if os.path.exists(bisect_refs_dir):
  205. for filename in os.listdir(bisect_refs_dir):
  206. os.remove(os.path.join(bisect_refs_dir, filename))
  207. os.rmdir(bisect_refs_dir)
  208. # Reset to target commit/branch
  209. if commit is None:
  210. if original.startswith(b"refs/"):
  211. # It's a branch reference - need to create a symbolic ref
  212. self.repo.refs.set_symbolic_ref(HEADREF, Ref(original))
  213. else:
  214. # It's a commit SHA
  215. self.repo.refs[HEADREF] = ObjectID(original)
  216. else:
  217. commit = peel_sha(self.repo.object_store, commit)[1].id
  218. self.repo.refs[HEADREF] = commit
  219. def get_log(self) -> str:
  220. """Get the bisect log."""
  221. if not self.is_active:
  222. raise ValueError("No bisect session in progress")
  223. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  224. with open(log_file) as f:
  225. return f.read()
  226. def replay(self, log_content: str) -> None:
  227. """Replay a bisect log.
  228. Args:
  229. log_content: The bisect log content to replay
  230. """
  231. # Parse and execute commands from log
  232. for line in log_content.splitlines():
  233. line = line.strip()
  234. if line.startswith("#") or not line:
  235. continue
  236. parts = line.split()
  237. if len(parts) < 3 or parts[0] != "git" or parts[1] != "bisect":
  238. continue
  239. cmd = parts[2]
  240. args = parts[3:] if len(parts) > 3 else []
  241. if cmd == "start":
  242. self.start()
  243. elif cmd == "bad":
  244. rev = ObjectID(args[0].encode("ascii")) if args else None
  245. self.mark_bad(rev)
  246. elif cmd == "good":
  247. rev = ObjectID(args[0].encode("ascii")) if args else None
  248. self.mark_good(rev)
  249. elif cmd == "skip":
  250. revs = [ObjectID(arg.encode("ascii")) for arg in args] if args else None
  251. self.skip(revs)
  252. def _find_next_commit(self) -> ObjectID | None:
  253. """Find the next commit to test using binary search.
  254. Returns:
  255. The SHA of the next commit to test, or None if bisect is complete
  256. """
  257. # Get bad commit
  258. bad_ref_path = os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
  259. if not os.path.exists(bad_ref_path):
  260. self._append_to_log("# status: waiting for both good and bad commits")
  261. return None
  262. with open(bad_ref_path, "rb") as f:
  263. bad_sha = ObjectID(f.read().strip())
  264. # Get all good commits
  265. good_shas: list[ObjectID] = []
  266. bisect_refs_dir = os.path.join(self.repo.controldir(), "refs", "bisect")
  267. for filename in os.listdir(bisect_refs_dir):
  268. if filename.startswith("good-"):
  269. with open(os.path.join(bisect_refs_dir, filename), "rb") as f:
  270. good_shas.append(ObjectID(f.read().strip()))
  271. if not good_shas:
  272. self._append_to_log(
  273. "# status: waiting for good commit(s), bad commit known"
  274. )
  275. return None
  276. # Get skip commits
  277. skip_shas: set[ObjectID] = set()
  278. for filename in os.listdir(bisect_refs_dir):
  279. if filename.startswith("skip-"):
  280. with open(os.path.join(bisect_refs_dir, filename), "rb") as f:
  281. skip_shas.add(ObjectID(f.read().strip()))
  282. # Find commits between good and bad
  283. candidates = self._find_bisect_candidates(bad_sha, good_shas, skip_shas)
  284. if not candidates:
  285. # Bisect complete - the first bad commit is found
  286. self._append_to_log(
  287. f"# first bad commit: [{bad_sha.decode('ascii')}] "
  288. f"{self._get_commit_subject(bad_sha)}"
  289. )
  290. return None
  291. # Find midpoint
  292. mid_idx = len(candidates) // 2
  293. next_commit = candidates[mid_idx]
  294. # Write BISECT_EXPECTED_REV
  295. expected_file = os.path.join(self.repo.controldir(), "BISECT_EXPECTED_REV")
  296. with open(expected_file, "wb") as f:
  297. f.write(next_commit + b"\n")
  298. # Update status in log
  299. steps_remaining = self._estimate_steps(len(candidates))
  300. self._append_to_log(
  301. f"Bisecting: {len(candidates) - 1} revisions left to test after this "
  302. f"(roughly {steps_remaining} steps)"
  303. )
  304. self._append_to_log(
  305. f"[{next_commit.decode('ascii')}] {self._get_commit_subject(next_commit)}"
  306. )
  307. return next_commit
  308. def _find_bisect_candidates(
  309. self, bad_sha: ObjectID, good_shas: Sequence[ObjectID], skip_shas: Set[ObjectID]
  310. ) -> list[ObjectID]:
  311. """Find all commits between good and bad commits.
  312. Args:
  313. bad_sha: The bad commit SHA
  314. good_shas: List of good commit SHAs
  315. skip_shas: Set of commits to skip
  316. Returns:
  317. List of candidate commit SHAs in topological order
  318. """
  319. # Use git's graph walking to find commits
  320. # This is a simplified version - a full implementation would need
  321. # to handle merge commits properly
  322. candidates: list[ObjectID] = []
  323. visited: set[ObjectID] = set(good_shas)
  324. queue: list[ObjectID] = [bad_sha]
  325. while queue:
  326. sha = queue.pop(0)
  327. if sha in visited or sha in skip_shas:
  328. continue
  329. visited.add(sha)
  330. commit = self.repo.object_store[sha]
  331. # Don't include good commits
  332. if sha not in good_shas:
  333. candidates.append(sha)
  334. # Add parents to queue
  335. if isinstance(commit, Commit):
  336. for parent in commit.parents:
  337. if parent not in visited:
  338. queue.append(parent)
  339. # Remove the bad commit itself
  340. if bad_sha in candidates:
  341. candidates.remove(bad_sha)
  342. return candidates
  343. def _get_commit_subject(self, sha: ObjectID) -> str:
  344. """Get the subject line of a commit message."""
  345. obj = self.repo.object_store[sha]
  346. if isinstance(obj, Commit):
  347. message = obj.message.decode("utf-8", errors="replace")
  348. lines = message.split("\n")
  349. return lines[0] if lines else ""
  350. return ""
  351. def _append_to_log(self, line: str) -> None:
  352. """Append a line to the bisect log."""
  353. log_file = os.path.join(self.repo.controldir(), "BISECT_LOG")
  354. with open(log_file, "a") as f:
  355. f.write(line + "\n")
  356. def _estimate_steps(self, num_candidates: int) -> int:
  357. """Estimate the number of steps remaining in bisect."""
  358. if num_candidates <= 1:
  359. return 0
  360. steps = 0
  361. while num_candidates > 1:
  362. num_candidates //= 2
  363. steps += 1
  364. return steps