test_objectspec.py 20 KB


  1. # test_objectspec.py -- tests for objectspec.py
  2. # Copyright (C) 2014 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for revision spec parsing."""
  22. # TODO: Round-trip parse-serialize-parse and serialize-parse-serialize tests.
  23. from dulwich.objects import Blob, Commit, Tag, Tree
  24. from dulwich.objectspec import (
  25. parse_commit,
  26. parse_commit_range,
  27. parse_object,
  28. parse_ref,
  29. parse_refs,
  30. parse_reftuple,
  31. parse_reftuples,
  32. parse_tree,
  33. )
  34. from dulwich.repo import MemoryRepo
  35. from dulwich.tests.utils import build_commit_graph
  36. from . import TestCase
  37. class ParseObjectTests(TestCase):
  38. """Test parse_object."""
  39. def test_nonexistent(self) -> None:
  40. r = MemoryRepo()
  41. self.assertRaises(KeyError, parse_object, r, "thisdoesnotexist")
  42. def test_blob_by_sha(self) -> None:
  43. r = MemoryRepo()
  44. b = Blob.from_string(b"Blah")
  45. r.object_store.add_object(b)
  46. self.assertEqual(b, parse_object(r, b.id))
  47. def test_parent_caret(self) -> None:
  48. r = MemoryRepo()
  49. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  50. # c3's parents are [c1, c2]
  51. self.assertEqual(c1, parse_object(r, c3.id + b"^1"))
  52. self.assertEqual(c1, parse_object(r, c3.id + b"^")) # ^ defaults to ^1
  53. self.assertEqual(c2, parse_object(r, c3.id + b"^2"))
  54. def test_parent_tilde(self) -> None:
  55. r = MemoryRepo()
  56. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  57. self.assertEqual(c2, parse_object(r, c3.id + b"~"))
  58. self.assertEqual(c2, parse_object(r, c3.id + b"~1"))
  59. self.assertEqual(c1, parse_object(r, c3.id + b"~2"))
  60. def test_combined_operators(self) -> None:
  61. r = MemoryRepo()
  62. c1, c2, c3, c4 = build_commit_graph(
  63. r.object_store, [[1], [2, 1], [3, 1, 2], [4, 3]]
  64. )
  65. # c4~1^2 means: go back 1 generation from c4 (to c3), then take its 2nd parent
  66. # c3's parents are [c1, c2], so ^2 is c2
  67. self.assertEqual(c2, parse_object(r, c4.id + b"~1^2"))
  68. self.assertEqual(c1, parse_object(r, c4.id + b"~^"))
  69. def test_with_ref(self) -> None:
  70. r = MemoryRepo()
  71. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  72. r.refs[b"refs/heads/master"] = c3.id
  73. self.assertEqual(c2, parse_object(r, b"master~"))
  74. self.assertEqual(c1, parse_object(r, b"master~2"))
  75. def test_caret_zero(self) -> None:
  76. r = MemoryRepo()
  77. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  78. # ^0 means the commit itself
  79. self.assertEqual(c2, parse_object(r, c2.id + b"^0"))
  80. self.assertEqual(c1, parse_object(r, c2.id + b"~^0"))
  81. def test_missing_parent(self) -> None:
  82. r = MemoryRepo()
  83. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  84. # c2 only has 1 parent, so ^2 should fail
  85. self.assertRaises(ValueError, parse_object, r, c2.id + b"^2")
  86. # c1 has no parents, so ~ should fail
  87. self.assertRaises(ValueError, parse_object, r, c1.id + b"~")
  88. def test_empty_base(self) -> None:
  89. r = MemoryRepo()
  90. self.assertRaises(ValueError, parse_object, r, b"~1")
  91. self.assertRaises(ValueError, parse_object, r, b"^1")
  92. def test_non_commit_with_operators(self) -> None:
  93. r = MemoryRepo()
  94. b = Blob.from_string(b"Blah")
  95. r.object_store.add_object(b)
  96. # Can't apply ~ or ^ to a blob
  97. self.assertRaises(ValueError, parse_object, r, b.id + b"~1")
  98. def test_tag_dereference(self) -> None:
  99. r = MemoryRepo()
  100. [c1] = build_commit_graph(r.object_store, [[1]])
  101. # Create an annotated tag
  102. tag = Tag()
  103. tag.name = b"v1.0"
  104. tag.message = b"Test tag"
  105. tag.tag_time = 1234567890
  106. tag.tag_timezone = 0
  107. tag.object = (Commit, c1.id)
  108. tag.tagger = b"Test Tagger <test@example.com>"
  109. r.object_store.add_object(tag)
  110. # ^{} dereferences the tag
  111. self.assertEqual(c1, parse_object(r, tag.id + b"^{}"))
  112. def test_nested_tag_dereference(self) -> None:
  113. r = MemoryRepo()
  114. [c1] = build_commit_graph(r.object_store, [[1]])
  115. # Create a tag pointing to a commit
  116. tag1 = Tag()
  117. tag1.name = b"v1.0"
  118. tag1.message = b"Test tag"
  119. tag1.tag_time = 1234567890
  120. tag1.tag_timezone = 0
  121. tag1.object = (Commit, c1.id)
  122. tag1.tagger = b"Test Tagger <test@example.com>"
  123. r.object_store.add_object(tag1)
  124. # Create another tag pointing to the first tag
  125. tag2 = Tag()
  126. tag2.name = b"v1.0-release"
  127. tag2.message = b"Release tag"
  128. tag2.tag_time = 1234567900
  129. tag2.tag_timezone = 0
  130. tag2.object = (Tag, tag1.id)
  131. tag2.tagger = b"Test Tagger <test@example.com>"
  132. r.object_store.add_object(tag2)
  133. # ^{} should recursively dereference to the commit
  134. self.assertEqual(c1, parse_object(r, tag2.id + b"^{}"))
  135. def test_path_in_tree(self) -> None:
  136. r = MemoryRepo()
  137. # Create a blob
  138. b = Blob.from_string(b"Test content")
  139. # Create a commit with the blob in its tree
  140. [c1] = build_commit_graph(r.object_store, [[1]], trees={1: [(b"test.txt", b)]})
  141. # HEAD:test.txt should return the blob
  142. r.refs[b"HEAD"] = c1.id
  143. result = parse_object(r, b"HEAD:test.txt")
  144. self.assertEqual(b"Test content", result.data)
  145. def test_path_in_tree_nested(self) -> None:
  146. r = MemoryRepo()
  147. # Create blobs
  148. b1 = Blob.from_string(b"Content 1")
  149. b2 = Blob.from_string(b"Content 2")
  150. # For nested trees, we need to create them manually
  151. # Create subtree
  152. subtree = Tree()
  153. subtree.add(b"file.txt", 0o100644, b1.id)
  154. r.object_store.add_object(b1)
  155. r.object_store.add_object(subtree)
  156. # Create main tree
  157. main_tree = Tree()
  158. main_tree.add(b"README", 0o100644, b2.id)
  159. main_tree.add(b"subdir", 0o040000, subtree.id)
  160. r.object_store.add_object(b2)
  161. r.object_store.add_object(main_tree)
  162. # Create commit with our tree
  163. c = Commit()
  164. c.tree = main_tree.id
  165. c.author = c.committer = b"Test User <test@example.com>"
  166. c.author_time = c.commit_time = 1234567890
  167. c.author_timezone = c.commit_timezone = 0
  168. c.message = b"Test commit"
  169. r.object_store.add_object(c)
  170. # Lookup nested path
  171. result = parse_object(r, c.id + b":subdir/file.txt")
  172. self.assertEqual(b"Content 1", result.data)
  173. def test_reflog_lookup(self) -> None:
  174. # Use a real repo for reflog testing
  175. import tempfile
  176. from dulwich.repo import Repo
  177. with tempfile.TemporaryDirectory() as tmpdir:
  178. r = Repo.init_bare(tmpdir)
  179. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  180. # Write reflog entries using the repo's _write_reflog method
  181. # These are written in chronological order (oldest first)
  182. r._write_reflog(
  183. b"HEAD",
  184. None,
  185. c1.id,
  186. b"Test User <test@example.com>",
  187. 1234567890,
  188. 0,
  189. b"commit: Initial commit",
  190. )
  191. r._write_reflog(
  192. b"HEAD",
  193. c1.id,
  194. c2.id,
  195. b"Test User <test@example.com>",
  196. 1234567891,
  197. 0,
  198. b"commit: Second commit",
  199. )
  200. r._write_reflog(
  201. b"HEAD",
  202. c2.id,
  203. c3.id,
  204. b"Test User <test@example.com>",
  205. 1234567892,
  206. 0,
  207. b"commit: Third commit",
  208. )
  209. # HEAD@{0} is the most recent (c3)
  210. self.assertEqual(c3, parse_object(r, b"HEAD@{0}"))
  211. # HEAD@{1} is the second most recent (c2)
  212. self.assertEqual(c2, parse_object(r, b"HEAD@{1}"))
  213. # HEAD@{2} is the third/oldest (c1)
  214. self.assertEqual(c1, parse_object(r, b"HEAD@{2}"))
  215. class ParseCommitRangeTests(TestCase):
  216. """Test parse_commit_range."""
  217. def test_nonexistent(self) -> None:
  218. r = MemoryRepo()
  219. self.assertRaises(KeyError, parse_commit_range, r, "thisdoesnotexist..HEAD")
  220. def test_commit_by_sha(self) -> None:
  221. r = MemoryRepo()
  222. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  223. self.assertIsNone(parse_commit_range(r, c1.id))
  224. def test_commit_range(self) -> None:
  225. r = MemoryRepo()
  226. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  227. result = parse_commit_range(r, f"{c1.id.decode()}..{c2.id.decode()}")
  228. self.assertIsNotNone(result)
  229. start_commit, end_commit = result
  230. self.assertEqual(c1, start_commit)
  231. self.assertEqual(c2, end_commit)
  232. class ParseCommitTests(TestCase):
  233. """Test parse_commit."""
  234. def test_nonexistent(self) -> None:
  235. r = MemoryRepo()
  236. self.assertRaises(KeyError, parse_commit, r, "thisdoesnotexist")
  237. def test_commit_by_sha(self) -> None:
  238. r = MemoryRepo()
  239. [c1] = build_commit_graph(r.object_store, [[1]])
  240. self.assertEqual(c1, parse_commit(r, c1.id))
  241. def test_commit_by_short_sha(self) -> None:
  242. r = MemoryRepo()
  243. [c1] = build_commit_graph(r.object_store, [[1]])
  244. self.assertEqual(c1, parse_commit(r, c1.id[:10]))
  245. def test_annotated_tag(self) -> None:
  246. r = MemoryRepo()
  247. [c1] = build_commit_graph(r.object_store, [[1]])
  248. # Create an annotated tag pointing to the commit
  249. tag = Tag()
  250. tag.name = b"v1.0"
  251. tag.message = b"Test tag"
  252. tag.tag_time = 1234567890
  253. tag.tag_timezone = 0
  254. tag.object = (Commit, c1.id)
  255. tag.tagger = b"Test Tagger <test@example.com>"
  256. r.object_store.add_object(tag)
  257. # parse_commit should follow the tag to the commit
  258. self.assertEqual(c1, parse_commit(r, tag.id))
  259. def test_nested_tags(self) -> None:
  260. r = MemoryRepo()
  261. [c1] = build_commit_graph(r.object_store, [[1]])
  262. # Create an annotated tag pointing to the commit
  263. tag1 = Tag()
  264. tag1.name = b"v1.0"
  265. tag1.message = b"Test tag"
  266. tag1.tag_time = 1234567890
  267. tag1.tag_timezone = 0
  268. tag1.object = (Commit, c1.id)
  269. tag1.tagger = b"Test Tagger <test@example.com>"
  270. r.object_store.add_object(tag1)
  271. # Create another tag pointing to the first tag
  272. tag2 = Tag()
  273. tag2.name = b"v1.0-release"
  274. tag2.message = b"Release tag"
  275. tag2.tag_time = 1234567900
  276. tag2.tag_timezone = 0
  277. tag2.object = (Tag, tag1.id)
  278. tag2.tagger = b"Test Tagger <test@example.com>"
  279. r.object_store.add_object(tag2)
  280. # parse_commit should follow both tags to the commit
  281. self.assertEqual(c1, parse_commit(r, tag2.id))
  282. def test_tag_to_missing_commit(self) -> None:
  283. r = MemoryRepo()
  284. # Create a tag pointing to a non-existent commit
  285. missing_sha = b"1234567890123456789012345678901234567890"
  286. tag = Tag()
  287. tag.name = b"v1.0"
  288. tag.message = b"Test tag"
  289. tag.tag_time = 1234567890
  290. tag.tag_timezone = 0
  291. tag.object = (Commit, missing_sha)
  292. tag.tagger = b"Test Tagger <test@example.com>"
  293. r.object_store.add_object(tag)
  294. # Should raise KeyError for missing commit
  295. self.assertRaises(KeyError, parse_commit, r, tag.id)
  296. def test_tag_to_blob(self) -> None:
  297. r = MemoryRepo()
  298. # Create a blob
  299. blob = Blob.from_string(b"Test content")
  300. r.object_store.add_object(blob)
  301. # Create a tag pointing to the blob
  302. tag = Tag()
  303. tag.name = b"blob-tag"
  304. tag.message = b"Tag pointing to blob"
  305. tag.tag_time = 1234567890
  306. tag.tag_timezone = 0
  307. tag.object = (Blob, blob.id)
  308. tag.tagger = b"Test Tagger <test@example.com>"
  309. r.object_store.add_object(tag)
  310. # Should raise ValueError as it's not a commit
  311. self.assertRaises(ValueError, parse_commit, r, tag.id)
  312. def test_commit_object(self) -> None:
  313. r = MemoryRepo()
  314. [c1] = build_commit_graph(r.object_store, [[1]])
  315. # Test that passing a Commit object directly returns the same object
  316. self.assertEqual(c1, parse_commit(r, c1))
  317. class ParseRefTests(TestCase):
  318. def test_nonexistent(self) -> None:
  319. r = {}
  320. self.assertRaises(KeyError, parse_ref, r, b"thisdoesnotexist")
  321. def test_ambiguous_ref(self) -> None:
  322. r = {
  323. b"ambig1": "bla",
  324. b"refs/ambig1": "bla",
  325. b"refs/tags/ambig1": "bla",
  326. b"refs/heads/ambig1": "bla",
  327. b"refs/remotes/ambig1": "bla",
  328. b"refs/remotes/ambig1/HEAD": "bla",
  329. }
  330. self.assertEqual(b"ambig1", parse_ref(r, b"ambig1"))
  331. def test_ambiguous_ref2(self) -> None:
  332. r = {
  333. b"refs/ambig2": "bla",
  334. b"refs/tags/ambig2": "bla",
  335. b"refs/heads/ambig2": "bla",
  336. b"refs/remotes/ambig2": "bla",
  337. b"refs/remotes/ambig2/HEAD": "bla",
  338. }
  339. self.assertEqual(b"refs/ambig2", parse_ref(r, b"ambig2"))
  340. def test_ambiguous_tag(self) -> None:
  341. r = {
  342. b"refs/tags/ambig3": "bla",
  343. b"refs/heads/ambig3": "bla",
  344. b"refs/remotes/ambig3": "bla",
  345. b"refs/remotes/ambig3/HEAD": "bla",
  346. }
  347. self.assertEqual(b"refs/tags/ambig3", parse_ref(r, b"ambig3"))
  348. def test_ambiguous_head(self) -> None:
  349. r = {
  350. b"refs/heads/ambig4": "bla",
  351. b"refs/remotes/ambig4": "bla",
  352. b"refs/remotes/ambig4/HEAD": "bla",
  353. }
  354. self.assertEqual(b"refs/heads/ambig4", parse_ref(r, b"ambig4"))
  355. def test_ambiguous_remote(self) -> None:
  356. r = {b"refs/remotes/ambig5": "bla", b"refs/remotes/ambig5/HEAD": "bla"}
  357. self.assertEqual(b"refs/remotes/ambig5", parse_ref(r, b"ambig5"))
  358. def test_ambiguous_remote_head(self) -> None:
  359. r = {b"refs/remotes/ambig6/HEAD": "bla"}
  360. self.assertEqual(b"refs/remotes/ambig6/HEAD", parse_ref(r, b"ambig6"))
  361. def test_heads_full(self) -> None:
  362. r = {b"refs/heads/foo": "bla"}
  363. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"refs/heads/foo"))
  364. def test_heads_partial(self) -> None:
  365. r = {b"refs/heads/foo": "bla"}
  366. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"heads/foo"))
  367. def test_tags_partial(self) -> None:
  368. r = {b"refs/tags/foo": "bla"}
  369. self.assertEqual(b"refs/tags/foo", parse_ref(r, b"tags/foo"))
  370. class ParseRefsTests(TestCase):
  371. def test_nonexistent(self) -> None:
  372. r = {}
  373. self.assertRaises(KeyError, parse_refs, r, [b"thisdoesnotexist"])
  374. def test_head(self) -> None:
  375. r = {b"refs/heads/foo": "bla"}
  376. self.assertEqual([b"refs/heads/foo"], parse_refs(r, [b"foo"]))
  377. def test_full(self) -> None:
  378. r = {b"refs/heads/foo": "bla"}
  379. self.assertEqual([b"refs/heads/foo"], parse_refs(r, b"refs/heads/foo"))
  380. class ParseReftupleTests(TestCase):
  381. def test_nonexistent(self) -> None:
  382. r = {}
  383. self.assertRaises(KeyError, parse_reftuple, r, r, b"thisdoesnotexist")
  384. def test_head(self) -> None:
  385. r = {b"refs/heads/foo": "bla"}
  386. self.assertEqual(
  387. (b"refs/heads/foo", b"refs/heads/foo", False),
  388. parse_reftuple(r, r, b"foo"),
  389. )
  390. self.assertEqual(
  391. (b"refs/heads/foo", b"refs/heads/foo", True),
  392. parse_reftuple(r, r, b"+foo"),
  393. )
  394. self.assertEqual(
  395. (b"refs/heads/foo", b"refs/heads/foo", True),
  396. parse_reftuple(r, {}, b"+foo"),
  397. )
  398. self.assertEqual(
  399. (b"refs/heads/foo", b"refs/heads/foo", True),
  400. parse_reftuple(r, {}, b"foo", True),
  401. )
  402. def test_full(self) -> None:
  403. r = {b"refs/heads/foo": "bla"}
  404. self.assertEqual(
  405. (b"refs/heads/foo", b"refs/heads/foo", False),
  406. parse_reftuple(r, r, b"refs/heads/foo"),
  407. )
  408. def test_no_left_ref(self) -> None:
  409. r = {b"refs/heads/foo": "bla"}
  410. self.assertEqual(
  411. (None, b"refs/heads/foo", False),
  412. parse_reftuple(r, r, b":refs/heads/foo"),
  413. )
  414. def test_no_right_ref(self) -> None:
  415. r = {b"refs/heads/foo": "bla"}
  416. self.assertEqual(
  417. (b"refs/heads/foo", None, False),
  418. parse_reftuple(r, r, b"refs/heads/foo:"),
  419. )
  420. def test_default_with_string(self) -> None:
  421. r = {b"refs/heads/foo": "bla"}
  422. self.assertEqual(
  423. (b"refs/heads/foo", b"refs/heads/foo", False),
  424. parse_reftuple(r, r, "foo"),
  425. )
  426. class ParseReftuplesTests(TestCase):
  427. def test_nonexistent(self) -> None:
  428. r = {}
  429. self.assertRaises(KeyError, parse_reftuples, r, r, [b"thisdoesnotexist"])
  430. def test_head(self) -> None:
  431. r = {b"refs/heads/foo": "bla"}
  432. self.assertEqual(
  433. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  434. parse_reftuples(r, r, [b"foo"]),
  435. )
  436. def test_full(self) -> None:
  437. r = {b"refs/heads/foo": "bla"}
  438. self.assertEqual(
  439. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  440. parse_reftuples(r, r, b"refs/heads/foo"),
  441. )
  442. r = {b"refs/heads/foo": "bla"}
  443. self.assertEqual(
  444. [(b"refs/heads/foo", b"refs/heads/foo", True)],
  445. parse_reftuples(r, r, b"refs/heads/foo", True),
  446. )
  447. class ParseTreeTests(TestCase):
  448. """Test parse_tree."""
  449. def test_nonexistent(self) -> None:
  450. r = MemoryRepo()
  451. self.assertRaises(KeyError, parse_tree, r, "thisdoesnotexist")
  452. def test_from_commit(self) -> None:
  453. r = MemoryRepo()
  454. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  455. self.assertEqual(r[c1.tree], parse_tree(r, c1.id))
  456. self.assertEqual(r[c1.tree], parse_tree(r, c1.tree))
  457. def test_from_ref(self) -> None:
  458. r = MemoryRepo()
  459. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  460. r.refs[b"refs/heads/foo"] = c1.id
  461. self.assertEqual(r[c1.tree], parse_tree(r, b"foo"))
  462. def test_tree_object(self) -> None:
  463. r = MemoryRepo()
  464. [c1] = build_commit_graph(r.object_store, [[1]])
  465. tree = r[c1.tree]
  466. # Test that passing a Tree object directly returns the same object
  467. self.assertEqual(tree, parse_tree(r, tree))
  468. def test_commit_object(self) -> None:
  469. r = MemoryRepo()
  470. [c1] = build_commit_graph(r.object_store, [[1]])
  471. # Test that passing a Commit object returns its tree
  472. self.assertEqual(r[c1.tree], parse_tree(r, c1))
  473. def test_tag_object(self) -> None:
  474. r = MemoryRepo()
  475. [c1] = build_commit_graph(r.object_store, [[1]])
  476. # Create an annotated tag pointing to the commit
  477. tag = Tag()
  478. tag.name = b"v1.0"
  479. tag.message = b"Test tag"
  480. tag.tag_time = 1234567890
  481. tag.tag_timezone = 0
  482. tag.object = (Commit, c1.id)
  483. tag.tagger = b"Test Tagger <test@example.com>"
  484. r.object_store.add_object(tag)
  485. # parse_tree should follow the tag to the commit's tree
  486. self.assertEqual(r[c1.tree], parse_tree(r, tag))