# test_annotate.py -- tests for annotate
# Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your option) a later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.

"""Tests for annotate support."""

import os
import shutil
import tempfile
import unittest
from typing import Any, Optional
from unittest import TestCase

from dulwich.annotate import annotate_lines, update_lines
from dulwich.objects import Blob, Commit, Tree
from dulwich.porcelain import annotate, blame
from dulwich.repo import Repo

  28. class UpdateLinesTestCase(TestCase):
  29. """Tests for update_lines function."""
  30. def test_update_lines_equal(self) -> None:
  31. """Test update_lines when all lines are equal."""
  32. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  33. (("commit1", "entry1"), b"line1"),
  34. (("commit2", "entry2"), b"line2"),
  35. ]
  36. new_blob = b"line1\nline2"
  37. new_history_data = ("commit3", "entry3")
  38. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  39. self.assertEqual(old_lines, result)
  40. def test_update_lines_insert(self) -> None:
  41. """Test update_lines when new lines are inserted."""
  42. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  43. (("commit1", "entry1"), b"line1"),
  44. (("commit2", "entry2"), b"line3"),
  45. ]
  46. new_blob = b"line1\nline2\nline3"
  47. new_history_data = ("commit3", "entry3")
  48. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  49. expected = [
  50. (("commit1", "entry1"), b"line1"),
  51. (("commit3", "entry3"), b"line2"),
  52. (("commit2", "entry2"), b"line3"),
  53. ]
  54. self.assertEqual(expected, result)
  55. def test_update_lines_delete(self) -> None:
  56. """Test update_lines when lines are deleted."""
  57. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  58. (("commit1", "entry1"), b"line1"),
  59. (("commit2", "entry2"), b"line2"),
  60. (("commit3", "entry3"), b"line3"),
  61. ]
  62. new_blob = b"line1\nline3"
  63. new_history_data = ("commit4", "entry4")
  64. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  65. expected = [
  66. (("commit1", "entry1"), b"line1"),
  67. (("commit3", "entry3"), b"line3"),
  68. ]
  69. self.assertEqual(expected, result)
  70. def test_update_lines_replace(self) -> None:
  71. """Test update_lines when lines are replaced."""
  72. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  73. (("commit1", "entry1"), b"line1"),
  74. (("commit2", "entry2"), b"line2"),
  75. ]
  76. new_blob = b"line1\nline2_modified"
  77. new_history_data = ("commit3", "entry3")
  78. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  79. expected = [
  80. (("commit1", "entry1"), b"line1"),
  81. (("commit3", "entry3"), b"line2_modified"),
  82. ]
  83. self.assertEqual(expected, result)
  84. def test_update_lines_empty_old(self) -> None:
  85. """Test update_lines with empty old lines."""
  86. old_lines: list[tuple[tuple[Any, Any], bytes]] = []
  87. new_blob = b"line1\nline2"
  88. new_history_data = ("commit1", "entry1")
  89. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  90. expected = [
  91. (("commit1", "entry1"), b"line1"),
  92. (("commit1", "entry1"), b"line2"),
  93. ]
  94. self.assertEqual(expected, result)
  95. def test_update_lines_empty_new(self) -> None:
  96. """Test update_lines with empty new blob."""
  97. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  98. (("commit1", "entry1"), b"line1")
  99. ]
  100. new_blob = b""
  101. new_history_data = ("commit2", "entry2")
  102. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  103. self.assertEqual([], result)
  104. class AnnotateLinesTestCase(TestCase):
  105. """Tests for annotate_lines function."""
  106. def setUp(self) -> None:
  107. self.temp_dir = tempfile.mkdtemp()
  108. self.repo = Repo.init(self.temp_dir)
  109. def tearDown(self) -> None:
  110. self.repo.close()
  111. import shutil
  112. shutil.rmtree(self.temp_dir)
  113. def _make_commit(
  114. self, blob_content: bytes, message: str, parent: Optional[bytes] = None
  115. ) -> bytes:
  116. """Helper to create a commit with a single file."""
  117. # Create blob
  118. blob = Blob()
  119. blob.data = blob_content
  120. self.repo.object_store.add_object(blob)
  121. # Create tree
  122. tree = Tree()
  123. tree.add(b"test.txt", 0o100644, blob.id)
  124. self.repo.object_store.add_object(tree)
  125. # Create commit
  126. commit = Commit()
  127. commit.tree = tree.id
  128. commit.author = commit.committer = b"Test Author <test@example.com>"
  129. commit.author_time = commit.commit_time = 1000000000
  130. commit.author_timezone = commit.commit_timezone = 0
  131. commit.encoding = b"UTF-8"
  132. commit.message = message.encode("utf-8")
  133. if parent:
  134. commit.parents = [parent]
  135. else:
  136. commit.parents = []
  137. self.repo.object_store.add_object(commit)
  138. return commit.id
  139. def test_annotate_lines_single_commit(self) -> None:
  140. """Test annotating a file with a single commit."""
  141. commit_id = self._make_commit(b"line1\nline2\nline3\n", "Initial commit")
  142. result = annotate_lines(self.repo.object_store, commit_id, b"test.txt")
  143. self.assertEqual(3, len(result))
  144. for (commit, entry), line in result:
  145. self.assertEqual(commit_id, commit.id)
  146. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  147. def test_annotate_lines_multiple_commits(self) -> None:
  148. """Test annotating a file with multiple commits."""
  149. # First commit
  150. commit1_id = self._make_commit(b"line1\nline2\n", "Initial commit")
  151. # Second commit - add a line
  152. commit2_id = self._make_commit(
  153. b"line1\nline1.5\nline2\n", "Add line between", parent=commit1_id
  154. )
  155. # Third commit - modify a line
  156. commit3_id = self._make_commit(
  157. b"line1_modified\nline1.5\nline2\n", "Modify first line", parent=commit2_id
  158. )
  159. result = annotate_lines(self.repo.object_store, commit3_id, b"test.txt")
  160. self.assertEqual(3, len(result))
  161. # First line should be from commit3
  162. self.assertEqual(commit3_id, result[0][0][0].id)
  163. self.assertEqual(b"line1_modified\n", result[0][1])
  164. # Second line should be from commit2
  165. self.assertEqual(commit2_id, result[1][0][0].id)
  166. self.assertEqual(b"line1.5\n", result[1][1])
  167. # Third line should be from commit1
  168. self.assertEqual(commit1_id, result[2][0][0].id)
  169. self.assertEqual(b"line2\n", result[2][1])
  170. def test_annotate_lines_nonexistent_path(self) -> None:
  171. """Test annotating a nonexistent file."""
  172. commit_id = self._make_commit(b"content\n", "Initial commit")
  173. result = annotate_lines(self.repo.object_store, commit_id, b"nonexistent.txt")
  174. self.assertEqual([], result)
  175. class PorcelainAnnotateTestCase(TestCase):
  176. """Tests for the porcelain annotate function."""
  177. def setUp(self) -> None:
  178. self.temp_dir = tempfile.mkdtemp()
  179. self.repo = Repo.init(self.temp_dir)
  180. def tearDown(self) -> None:
  181. self.repo.close()
  182. import shutil
  183. shutil.rmtree(self.temp_dir)
  184. def _make_commit_with_file(
  185. self,
  186. filename: str,
  187. content: bytes,
  188. message: str,
  189. parent: Optional[bytes] = None,
  190. ) -> bytes:
  191. """Helper to create a commit with a file."""
  192. # Create blob
  193. blob = Blob()
  194. blob.data = content
  195. self.repo.object_store.add_object(blob)
  196. # Create tree
  197. tree = Tree()
  198. tree.add(filename.encode(), 0o100644, blob.id)
  199. self.repo.object_store.add_object(tree)
  200. # Create commit
  201. commit = Commit()
  202. commit.tree = tree.id
  203. commit.author = commit.committer = b"Test Author <test@example.com>"
  204. commit.author_time = commit.commit_time = 1000000000
  205. commit.author_timezone = commit.commit_timezone = 0
  206. commit.encoding = b"UTF-8"
  207. commit.message = message.encode("utf-8")
  208. if parent:
  209. commit.parents = [parent]
  210. else:
  211. commit.parents = []
  212. self.repo.object_store.add_object(commit)
  213. # Update HEAD
  214. self.repo.refs[b"HEAD"] = commit.id
  215. return commit.id
  216. def test_porcelain_annotate(self) -> None:
  217. """Test the porcelain annotate function."""
  218. # Create commits
  219. commit1_id = self._make_commit_with_file(
  220. "file.txt", b"line1\nline2\n", "Initial commit"
  221. )
  222. self._make_commit_with_file(
  223. "file.txt", b"line1\nline2\nline3\n", "Add third line", parent=commit1_id
  224. )
  225. # Test annotate
  226. result = list(annotate(self.temp_dir, "file.txt"))
  227. self.assertEqual(3, len(result))
  228. # Check that each result has the right structure
  229. for (commit, entry), line in result:
  230. self.assertIsNotNone(commit)
  231. self.assertIsNotNone(entry)
  232. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  233. def test_porcelain_annotate_with_committish(self) -> None:
  234. """Test porcelain annotate with specific commit."""
  235. # Create commits
  236. commit1_id = self._make_commit_with_file(
  237. "file.txt", b"original\n", "Initial commit"
  238. )
  239. self._make_commit_with_file(
  240. "file.txt", b"modified\n", "Modify file", parent=commit1_id
  241. )
  242. # Annotate at first commit
  243. result = list(
  244. annotate(self.temp_dir, "file.txt", committish=commit1_id.decode())
  245. )
  246. self.assertEqual(1, len(result))
  247. self.assertEqual(b"original\n", result[0][1])
  248. # Annotate at HEAD (second commit)
  249. result = list(annotate(self.temp_dir, "file.txt"))
  250. self.assertEqual(1, len(result))
  251. self.assertEqual(b"modified\n", result[0][1])
  252. def test_blame_alias(self) -> None:
  253. """Test that blame is an alias for annotate."""
  254. self.assertIs(blame, annotate)
  255. class IntegrationTestCase(TestCase):
  256. """Integration tests with more complex scenarios."""
  257. def setUp(self) -> None:
  258. self.temp_dir = tempfile.mkdtemp()
  259. self.repo = Repo.init(self.temp_dir)
  260. def tearDown(self) -> None:
  261. self.repo.close()
  262. import shutil
  263. shutil.rmtree(self.temp_dir)
  264. def _create_file_commit(
  265. self,
  266. filename: str,
  267. content: bytes,
  268. message: str,
  269. parent: Optional[bytes] = None,
  270. ) -> bytes:
  271. """Helper to create a commit with file content."""
  272. # Write file to working directory
  273. filepath = os.path.join(self.temp_dir, filename)
  274. with open(filepath, "wb") as f:
  275. f.write(content)
  276. # Stage file
  277. self.repo.get_worktree().stage([filename.encode()])
  278. # Create commit
  279. commit_id = self.repo.get_worktree().commit(
  280. message=message.encode(),
  281. committer=b"Test Committer <test@example.com>",
  282. author=b"Test Author <test@example.com>",
  283. commit_timestamp=1000000000,
  284. commit_timezone=0,
  285. author_timestamp=1000000000,
  286. author_timezone=0,
  287. )
  288. return commit_id
  289. def test_complex_file_history(self) -> None:
  290. """Test annotating a file with complex history."""
  291. # Initial commit with 3 lines
  292. self._create_file_commit(
  293. "complex.txt", b"First line\nSecond line\nThird line\n", "Initial commit"
  294. )
  295. # Add lines at the beginning and end
  296. self._create_file_commit(
  297. "complex.txt",
  298. b"New first line\nFirst line\nSecond line\nThird line\nNew last line\n",
  299. "Add lines at beginning and end",
  300. )
  301. # Modify middle line
  302. self._create_file_commit(
  303. "complex.txt",
  304. b"New first line\nFirst line\n"
  305. b"Modified second line\nThird line\n"
  306. b"New last line\n",
  307. "Modify middle line",
  308. )
  309. # Delete a line
  310. self._create_file_commit(
  311. "complex.txt",
  312. b"New first line\nFirst line\nModified second line\nNew last line\n",
  313. "Delete third line",
  314. )
  315. # Run annotate
  316. result = list(annotate(self.temp_dir, "complex.txt"))
  317. # Should have 4 lines
  318. self.assertEqual(4, len(result))
  319. # Verify each line comes from the correct commit
  320. lines = [line for (commit, entry), line in result]
  321. self.assertEqual(
  322. [
  323. b"New first line\n",
  324. b"First line\n",
  325. b"Modified second line\n",
  326. b"New last line\n",
  327. ],
  328. lines,
  329. )
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()