test_annotate.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. # test_annotate.py -- tests for annotate
  2. # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # of the License or (at your option) a later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for annotate support."""
  19. import os
  20. import tempfile
  21. import unittest
  22. from typing import Any, Optional
  23. from unittest import TestCase
  24. from dulwich.annotate import annotate_lines, update_lines
  25. from dulwich.objects import Blob, Commit, Tree
  26. from dulwich.porcelain import annotate, blame
  27. from dulwich.repo import Repo
  28. class UpdateLinesTestCase(TestCase):
  29. """Tests for update_lines function."""
  30. def test_update_lines_equal(self) -> None:
  31. """Test update_lines when all lines are equal."""
  32. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  33. (("commit1", "entry1"), b"line1"),
  34. (("commit2", "entry2"), b"line2"),
  35. ]
  36. new_blob = b"line1\nline2"
  37. new_history_data = ("commit3", "entry3")
  38. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  39. self.assertEqual(old_lines, result)
  40. def test_update_lines_insert(self) -> None:
  41. """Test update_lines when new lines are inserted."""
  42. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  43. (("commit1", "entry1"), b"line1"),
  44. (("commit2", "entry2"), b"line3"),
  45. ]
  46. new_blob = b"line1\nline2\nline3"
  47. new_history_data = ("commit3", "entry3")
  48. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  49. expected = [
  50. (("commit1", "entry1"), b"line1"),
  51. (("commit3", "entry3"), b"line2"),
  52. (("commit2", "entry2"), b"line3"),
  53. ]
  54. self.assertEqual(expected, result)
  55. def test_update_lines_delete(self) -> None:
  56. """Test update_lines when lines are deleted."""
  57. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  58. (("commit1", "entry1"), b"line1"),
  59. (("commit2", "entry2"), b"line2"),
  60. (("commit3", "entry3"), b"line3"),
  61. ]
  62. new_blob = b"line1\nline3"
  63. new_history_data = ("commit4", "entry4")
  64. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  65. expected = [
  66. (("commit1", "entry1"), b"line1"),
  67. (("commit3", "entry3"), b"line3"),
  68. ]
  69. self.assertEqual(expected, result)
  70. def test_update_lines_replace(self) -> None:
  71. """Test update_lines when lines are replaced."""
  72. old_lines: list[tuple[tuple[Any, Any], bytes]] = [
  73. (("commit1", "entry1"), b"line1"),
  74. (("commit2", "entry2"), b"line2"),
  75. ]
  76. new_blob = b"line1\nline2_modified"
  77. new_history_data = ("commit3", "entry3")
  78. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  79. expected = [
  80. (("commit1", "entry1"), b"line1"),
  81. (("commit3", "entry3"), b"line2_modified"),
  82. ]
  83. self.assertEqual(expected, result)
  84. def test_update_lines_empty_old(self) -> None:
  85. """Test update_lines with empty old lines."""
  86. old_lines: list[tuple[tuple[Any, Any], bytes]] = []
  87. new_blob = b"line1\nline2"
  88. new_history_data = ("commit1", "entry1")
  89. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  90. expected = [
  91. (("commit1", "entry1"), b"line1"),
  92. (("commit1", "entry1"), b"line2"),
  93. ]
  94. self.assertEqual(expected, result)
  95. def test_update_lines_empty_new(self) -> None:
  96. """Test update_lines with empty new blob."""
  97. old_lines: list[tuple[tuple[Any, Any], bytes]] = [(("commit1", "entry1"), b"line1")]
  98. new_blob = b""
  99. new_history_data = ("commit2", "entry2")
  100. result = update_lines(old_lines, new_history_data, new_blob) # type: ignore[arg-type]
  101. self.assertEqual([], result)
  102. class AnnotateLinesTestCase(TestCase):
  103. """Tests for annotate_lines function."""
  104. def setUp(self) -> None:
  105. self.temp_dir = tempfile.mkdtemp()
  106. self.repo = Repo.init(self.temp_dir)
  107. def tearDown(self) -> None:
  108. self.repo.close()
  109. import shutil
  110. shutil.rmtree(self.temp_dir)
  111. def _make_commit(self, blob_content: bytes, message: str, parent: Optional[bytes] = None) -> bytes:
  112. """Helper to create a commit with a single file."""
  113. # Create blob
  114. blob = Blob()
  115. blob.data = blob_content
  116. self.repo.object_store.add_object(blob)
  117. # Create tree
  118. tree = Tree()
  119. tree.add(b"test.txt", 0o100644, blob.id)
  120. self.repo.object_store.add_object(tree)
  121. # Create commit
  122. commit = Commit()
  123. commit.tree = tree.id
  124. commit.author = commit.committer = b"Test Author <test@example.com>"
  125. commit.author_time = commit.commit_time = 1000000000
  126. commit.author_timezone = commit.commit_timezone = 0
  127. commit.encoding = b"UTF-8"
  128. commit.message = message.encode("utf-8")
  129. if parent:
  130. commit.parents = [parent]
  131. else:
  132. commit.parents = []
  133. self.repo.object_store.add_object(commit)
  134. return commit.id
  135. def test_annotate_lines_single_commit(self) -> None:
  136. """Test annotating a file with a single commit."""
  137. commit_id = self._make_commit(b"line1\nline2\nline3\n", "Initial commit")
  138. result = annotate_lines(self.repo.object_store, commit_id, b"test.txt")
  139. self.assertEqual(3, len(result))
  140. for (commit, entry), line in result:
  141. self.assertEqual(commit_id, commit.id)
  142. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  143. def test_annotate_lines_multiple_commits(self) -> None:
  144. """Test annotating a file with multiple commits."""
  145. # First commit
  146. commit1_id = self._make_commit(b"line1\nline2\n", "Initial commit")
  147. # Second commit - add a line
  148. commit2_id = self._make_commit(
  149. b"line1\nline1.5\nline2\n", "Add line between", parent=commit1_id
  150. )
  151. # Third commit - modify a line
  152. commit3_id = self._make_commit(
  153. b"line1_modified\nline1.5\nline2\n", "Modify first line", parent=commit2_id
  154. )
  155. result = annotate_lines(self.repo.object_store, commit3_id, b"test.txt")
  156. self.assertEqual(3, len(result))
  157. # First line should be from commit3
  158. self.assertEqual(commit3_id, result[0][0][0].id)
  159. self.assertEqual(b"line1_modified\n", result[0][1])
  160. # Second line should be from commit2
  161. self.assertEqual(commit2_id, result[1][0][0].id)
  162. self.assertEqual(b"line1.5\n", result[1][1])
  163. # Third line should be from commit1
  164. self.assertEqual(commit1_id, result[2][0][0].id)
  165. self.assertEqual(b"line2\n", result[2][1])
  166. def test_annotate_lines_nonexistent_path(self) -> None:
  167. """Test annotating a nonexistent file."""
  168. commit_id = self._make_commit(b"content\n", "Initial commit")
  169. result = annotate_lines(self.repo.object_store, commit_id, b"nonexistent.txt")
  170. self.assertEqual([], result)
  171. class PorcelainAnnotateTestCase(TestCase):
  172. """Tests for the porcelain annotate function."""
  173. def setUp(self) -> None:
  174. self.temp_dir = tempfile.mkdtemp()
  175. self.repo = Repo.init(self.temp_dir)
  176. def tearDown(self) -> None:
  177. self.repo.close()
  178. import shutil
  179. shutil.rmtree(self.temp_dir)
  180. def _make_commit_with_file(self, filename: str, content: bytes, message: str, parent: Optional[bytes] = None) -> bytes:
  181. """Helper to create a commit with a file."""
  182. # Create blob
  183. blob = Blob()
  184. blob.data = content
  185. self.repo.object_store.add_object(blob)
  186. # Create tree
  187. tree = Tree()
  188. tree.add(filename.encode(), 0o100644, blob.id)
  189. self.repo.object_store.add_object(tree)
  190. # Create commit
  191. commit = Commit()
  192. commit.tree = tree.id
  193. commit.author = commit.committer = b"Test Author <test@example.com>"
  194. commit.author_time = commit.commit_time = 1000000000
  195. commit.author_timezone = commit.commit_timezone = 0
  196. commit.encoding = b"UTF-8"
  197. commit.message = message.encode("utf-8")
  198. if parent:
  199. commit.parents = [parent]
  200. else:
  201. commit.parents = []
  202. self.repo.object_store.add_object(commit)
  203. # Update HEAD
  204. self.repo.refs[b"HEAD"] = commit.id
  205. return commit.id
  206. def test_porcelain_annotate(self) -> None:
  207. """Test the porcelain annotate function."""
  208. # Create commits
  209. commit1_id = self._make_commit_with_file(
  210. "file.txt", b"line1\nline2\n", "Initial commit"
  211. )
  212. self._make_commit_with_file(
  213. "file.txt", b"line1\nline2\nline3\n", "Add third line", parent=commit1_id
  214. )
  215. # Test annotate
  216. result = list(annotate(self.temp_dir, "file.txt"))
  217. self.assertEqual(3, len(result))
  218. # Check that each result has the right structure
  219. for (commit, entry), line in result:
  220. self.assertIsNotNone(commit)
  221. self.assertIsNotNone(entry)
  222. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  223. def test_porcelain_annotate_with_committish(self) -> None:
  224. """Test porcelain annotate with specific commit."""
  225. # Create commits
  226. commit1_id = self._make_commit_with_file(
  227. "file.txt", b"original\n", "Initial commit"
  228. )
  229. self._make_commit_with_file(
  230. "file.txt", b"modified\n", "Modify file", parent=commit1_id
  231. )
  232. # Annotate at first commit
  233. result = list(
  234. annotate(self.temp_dir, "file.txt", committish=commit1_id.decode())
  235. )
  236. self.assertEqual(1, len(result))
  237. self.assertEqual(b"original\n", result[0][1])
  238. # Annotate at HEAD (second commit)
  239. result = list(annotate(self.temp_dir, "file.txt"))
  240. self.assertEqual(1, len(result))
  241. self.assertEqual(b"modified\n", result[0][1])
  242. def test_blame_alias(self) -> None:
  243. """Test that blame is an alias for annotate."""
  244. self.assertIs(blame, annotate)
  245. class IntegrationTestCase(TestCase):
  246. """Integration tests with more complex scenarios."""
  247. def setUp(self) -> None:
  248. self.temp_dir = tempfile.mkdtemp()
  249. self.repo = Repo.init(self.temp_dir)
  250. def tearDown(self) -> None:
  251. self.repo.close()
  252. import shutil
  253. shutil.rmtree(self.temp_dir)
  254. def _create_file_commit(self, filename: str, content: bytes, message: str, parent: Optional[bytes] = None) -> bytes:
  255. """Helper to create a commit with file content."""
  256. # Write file to working directory
  257. filepath = os.path.join(self.temp_dir, filename)
  258. with open(filepath, "wb") as f:
  259. f.write(content)
  260. # Stage file
  261. self.repo.get_worktree().stage([filename.encode()])
  262. # Create commit
  263. commit_id = self.repo.get_worktree().commit(
  264. message=message.encode(),
  265. committer=b"Test Committer <test@example.com>",
  266. author=b"Test Author <test@example.com>",
  267. commit_timestamp=1000000000,
  268. commit_timezone=0,
  269. author_timestamp=1000000000,
  270. author_timezone=0,
  271. )
  272. return commit_id
  273. def test_complex_file_history(self) -> None:
  274. """Test annotating a file with complex history."""
  275. # Initial commit with 3 lines
  276. self._create_file_commit(
  277. "complex.txt", b"First line\nSecond line\nThird line\n", "Initial commit"
  278. )
  279. # Add lines at the beginning and end
  280. self._create_file_commit(
  281. "complex.txt",
  282. b"New first line\nFirst line\nSecond line\nThird line\nNew last line\n",
  283. "Add lines at beginning and end",
  284. )
  285. # Modify middle line
  286. self._create_file_commit(
  287. "complex.txt",
  288. b"New first line\nFirst line\n"
  289. b"Modified second line\nThird line\n"
  290. b"New last line\n",
  291. "Modify middle line",
  292. )
  293. # Delete a line
  294. self._create_file_commit(
  295. "complex.txt",
  296. b"New first line\nFirst line\nModified second line\nNew last line\n",
  297. "Delete third line",
  298. )
  299. # Run annotate
  300. result = list(annotate(self.temp_dir, "complex.txt"))
  301. # Should have 4 lines
  302. self.assertEqual(4, len(result))
  303. # Verify each line comes from the correct commit
  304. lines = [line for (commit, entry), line in result]
  305. self.assertEqual(
  306. [
  307. b"New first line\n",
  308. b"First line\n",
  309. b"Modified second line\n",
  310. b"New last line\n",
  311. ],
  312. lines,
  313. )
  314. if __name__ == "__main__":
  315. unittest.main()