2
0

test_objectspec.py 20 KB


  1. # test_objectspec.py -- tests for objectspec.py
  2. # Copyright (C) 2014 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for revision spec parsing."""
  22. # TODO: Round-trip parse-serialize-parse and serialize-parse-serialize tests.
  23. from dulwich.objects import Blob, Commit, Tag, Tree
  24. from dulwich.objectspec import (
  25. parse_commit,
  26. parse_commit_range,
  27. parse_object,
  28. parse_ref,
  29. parse_refs,
  30. parse_reftuple,
  31. parse_reftuples,
  32. parse_tree,
  33. )
  34. from dulwich.repo import MemoryRepo
  35. from dulwich.tests.utils import build_commit_graph
  36. from . import TestCase
  37. class ParseObjectTests(TestCase):
  38. """Test parse_object."""
  39. def test_nonexistent(self) -> None:
  40. r = MemoryRepo()
  41. self.assertRaises(KeyError, parse_object, r, "thisdoesnotexist")
  42. def test_blob_by_sha(self) -> None:
  43. r = MemoryRepo()
  44. b = Blob.from_string(b"Blah")
  45. r.object_store.add_object(b)
  46. self.assertEqual(b, parse_object(r, b.id))
  47. def test_parent_caret(self) -> None:
  48. r = MemoryRepo()
  49. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  50. # c3's parents are [c1, c2]
  51. self.assertEqual(c1, parse_object(r, c3.id + b"^1"))
  52. self.assertEqual(c1, parse_object(r, c3.id + b"^")) # ^ defaults to ^1
  53. self.assertEqual(c2, parse_object(r, c3.id + b"^2"))
  54. def test_parent_tilde(self) -> None:
  55. r = MemoryRepo()
  56. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  57. self.assertEqual(c2, parse_object(r, c3.id + b"~"))
  58. self.assertEqual(c2, parse_object(r, c3.id + b"~1"))
  59. self.assertEqual(c1, parse_object(r, c3.id + b"~2"))
  60. def test_combined_operators(self) -> None:
  61. r = MemoryRepo()
  62. c1, c2, c3, c4 = build_commit_graph(
  63. r.object_store, [[1], [2, 1], [3, 1, 2], [4, 3]]
  64. )
  65. # c4~1^2 means: go back 1 generation from c4 (to c3), then take its 2nd parent
  66. # c3's parents are [c1, c2], so ^2 is c2
  67. self.assertEqual(c2, parse_object(r, c4.id + b"~1^2"))
  68. self.assertEqual(c1, parse_object(r, c4.id + b"~^"))
  69. def test_with_ref(self) -> None:
  70. r = MemoryRepo()
  71. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  72. r.refs[b"refs/heads/master"] = c3.id
  73. self.assertEqual(c2, parse_object(r, b"master~"))
  74. self.assertEqual(c1, parse_object(r, b"master~2"))
  75. def test_caret_zero(self) -> None:
  76. r = MemoryRepo()
  77. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  78. # ^0 means the commit itself
  79. self.assertEqual(c2, parse_object(r, c2.id + b"^0"))
  80. self.assertEqual(c1, parse_object(r, c2.id + b"~^0"))
  81. def test_missing_parent(self) -> None:
  82. r = MemoryRepo()
  83. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  84. # c2 only has 1 parent, so ^2 should fail
  85. self.assertRaises(ValueError, parse_object, r, c2.id + b"^2")
  86. # c1 has no parents, so ~ should fail
  87. self.assertRaises(ValueError, parse_object, r, c1.id + b"~")
  88. def test_empty_base(self) -> None:
  89. r = MemoryRepo()
  90. self.assertRaises(ValueError, parse_object, r, b"~1")
  91. self.assertRaises(ValueError, parse_object, r, b"^1")
  92. def test_non_commit_with_operators(self) -> None:
  93. r = MemoryRepo()
  94. b = Blob.from_string(b"Blah")
  95. r.object_store.add_object(b)
  96. # Can't apply ~ or ^ to a blob
  97. self.assertRaises(ValueError, parse_object, r, b.id + b"~1")
  98. def test_tag_dereference(self) -> None:
  99. r = MemoryRepo()
  100. [c1] = build_commit_graph(r.object_store, [[1]])
  101. # Create an annotated tag
  102. tag = Tag()
  103. tag.name = b"v1.0"
  104. tag.message = b"Test tag"
  105. tag.tag_time = 1234567890
  106. tag.tag_timezone = 0
  107. tag.object = (Commit, c1.id)
  108. tag.tagger = b"Test Tagger <test@example.com>"
  109. r.object_store.add_object(tag)
  110. # ^{} dereferences the tag
  111. self.assertEqual(c1, parse_object(r, tag.id + b"^{}"))
  112. def test_nested_tag_dereference(self) -> None:
  113. r = MemoryRepo()
  114. [c1] = build_commit_graph(r.object_store, [[1]])
  115. # Create a tag pointing to a commit
  116. tag1 = Tag()
  117. tag1.name = b"v1.0"
  118. tag1.message = b"Test tag"
  119. tag1.tag_time = 1234567890
  120. tag1.tag_timezone = 0
  121. tag1.object = (Commit, c1.id)
  122. tag1.tagger = b"Test Tagger <test@example.com>"
  123. r.object_store.add_object(tag1)
  124. # Create another tag pointing to the first tag
  125. tag2 = Tag()
  126. tag2.name = b"v1.0-release"
  127. tag2.message = b"Release tag"
  128. tag2.tag_time = 1234567900
  129. tag2.tag_timezone = 0
  130. tag2.object = (Tag, tag1.id)
  131. tag2.tagger = b"Test Tagger <test@example.com>"
  132. r.object_store.add_object(tag2)
  133. # ^{} should recursively dereference to the commit
  134. self.assertEqual(c1, parse_object(r, tag2.id + b"^{}"))
  135. def test_path_in_tree(self) -> None:
  136. r = MemoryRepo()
  137. # Create a blob
  138. b = Blob.from_string(b"Test content")
  139. # Create a commit with the blob in its tree
  140. [c1] = build_commit_graph(r.object_store, [[1]], trees={1: [(b"test.txt", b)]})
  141. # HEAD:test.txt should return the blob
  142. r.refs[b"HEAD"] = c1.id
  143. result = parse_object(r, b"HEAD:test.txt")
  144. self.assertEqual(b"Test content", result.data)
  145. def test_path_in_tree_nested(self) -> None:
  146. r = MemoryRepo()
  147. # Create blobs
  148. b1 = Blob.from_string(b"Content 1")
  149. b2 = Blob.from_string(b"Content 2")
  150. # For nested trees, we need to create them manually
  151. # Create subtree
  152. subtree = Tree()
  153. subtree.add(b"file.txt", 0o100644, b1.id)
  154. r.object_store.add_object(b1)
  155. r.object_store.add_object(subtree)
  156. # Create main tree
  157. main_tree = Tree()
  158. main_tree.add(b"README", 0o100644, b2.id)
  159. main_tree.add(b"subdir", 0o040000, subtree.id)
  160. r.object_store.add_object(b2)
  161. r.object_store.add_object(main_tree)
  162. # Create commit with our tree
  163. c = Commit()
  164. c.tree = main_tree.id
  165. c.author = c.committer = b"Test User <test@example.com>"
  166. c.author_time = c.commit_time = 1234567890
  167. c.author_timezone = c.commit_timezone = 0
  168. c.message = b"Test commit"
  169. r.object_store.add_object(c)
  170. # Lookup nested path
  171. result = parse_object(r, c.id + b":subdir/file.txt")
  172. self.assertEqual(b"Content 1", result.data)
  173. def test_reflog_lookup(self) -> None:
  174. # Use a real repo for reflog testing
  175. import tempfile
  176. from dulwich.repo import Repo
  177. with tempfile.TemporaryDirectory() as tmpdir:
  178. r = Repo.init_bare(tmpdir)
  179. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  180. # Write reflog entries using the repo's _write_reflog method
  181. # These are written in chronological order (oldest first)
  182. r._write_reflog(
  183. b"HEAD",
  184. None,
  185. c1.id,
  186. b"Test User <test@example.com>",
  187. 1234567890,
  188. 0,
  189. b"commit: Initial commit",
  190. )
  191. r._write_reflog(
  192. b"HEAD",
  193. c1.id,
  194. c2.id,
  195. b"Test User <test@example.com>",
  196. 1234567891,
  197. 0,
  198. b"commit: Second commit",
  199. )
  200. r._write_reflog(
  201. b"HEAD",
  202. c2.id,
  203. c3.id,
  204. b"Test User <test@example.com>",
  205. 1234567892,
  206. 0,
  207. b"commit: Third commit",
  208. )
  209. # HEAD@{0} is the most recent (c3)
  210. self.assertEqual(c3, parse_object(r, b"HEAD@{0}"))
  211. # HEAD@{1} is the second most recent (c2)
  212. self.assertEqual(c2, parse_object(r, b"HEAD@{1}"))
  213. # HEAD@{2} is the third/oldest (c1)
  214. self.assertEqual(c1, parse_object(r, b"HEAD@{2}"))
  215. class ParseCommitRangeTests(TestCase):
  216. """Test parse_commit_range."""
  217. def test_nonexistent(self) -> None:
  218. r = MemoryRepo()
  219. self.assertRaises(KeyError, parse_commit_range, r, "thisdoesnotexist")
  220. def test_commit_by_sha(self) -> None:
  221. r = MemoryRepo()
  222. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  223. self.assertEqual([c1], list(parse_commit_range(r, c1.id)))
  224. class ParseCommitTests(TestCase):
  225. """Test parse_commit."""
  226. def test_nonexistent(self) -> None:
  227. r = MemoryRepo()
  228. self.assertRaises(KeyError, parse_commit, r, "thisdoesnotexist")
  229. def test_commit_by_sha(self) -> None:
  230. r = MemoryRepo()
  231. [c1] = build_commit_graph(r.object_store, [[1]])
  232. self.assertEqual(c1, parse_commit(r, c1.id))
  233. def test_commit_by_short_sha(self) -> None:
  234. r = MemoryRepo()
  235. [c1] = build_commit_graph(r.object_store, [[1]])
  236. self.assertEqual(c1, parse_commit(r, c1.id[:10]))
  237. def test_annotated_tag(self) -> None:
  238. r = MemoryRepo()
  239. [c1] = build_commit_graph(r.object_store, [[1]])
  240. # Create an annotated tag pointing to the commit
  241. tag = Tag()
  242. tag.name = b"v1.0"
  243. tag.message = b"Test tag"
  244. tag.tag_time = 1234567890
  245. tag.tag_timezone = 0
  246. tag.object = (Commit, c1.id)
  247. tag.tagger = b"Test Tagger <test@example.com>"
  248. r.object_store.add_object(tag)
  249. # parse_commit should follow the tag to the commit
  250. self.assertEqual(c1, parse_commit(r, tag.id))
  251. def test_nested_tags(self) -> None:
  252. r = MemoryRepo()
  253. [c1] = build_commit_graph(r.object_store, [[1]])
  254. # Create an annotated tag pointing to the commit
  255. tag1 = Tag()
  256. tag1.name = b"v1.0"
  257. tag1.message = b"Test tag"
  258. tag1.tag_time = 1234567890
  259. tag1.tag_timezone = 0
  260. tag1.object = (Commit, c1.id)
  261. tag1.tagger = b"Test Tagger <test@example.com>"
  262. r.object_store.add_object(tag1)
  263. # Create another tag pointing to the first tag
  264. tag2 = Tag()
  265. tag2.name = b"v1.0-release"
  266. tag2.message = b"Release tag"
  267. tag2.tag_time = 1234567900
  268. tag2.tag_timezone = 0
  269. tag2.object = (Tag, tag1.id)
  270. tag2.tagger = b"Test Tagger <test@example.com>"
  271. r.object_store.add_object(tag2)
  272. # parse_commit should follow both tags to the commit
  273. self.assertEqual(c1, parse_commit(r, tag2.id))
  274. def test_tag_to_missing_commit(self) -> None:
  275. r = MemoryRepo()
  276. # Create a tag pointing to a non-existent commit
  277. missing_sha = b"1234567890123456789012345678901234567890"
  278. tag = Tag()
  279. tag.name = b"v1.0"
  280. tag.message = b"Test tag"
  281. tag.tag_time = 1234567890
  282. tag.tag_timezone = 0
  283. tag.object = (Commit, missing_sha)
  284. tag.tagger = b"Test Tagger <test@example.com>"
  285. r.object_store.add_object(tag)
  286. # Should raise KeyError for missing commit
  287. self.assertRaises(KeyError, parse_commit, r, tag.id)
  288. def test_tag_to_blob(self) -> None:
  289. r = MemoryRepo()
  290. # Create a blob
  291. blob = Blob.from_string(b"Test content")
  292. r.object_store.add_object(blob)
  293. # Create a tag pointing to the blob
  294. tag = Tag()
  295. tag.name = b"blob-tag"
  296. tag.message = b"Tag pointing to blob"
  297. tag.tag_time = 1234567890
  298. tag.tag_timezone = 0
  299. tag.object = (Blob, blob.id)
  300. tag.tagger = b"Test Tagger <test@example.com>"
  301. r.object_store.add_object(tag)
  302. # Should raise ValueError as it's not a commit
  303. self.assertRaises(ValueError, parse_commit, r, tag.id)
  304. def test_commit_object(self) -> None:
  305. r = MemoryRepo()
  306. [c1] = build_commit_graph(r.object_store, [[1]])
  307. # Test that passing a Commit object directly returns the same object
  308. self.assertEqual(c1, parse_commit(r, c1))
  309. class ParseRefTests(TestCase):
  310. def test_nonexistent(self) -> None:
  311. r = {}
  312. self.assertRaises(KeyError, parse_ref, r, b"thisdoesnotexist")
  313. def test_ambiguous_ref(self) -> None:
  314. r = {
  315. b"ambig1": "bla",
  316. b"refs/ambig1": "bla",
  317. b"refs/tags/ambig1": "bla",
  318. b"refs/heads/ambig1": "bla",
  319. b"refs/remotes/ambig1": "bla",
  320. b"refs/remotes/ambig1/HEAD": "bla",
  321. }
  322. self.assertEqual(b"ambig1", parse_ref(r, b"ambig1"))
  323. def test_ambiguous_ref2(self) -> None:
  324. r = {
  325. b"refs/ambig2": "bla",
  326. b"refs/tags/ambig2": "bla",
  327. b"refs/heads/ambig2": "bla",
  328. b"refs/remotes/ambig2": "bla",
  329. b"refs/remotes/ambig2/HEAD": "bla",
  330. }
  331. self.assertEqual(b"refs/ambig2", parse_ref(r, b"ambig2"))
  332. def test_ambiguous_tag(self) -> None:
  333. r = {
  334. b"refs/tags/ambig3": "bla",
  335. b"refs/heads/ambig3": "bla",
  336. b"refs/remotes/ambig3": "bla",
  337. b"refs/remotes/ambig3/HEAD": "bla",
  338. }
  339. self.assertEqual(b"refs/tags/ambig3", parse_ref(r, b"ambig3"))
  340. def test_ambiguous_head(self) -> None:
  341. r = {
  342. b"refs/heads/ambig4": "bla",
  343. b"refs/remotes/ambig4": "bla",
  344. b"refs/remotes/ambig4/HEAD": "bla",
  345. }
  346. self.assertEqual(b"refs/heads/ambig4", parse_ref(r, b"ambig4"))
  347. def test_ambiguous_remote(self) -> None:
  348. r = {b"refs/remotes/ambig5": "bla", b"refs/remotes/ambig5/HEAD": "bla"}
  349. self.assertEqual(b"refs/remotes/ambig5", parse_ref(r, b"ambig5"))
  350. def test_ambiguous_remote_head(self) -> None:
  351. r = {b"refs/remotes/ambig6/HEAD": "bla"}
  352. self.assertEqual(b"refs/remotes/ambig6/HEAD", parse_ref(r, b"ambig6"))
  353. def test_heads_full(self) -> None:
  354. r = {b"refs/heads/foo": "bla"}
  355. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"refs/heads/foo"))
  356. def test_heads_partial(self) -> None:
  357. r = {b"refs/heads/foo": "bla"}
  358. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"heads/foo"))
  359. def test_tags_partial(self) -> None:
  360. r = {b"refs/tags/foo": "bla"}
  361. self.assertEqual(b"refs/tags/foo", parse_ref(r, b"tags/foo"))
  362. class ParseRefsTests(TestCase):
  363. def test_nonexistent(self) -> None:
  364. r = {}
  365. self.assertRaises(KeyError, parse_refs, r, [b"thisdoesnotexist"])
  366. def test_head(self) -> None:
  367. r = {b"refs/heads/foo": "bla"}
  368. self.assertEqual([b"refs/heads/foo"], parse_refs(r, [b"foo"]))
  369. def test_full(self) -> None:
  370. r = {b"refs/heads/foo": "bla"}
  371. self.assertEqual([b"refs/heads/foo"], parse_refs(r, b"refs/heads/foo"))
  372. class ParseReftupleTests(TestCase):
  373. def test_nonexistent(self) -> None:
  374. r = {}
  375. self.assertRaises(KeyError, parse_reftuple, r, r, b"thisdoesnotexist")
  376. def test_head(self) -> None:
  377. r = {b"refs/heads/foo": "bla"}
  378. self.assertEqual(
  379. (b"refs/heads/foo", b"refs/heads/foo", False),
  380. parse_reftuple(r, r, b"foo"),
  381. )
  382. self.assertEqual(
  383. (b"refs/heads/foo", b"refs/heads/foo", True),
  384. parse_reftuple(r, r, b"+foo"),
  385. )
  386. self.assertEqual(
  387. (b"refs/heads/foo", b"refs/heads/foo", True),
  388. parse_reftuple(r, {}, b"+foo"),
  389. )
  390. self.assertEqual(
  391. (b"refs/heads/foo", b"refs/heads/foo", True),
  392. parse_reftuple(r, {}, b"foo", True),
  393. )
  394. def test_full(self) -> None:
  395. r = {b"refs/heads/foo": "bla"}
  396. self.assertEqual(
  397. (b"refs/heads/foo", b"refs/heads/foo", False),
  398. parse_reftuple(r, r, b"refs/heads/foo"),
  399. )
  400. def test_no_left_ref(self) -> None:
  401. r = {b"refs/heads/foo": "bla"}
  402. self.assertEqual(
  403. (None, b"refs/heads/foo", False),
  404. parse_reftuple(r, r, b":refs/heads/foo"),
  405. )
  406. def test_no_right_ref(self) -> None:
  407. r = {b"refs/heads/foo": "bla"}
  408. self.assertEqual(
  409. (b"refs/heads/foo", None, False),
  410. parse_reftuple(r, r, b"refs/heads/foo:"),
  411. )
  412. def test_default_with_string(self) -> None:
  413. r = {b"refs/heads/foo": "bla"}
  414. self.assertEqual(
  415. (b"refs/heads/foo", b"refs/heads/foo", False),
  416. parse_reftuple(r, r, "foo"),
  417. )
  418. class ParseReftuplesTests(TestCase):
  419. def test_nonexistent(self) -> None:
  420. r = {}
  421. self.assertRaises(KeyError, parse_reftuples, r, r, [b"thisdoesnotexist"])
  422. def test_head(self) -> None:
  423. r = {b"refs/heads/foo": "bla"}
  424. self.assertEqual(
  425. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  426. parse_reftuples(r, r, [b"foo"]),
  427. )
  428. def test_full(self) -> None:
  429. r = {b"refs/heads/foo": "bla"}
  430. self.assertEqual(
  431. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  432. parse_reftuples(r, r, b"refs/heads/foo"),
  433. )
  434. r = {b"refs/heads/foo": "bla"}
  435. self.assertEqual(
  436. [(b"refs/heads/foo", b"refs/heads/foo", True)],
  437. parse_reftuples(r, r, b"refs/heads/foo", True),
  438. )
  439. class ParseTreeTests(TestCase):
  440. """Test parse_tree."""
  441. def test_nonexistent(self) -> None:
  442. r = MemoryRepo()
  443. self.assertRaises(KeyError, parse_tree, r, "thisdoesnotexist")
  444. def test_from_commit(self) -> None:
  445. r = MemoryRepo()
  446. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  447. self.assertEqual(r[c1.tree], parse_tree(r, c1.id))
  448. self.assertEqual(r[c1.tree], parse_tree(r, c1.tree))
  449. def test_from_ref(self) -> None:
  450. r = MemoryRepo()
  451. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  452. r.refs[b"refs/heads/foo"] = c1.id
  453. self.assertEqual(r[c1.tree], parse_tree(r, b"foo"))
  454. def test_tree_object(self) -> None:
  455. r = MemoryRepo()
  456. [c1] = build_commit_graph(r.object_store, [[1]])
  457. tree = r[c1.tree]
  458. # Test that passing a Tree object directly returns the same object
  459. self.assertEqual(tree, parse_tree(r, tree))
  460. def test_commit_object(self) -> None:
  461. r = MemoryRepo()
  462. [c1] = build_commit_graph(r.object_store, [[1]])
  463. # Test that passing a Commit object returns its tree
  464. self.assertEqual(r[c1.tree], parse_tree(r, c1))
  465. def test_tag_object(self) -> None:
  466. r = MemoryRepo()
  467. [c1] = build_commit_graph(r.object_store, [[1]])
  468. # Create an annotated tag pointing to the commit
  469. tag = Tag()
  470. tag.name = b"v1.0"
  471. tag.message = b"Test tag"
  472. tag.tag_time = 1234567890
  473. tag.tag_timezone = 0
  474. tag.object = (Commit, c1.id)
  475. tag.tagger = b"Test Tagger <test@example.com>"
  476. r.object_store.add_object(tag)
  477. # parse_tree should follow the tag to the commit's tree
  478. self.assertEqual(r[c1.tree], parse_tree(r, tag))