test_annotate.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
# test_annotate.py -- tests for annotate
# Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your option) a later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
  18. """Tests for annotate support."""
  19. import os
  20. import tempfile
  21. import unittest
  22. from unittest import TestCase
  23. from dulwich.annotate import annotate_lines, update_lines
  24. from dulwich.objects import Blob, Commit, Tree
  25. from dulwich.porcelain import annotate, blame
  26. from dulwich.repo import Repo
  27. class UpdateLinesTestCase(TestCase):
  28. """Tests for update_lines function."""
  29. def test_update_lines_equal(self):
  30. """Test update_lines when all lines are equal."""
  31. old_lines = [
  32. (("commit1", "entry1"), b"line1"),
  33. (("commit2", "entry2"), b"line2"),
  34. ]
  35. new_blob = b"line1\nline2"
  36. new_history_data = ("commit3", "entry3")
  37. result = update_lines(old_lines, new_history_data, new_blob)
  38. self.assertEqual(old_lines, result)
  39. def test_update_lines_insert(self):
  40. """Test update_lines when new lines are inserted."""
  41. old_lines = [
  42. (("commit1", "entry1"), b"line1"),
  43. (("commit2", "entry2"), b"line3"),
  44. ]
  45. new_blob = b"line1\nline2\nline3"
  46. new_history_data = ("commit3", "entry3")
  47. result = update_lines(old_lines, new_history_data, new_blob)
  48. expected = [
  49. (("commit1", "entry1"), b"line1"),
  50. (("commit3", "entry3"), b"line2"),
  51. (("commit2", "entry2"), b"line3"),
  52. ]
  53. self.assertEqual(expected, result)
  54. def test_update_lines_delete(self):
  55. """Test update_lines when lines are deleted."""
  56. old_lines = [
  57. (("commit1", "entry1"), b"line1"),
  58. (("commit2", "entry2"), b"line2"),
  59. (("commit3", "entry3"), b"line3"),
  60. ]
  61. new_blob = b"line1\nline3"
  62. new_history_data = ("commit4", "entry4")
  63. result = update_lines(old_lines, new_history_data, new_blob)
  64. expected = [
  65. (("commit1", "entry1"), b"line1"),
  66. (("commit3", "entry3"), b"line3"),
  67. ]
  68. self.assertEqual(expected, result)
  69. def test_update_lines_replace(self):
  70. """Test update_lines when lines are replaced."""
  71. old_lines = [
  72. (("commit1", "entry1"), b"line1"),
  73. (("commit2", "entry2"), b"line2"),
  74. ]
  75. new_blob = b"line1\nline2_modified"
  76. new_history_data = ("commit3", "entry3")
  77. result = update_lines(old_lines, new_history_data, new_blob)
  78. expected = [
  79. (("commit1", "entry1"), b"line1"),
  80. (("commit3", "entry3"), b"line2_modified"),
  81. ]
  82. self.assertEqual(expected, result)
  83. def test_update_lines_empty_old(self):
  84. """Test update_lines with empty old lines."""
  85. old_lines = []
  86. new_blob = b"line1\nline2"
  87. new_history_data = ("commit1", "entry1")
  88. result = update_lines(old_lines, new_history_data, new_blob)
  89. expected = [
  90. (("commit1", "entry1"), b"line1"),
  91. (("commit1", "entry1"), b"line2"),
  92. ]
  93. self.assertEqual(expected, result)
  94. def test_update_lines_empty_new(self):
  95. """Test update_lines with empty new blob."""
  96. old_lines = [(("commit1", "entry1"), b"line1")]
  97. new_blob = b""
  98. new_history_data = ("commit2", "entry2")
  99. result = update_lines(old_lines, new_history_data, new_blob)
  100. self.assertEqual([], result)
  101. class AnnotateLinesTestCase(TestCase):
  102. """Tests for annotate_lines function."""
  103. def setUp(self):
  104. self.temp_dir = tempfile.mkdtemp()
  105. self.repo = Repo.init(self.temp_dir)
  106. def tearDown(self):
  107. self.repo.close()
  108. import shutil
  109. shutil.rmtree(self.temp_dir)
  110. def _make_commit(self, blob_content, message, parent=None):
  111. """Helper to create a commit with a single file."""
  112. # Create blob
  113. blob = Blob()
  114. blob.data = blob_content
  115. self.repo.object_store.add_object(blob)
  116. # Create tree
  117. tree = Tree()
  118. tree.add(b"test.txt", 0o100644, blob.id)
  119. self.repo.object_store.add_object(tree)
  120. # Create commit
  121. commit = Commit()
  122. commit.tree = tree.id
  123. commit.author = commit.committer = b"Test Author <test@example.com>"
  124. commit.author_time = commit.commit_time = 1000000000
  125. commit.author_timezone = commit.commit_timezone = 0
  126. commit.encoding = b"UTF-8"
  127. commit.message = message.encode("utf-8")
  128. if parent:
  129. commit.parents = [parent]
  130. else:
  131. commit.parents = []
  132. self.repo.object_store.add_object(commit)
  133. return commit.id
  134. def test_annotate_lines_single_commit(self):
  135. """Test annotating a file with a single commit."""
  136. commit_id = self._make_commit(b"line1\nline2\nline3\n", "Initial commit")
  137. result = annotate_lines(self.repo.object_store, commit_id, b"test.txt")
  138. self.assertEqual(3, len(result))
  139. for (commit, entry), line in result:
  140. self.assertEqual(commit_id, commit.id)
  141. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  142. def test_annotate_lines_multiple_commits(self):
  143. """Test annotating a file with multiple commits."""
  144. # First commit
  145. commit1_id = self._make_commit(b"line1\nline2\n", "Initial commit")
  146. # Second commit - add a line
  147. commit2_id = self._make_commit(
  148. b"line1\nline1.5\nline2\n", "Add line between", parent=commit1_id
  149. )
  150. # Third commit - modify a line
  151. commit3_id = self._make_commit(
  152. b"line1_modified\nline1.5\nline2\n", "Modify first line", parent=commit2_id
  153. )
  154. result = annotate_lines(self.repo.object_store, commit3_id, b"test.txt")
  155. self.assertEqual(3, len(result))
  156. # First line should be from commit3
  157. self.assertEqual(commit3_id, result[0][0][0].id)
  158. self.assertEqual(b"line1_modified\n", result[0][1])
  159. # Second line should be from commit2
  160. self.assertEqual(commit2_id, result[1][0][0].id)
  161. self.assertEqual(b"line1.5\n", result[1][1])
  162. # Third line should be from commit1
  163. self.assertEqual(commit1_id, result[2][0][0].id)
  164. self.assertEqual(b"line2\n", result[2][1])
  165. def test_annotate_lines_nonexistent_path(self):
  166. """Test annotating a nonexistent file."""
  167. commit_id = self._make_commit(b"content\n", "Initial commit")
  168. result = annotate_lines(self.repo.object_store, commit_id, b"nonexistent.txt")
  169. self.assertEqual([], result)
  170. class PorcelainAnnotateTestCase(TestCase):
  171. """Tests for the porcelain annotate function."""
  172. def setUp(self):
  173. self.temp_dir = tempfile.mkdtemp()
  174. self.repo = Repo.init(self.temp_dir)
  175. def tearDown(self):
  176. self.repo.close()
  177. import shutil
  178. shutil.rmtree(self.temp_dir)
  179. def _make_commit_with_file(self, filename, content, message, parent=None):
  180. """Helper to create a commit with a file."""
  181. # Create blob
  182. blob = Blob()
  183. blob.data = content
  184. self.repo.object_store.add_object(blob)
  185. # Create tree
  186. tree = Tree()
  187. tree.add(filename.encode(), 0o100644, blob.id)
  188. self.repo.object_store.add_object(tree)
  189. # Create commit
  190. commit = Commit()
  191. commit.tree = tree.id
  192. commit.author = commit.committer = b"Test Author <test@example.com>"
  193. commit.author_time = commit.commit_time = 1000000000
  194. commit.author_timezone = commit.commit_timezone = 0
  195. commit.encoding = b"UTF-8"
  196. commit.message = message.encode("utf-8")
  197. if parent:
  198. commit.parents = [parent]
  199. else:
  200. commit.parents = []
  201. self.repo.object_store.add_object(commit)
  202. # Update HEAD
  203. self.repo.refs[b"HEAD"] = commit.id
  204. return commit.id
  205. def test_porcelain_annotate(self):
  206. """Test the porcelain annotate function."""
  207. # Create commits
  208. commit1_id = self._make_commit_with_file(
  209. "file.txt", b"line1\nline2\n", "Initial commit"
  210. )
  211. self._make_commit_with_file(
  212. "file.txt", b"line1\nline2\nline3\n", "Add third line", parent=commit1_id
  213. )
  214. # Test annotate
  215. result = list(annotate(self.temp_dir, "file.txt"))
  216. self.assertEqual(3, len(result))
  217. # Check that each result has the right structure
  218. for (commit, entry), line in result:
  219. self.assertIsNotNone(commit)
  220. self.assertIsNotNone(entry)
  221. self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
  222. def test_porcelain_annotate_with_committish(self):
  223. """Test porcelain annotate with specific commit."""
  224. # Create commits
  225. commit1_id = self._make_commit_with_file(
  226. "file.txt", b"original\n", "Initial commit"
  227. )
  228. self._make_commit_with_file(
  229. "file.txt", b"modified\n", "Modify file", parent=commit1_id
  230. )
  231. # Annotate at first commit
  232. result = list(
  233. annotate(self.temp_dir, "file.txt", committish=commit1_id.decode())
  234. )
  235. self.assertEqual(1, len(result))
  236. self.assertEqual(b"original\n", result[0][1])
  237. # Annotate at HEAD (second commit)
  238. result = list(annotate(self.temp_dir, "file.txt"))
  239. self.assertEqual(1, len(result))
  240. self.assertEqual(b"modified\n", result[0][1])
  241. def test_blame_alias(self):
  242. """Test that blame is an alias for annotate."""
  243. self.assertIs(blame, annotate)
  244. class IntegrationTestCase(TestCase):
  245. """Integration tests with more complex scenarios."""
  246. def setUp(self):
  247. self.temp_dir = tempfile.mkdtemp()
  248. self.repo = Repo.init(self.temp_dir)
  249. def tearDown(self):
  250. self.repo.close()
  251. import shutil
  252. shutil.rmtree(self.temp_dir)
  253. def _create_file_commit(self, filename, content, message, parent=None):
  254. """Helper to create a commit with file content."""
  255. # Write file to working directory
  256. filepath = os.path.join(self.temp_dir, filename)
  257. with open(filepath, "wb") as f:
  258. f.write(content)
  259. # Stage file
  260. self.repo.stage([filename.encode()])
  261. # Create commit
  262. commit_id = self.repo.do_commit(
  263. message.encode(),
  264. committer=b"Test Committer <test@example.com>",
  265. author=b"Test Author <test@example.com>",
  266. commit_timestamp=1000000000,
  267. commit_timezone=0,
  268. author_timestamp=1000000000,
  269. author_timezone=0,
  270. )
  271. return commit_id
  272. def test_complex_file_history(self):
  273. """Test annotating a file with complex history."""
  274. # Initial commit with 3 lines
  275. self._create_file_commit(
  276. "complex.txt", b"First line\nSecond line\nThird line\n", "Initial commit"
  277. )
  278. # Add lines at the beginning and end
  279. self._create_file_commit(
  280. "complex.txt",
  281. b"New first line\nFirst line\nSecond line\nThird line\nNew last line\n",
  282. "Add lines at beginning and end",
  283. )
  284. # Modify middle line
  285. self._create_file_commit(
  286. "complex.txt",
  287. b"New first line\nFirst line\n"
  288. b"Modified second line\nThird line\n"
  289. b"New last line\n",
  290. "Modify middle line",
  291. )
  292. # Delete a line
  293. self._create_file_commit(
  294. "complex.txt",
  295. b"New first line\nFirst line\nModified second line\nNew last line\n",
  296. "Delete third line",
  297. )
  298. # Run annotate
  299. result = list(annotate(self.temp_dir, "complex.txt"))
  300. # Should have 4 lines
  301. self.assertEqual(4, len(result))
  302. # Verify each line comes from the correct commit
  303. lines = [line for (commit, entry), line in result]
  304. self.assertEqual(
  305. [
  306. b"New first line\n",
  307. b"First line\n",
  308. b"Modified second line\n",
  309. b"New last line\n",
  310. ],
  311. lines,
  312. )
  313. if __name__ == "__main__":
  314. unittest.main()