test_objectspec.py 27 KB


  1. # test_objectspec.py -- tests for objectspec.py
  2. # Copyright (C) 2014 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for revision spec parsing."""
  22. # TODO: Round-trip parse-serialize-parse and serialize-parse-serialize tests.
  23. from dulwich.objects import Blob, Commit, Tag, Tree
  24. from dulwich.objectspec import (
  25. parse_commit,
  26. parse_commit_range,
  27. parse_object,
  28. parse_ref,
  29. parse_refs,
  30. parse_reftuple,
  31. parse_reftuples,
  32. parse_tree,
  33. )
  34. from dulwich.repo import MemoryRepo
  35. from dulwich.tests.utils import build_commit_graph
  36. from . import TestCase
  37. class ParseObjectTests(TestCase):
  38. """Test parse_object."""
  39. def test_nonexistent(self) -> None:
  40. r = MemoryRepo()
  41. self.addCleanup(r.close)
  42. self.assertRaises(KeyError, parse_object, r, "thisdoesnotexist")
  43. def test_blob_by_sha(self) -> None:
  44. r = MemoryRepo()
  45. self.addCleanup(r.close)
  46. b = Blob.from_string(b"Blah")
  47. r.object_store.add_object(b)
  48. self.assertEqual(b, parse_object(r, b.id))
  49. def test_parent_caret(self) -> None:
  50. r = MemoryRepo()
  51. self.addCleanup(r.close)
  52. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  53. # c3's parents are [c1, c2]
  54. self.assertEqual(c1, parse_object(r, c3.id + b"^1"))
  55. self.assertEqual(c1, parse_object(r, c3.id + b"^")) # ^ defaults to ^1
  56. self.assertEqual(c2, parse_object(r, c3.id + b"^2"))
  57. def test_parent_tilde(self) -> None:
  58. r = MemoryRepo()
  59. self.addCleanup(r.close)
  60. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  61. self.assertEqual(c2, parse_object(r, c3.id + b"~"))
  62. self.assertEqual(c2, parse_object(r, c3.id + b"~1"))
  63. self.assertEqual(c1, parse_object(r, c3.id + b"~2"))
  64. def test_combined_operators(self) -> None:
  65. r = MemoryRepo()
  66. self.addCleanup(r.close)
  67. c1, c2, _c3, c4 = build_commit_graph(
  68. r.object_store, [[1], [2, 1], [3, 1, 2], [4, 3]]
  69. )
  70. # c4~1^2 means: go back 1 generation from c4 (to c3), then take its 2nd parent
  71. # c3's parents are [c1, c2], so ^2 is c2
  72. self.assertEqual(c2, parse_object(r, c4.id + b"~1^2"))
  73. self.assertEqual(c1, parse_object(r, c4.id + b"~^"))
  74. def test_with_ref(self) -> None:
  75. r = MemoryRepo()
  76. self.addCleanup(r.close)
  77. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  78. r.refs[b"refs/heads/master"] = c3.id
  79. self.assertEqual(c2, parse_object(r, b"master~"))
  80. self.assertEqual(c1, parse_object(r, b"master~2"))
  81. def test_caret_zero(self) -> None:
  82. r = MemoryRepo()
  83. self.addCleanup(r.close)
  84. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  85. # ^0 means the commit itself
  86. self.assertEqual(c2, parse_object(r, c2.id + b"^0"))
  87. self.assertEqual(c1, parse_object(r, c2.id + b"~^0"))
  88. def test_missing_parent(self) -> None:
  89. r = MemoryRepo()
  90. self.addCleanup(r.close)
  91. c1, c2 = build_commit_graph(r.object_store, [[1], [2, 1]])
  92. # c2 only has 1 parent, so ^2 should fail
  93. self.assertRaises(ValueError, parse_object, r, c2.id + b"^2")
  94. # c1 has no parents, so ~ should fail
  95. self.assertRaises(ValueError, parse_object, r, c1.id + b"~")
  96. def test_empty_base(self) -> None:
  97. r = MemoryRepo()
  98. self.addCleanup(r.close)
  99. self.assertRaises(ValueError, parse_object, r, b"~1")
  100. self.assertRaises(ValueError, parse_object, r, b"^1")
  101. def test_non_commit_with_operators(self) -> None:
  102. r = MemoryRepo()
  103. self.addCleanup(r.close)
  104. b = Blob.from_string(b"Blah")
  105. r.object_store.add_object(b)
  106. # Can't apply ~ or ^ to a blob
  107. self.assertRaises(ValueError, parse_object, r, b.id + b"~1")
  108. def test_tag_dereference(self) -> None:
  109. r = MemoryRepo()
  110. self.addCleanup(r.close)
  111. [c1] = build_commit_graph(r.object_store, [[1]])
  112. # Create an annotated tag
  113. tag = Tag()
  114. tag.name = b"v1.0"
  115. tag.message = b"Test tag"
  116. tag.tag_time = 1234567890
  117. tag.tag_timezone = 0
  118. tag.object = (Commit, c1.id)
  119. tag.tagger = b"Test Tagger <test@example.com>"
  120. r.object_store.add_object(tag)
  121. # ^{} dereferences the tag
  122. self.assertEqual(c1, parse_object(r, tag.id + b"^{}"))
  123. def test_nested_tag_dereference(self) -> None:
  124. r = MemoryRepo()
  125. self.addCleanup(r.close)
  126. [c1] = build_commit_graph(r.object_store, [[1]])
  127. # Create a tag pointing to a commit
  128. tag1 = Tag()
  129. tag1.name = b"v1.0"
  130. tag1.message = b"Test tag"
  131. tag1.tag_time = 1234567890
  132. tag1.tag_timezone = 0
  133. tag1.object = (Commit, c1.id)
  134. tag1.tagger = b"Test Tagger <test@example.com>"
  135. r.object_store.add_object(tag1)
  136. # Create another tag pointing to the first tag
  137. tag2 = Tag()
  138. tag2.name = b"v1.0-release"
  139. tag2.message = b"Release tag"
  140. tag2.tag_time = 1234567900
  141. tag2.tag_timezone = 0
  142. tag2.object = (Tag, tag1.id)
  143. tag2.tagger = b"Test Tagger <test@example.com>"
  144. r.object_store.add_object(tag2)
  145. # ^{} should recursively dereference to the commit
  146. self.assertEqual(c1, parse_object(r, tag2.id + b"^{}"))
  147. def test_path_in_tree(self) -> None:
  148. r = MemoryRepo()
  149. self.addCleanup(r.close)
  150. # Create a blob
  151. b = Blob.from_string(b"Test content")
  152. # Create a commit with the blob in its tree
  153. [c1] = build_commit_graph(r.object_store, [[1]], trees={1: [(b"test.txt", b)]})
  154. # HEAD:test.txt should return the blob
  155. r.refs[b"HEAD"] = c1.id
  156. result = parse_object(r, b"HEAD:test.txt")
  157. self.assertEqual(b"Test content", result.data)
  158. def test_path_in_tree_nested(self) -> None:
  159. r = MemoryRepo()
  160. self.addCleanup(r.close)
  161. # Create blobs
  162. b1 = Blob.from_string(b"Content 1")
  163. b2 = Blob.from_string(b"Content 2")
  164. # For nested trees, we need to create them manually
  165. # Create subtree
  166. subtree = Tree()
  167. subtree.add(b"file.txt", 0o100644, b1.id)
  168. r.object_store.add_object(b1)
  169. r.object_store.add_object(subtree)
  170. # Create main tree
  171. main_tree = Tree()
  172. main_tree.add(b"README", 0o100644, b2.id)
  173. main_tree.add(b"subdir", 0o040000, subtree.id)
  174. r.object_store.add_object(b2)
  175. r.object_store.add_object(main_tree)
  176. # Create commit with our tree
  177. c = Commit()
  178. c.tree = main_tree.id
  179. c.author = c.committer = b"Test User <test@example.com>"
  180. c.author_time = c.commit_time = 1234567890
  181. c.author_timezone = c.commit_timezone = 0
  182. c.message = b"Test commit"
  183. r.object_store.add_object(c)
  184. # Lookup nested path
  185. result = parse_object(r, c.id + b":subdir/file.txt")
  186. self.assertEqual(b"Content 1", result.data)
  187. def test_reflog_lookup(self) -> None:
  188. # Use a real repo for reflog testing
  189. import tempfile
  190. from dulwich.repo import Repo
  191. with tempfile.TemporaryDirectory() as tmpdir:
  192. r = Repo.init_bare(tmpdir)
  193. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  194. # Write reflog entries using the repo's _write_reflog method
  195. # These are written in chronological order (oldest first)
  196. r._write_reflog(
  197. b"HEAD",
  198. None,
  199. c1.id,
  200. b"Test User <test@example.com>",
  201. 1234567890,
  202. 0,
  203. b"commit: Initial commit",
  204. )
  205. r._write_reflog(
  206. b"HEAD",
  207. c1.id,
  208. c2.id,
  209. b"Test User <test@example.com>",
  210. 1234567891,
  211. 0,
  212. b"commit: Second commit",
  213. )
  214. r._write_reflog(
  215. b"HEAD",
  216. c2.id,
  217. c3.id,
  218. b"Test User <test@example.com>",
  219. 1234567892,
  220. 0,
  221. b"commit: Third commit",
  222. )
  223. # HEAD@{0} is the most recent (c3)
  224. self.assertEqual(c3, parse_object(r, b"HEAD@{0}"))
  225. # HEAD@{1} is the second most recent (c2)
  226. self.assertEqual(c2, parse_object(r, b"HEAD@{1}"))
  227. # HEAD@{2} is the third/oldest (c1)
  228. self.assertEqual(c1, parse_object(r, b"HEAD@{2}"))
  229. def test_reflog_time_lookup(self) -> None:
  230. # Use a real repo for reflog testing with time specifications
  231. import tempfile
  232. from dulwich.repo import Repo
  233. with tempfile.TemporaryDirectory() as tmpdir:
  234. r = Repo.init_bare(tmpdir)
  235. c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 2]])
  236. # Write reflog entries with specific timestamps
  237. # 1234567890 = 2009-02-13 23:31:30 UTC
  238. r._write_reflog(
  239. b"HEAD",
  240. None,
  241. c1.id,
  242. b"Test User <test@example.com>",
  243. 1234567890,
  244. 0,
  245. b"commit: Initial commit",
  246. )
  247. # 1234657890 = 2009-02-14 23:31:30 UTC (1 day + 1 second later)
  248. r._write_reflog(
  249. b"HEAD",
  250. c1.id,
  251. c2.id,
  252. b"Test User <test@example.com>",
  253. 1234657890,
  254. 0,
  255. b"commit: Second commit",
  256. )
  257. # 1235000000 = 2009-02-18 19:33:20 UTC
  258. r._write_reflog(
  259. b"HEAD",
  260. c2.id,
  261. c3.id,
  262. b"Test User <test@example.com>",
  263. 1235000000,
  264. 0,
  265. b"commit: Third commit",
  266. )
  267. # Lookup by timestamp - should get the most recent entry at or before time
  268. self.assertEqual(c1, parse_object(r, b"HEAD@{1234567890}"))
  269. self.assertEqual(c2, parse_object(r, b"HEAD@{1234657890}"))
  270. self.assertEqual(c3, parse_object(r, b"HEAD@{1235000000}"))
  271. # Future timestamp should get latest entry
  272. self.assertEqual(c3, parse_object(r, b"HEAD@{9999999999}"))
  273. def test_index_path_lookup_stage0(self) -> None:
  274. # Test index path lookup for stage 0 (normal files)
  275. import tempfile
  276. from dulwich.repo import Repo
  277. with tempfile.TemporaryDirectory() as tmpdir:
  278. r = Repo.init(tmpdir)
  279. # Create a blob and add it to the index
  280. b = Blob.from_string(b"Test content")
  281. r.object_store.add_object(b)
  282. # Add to index
  283. index = r.open_index()
  284. from dulwich.index import IndexEntry
  285. index[b"test.txt"] = IndexEntry(
  286. ctime=(0, 0),
  287. mtime=(0, 0),
  288. dev=0,
  289. ino=0,
  290. mode=0o100644,
  291. uid=0,
  292. gid=0,
  293. size=len(b.data),
  294. sha=b.id,
  295. )
  296. index.write()
  297. # Test :path syntax (defaults to stage 0)
  298. result = parse_object(r, b":test.txt")
  299. self.assertEqual(b"Test content", result.data)
  300. # Test :0:path syntax (explicit stage 0)
  301. result = parse_object(r, b":0:test.txt")
  302. self.assertEqual(b"Test content", result.data)
  303. def test_index_path_lookup_conflicts(self) -> None:
  304. # Test index path lookup with merge conflicts (stages 1-3)
  305. import tempfile
  306. from dulwich.index import ConflictedIndexEntry, IndexEntry
  307. from dulwich.repo import Repo
  308. with tempfile.TemporaryDirectory() as tmpdir:
  309. r = Repo.init(tmpdir)
  310. # Create three different versions of a file
  311. b_ancestor = Blob.from_string(b"Ancestor content")
  312. b_this = Blob.from_string(b"This content")
  313. b_other = Blob.from_string(b"Other content")
  314. r.object_store.add_object(b_ancestor)
  315. r.object_store.add_object(b_this)
  316. r.object_store.add_object(b_other)
  317. # Add conflicted entry to index
  318. index = r.open_index()
  319. index[b"conflict.txt"] = ConflictedIndexEntry(
  320. ancestor=IndexEntry(
  321. ctime=(0, 0),
  322. mtime=(0, 0),
  323. dev=0,
  324. ino=0,
  325. mode=0o100644,
  326. uid=0,
  327. gid=0,
  328. size=len(b_ancestor.data),
  329. sha=b_ancestor.id,
  330. ),
  331. this=IndexEntry(
  332. ctime=(0, 0),
  333. mtime=(0, 0),
  334. dev=0,
  335. ino=0,
  336. mode=0o100644,
  337. uid=0,
  338. gid=0,
  339. size=len(b_this.data),
  340. sha=b_this.id,
  341. ),
  342. other=IndexEntry(
  343. ctime=(0, 0),
  344. mtime=(0, 0),
  345. dev=0,
  346. ino=0,
  347. mode=0o100644,
  348. uid=0,
  349. gid=0,
  350. size=len(b_other.data),
  351. sha=b_other.id,
  352. ),
  353. )
  354. index.write()
  355. # Test stage 1 (ancestor)
  356. result = parse_object(r, b":1:conflict.txt")
  357. self.assertEqual(b"Ancestor content", result.data)
  358. # Test stage 2 (this)
  359. result = parse_object(r, b":2:conflict.txt")
  360. self.assertEqual(b"This content", result.data)
  361. # Test stage 3 (other)
  362. result = parse_object(r, b":3:conflict.txt")
  363. self.assertEqual(b"Other content", result.data)
  364. # Test that :conflict.txt raises an error for conflicted files
  365. self.assertRaises(ValueError, parse_object, r, b":conflict.txt")
  366. def test_index_path_not_found(self) -> None:
  367. # Test error when path not in index
  368. import tempfile
  369. from dulwich.repo import Repo
  370. with tempfile.TemporaryDirectory() as tmpdir:
  371. r = Repo.init(tmpdir)
  372. # Try to lookup non-existent path
  373. self.assertRaises(KeyError, parse_object, r, b":nonexistent.txt")
  374. class ParseCommitRangeTests(TestCase):
  375. """Test parse_commit_range."""
  376. def test_nonexistent(self) -> None:
  377. r = MemoryRepo()
  378. self.addCleanup(r.close)
  379. self.assertRaises(KeyError, parse_commit_range, r, "thisdoesnotexist..HEAD")
  380. def test_commit_by_sha(self) -> None:
  381. r = MemoryRepo()
  382. self.addCleanup(r.close)
  383. c1, _c2, _c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  384. self.assertIsNone(parse_commit_range(r, c1.id))
  385. def test_commit_range(self) -> None:
  386. r = MemoryRepo()
  387. self.addCleanup(r.close)
  388. c1, c2, _c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  389. result = parse_commit_range(r, f"{c1.id.decode()}..{c2.id.decode()}")
  390. self.assertIsNotNone(result)
  391. start_commit, end_commit = result
  392. self.assertEqual(c1, start_commit)
  393. self.assertEqual(c2, end_commit)
  394. class ParseCommitTests(TestCase):
  395. """Test parse_commit."""
  396. def test_nonexistent(self) -> None:
  397. r = MemoryRepo()
  398. self.addCleanup(r.close)
  399. self.assertRaises(KeyError, parse_commit, r, "thisdoesnotexist")
  400. def test_commit_by_sha(self) -> None:
  401. r = MemoryRepo()
  402. self.addCleanup(r.close)
  403. [c1] = build_commit_graph(r.object_store, [[1]])
  404. self.assertEqual(c1, parse_commit(r, c1.id))
  405. def test_commit_by_short_sha(self) -> None:
  406. r = MemoryRepo()
  407. self.addCleanup(r.close)
  408. [c1] = build_commit_graph(r.object_store, [[1]])
  409. self.assertEqual(c1, parse_commit(r, c1.id[:10]))
  410. def test_annotated_tag(self) -> None:
  411. r = MemoryRepo()
  412. self.addCleanup(r.close)
  413. [c1] = build_commit_graph(r.object_store, [[1]])
  414. # Create an annotated tag pointing to the commit
  415. tag = Tag()
  416. tag.name = b"v1.0"
  417. tag.message = b"Test tag"
  418. tag.tag_time = 1234567890
  419. tag.tag_timezone = 0
  420. tag.object = (Commit, c1.id)
  421. tag.tagger = b"Test Tagger <test@example.com>"
  422. r.object_store.add_object(tag)
  423. # parse_commit should follow the tag to the commit
  424. self.assertEqual(c1, parse_commit(r, tag.id))
  425. def test_nested_tags(self) -> None:
  426. r = MemoryRepo()
  427. self.addCleanup(r.close)
  428. [c1] = build_commit_graph(r.object_store, [[1]])
  429. # Create an annotated tag pointing to the commit
  430. tag1 = Tag()
  431. tag1.name = b"v1.0"
  432. tag1.message = b"Test tag"
  433. tag1.tag_time = 1234567890
  434. tag1.tag_timezone = 0
  435. tag1.object = (Commit, c1.id)
  436. tag1.tagger = b"Test Tagger <test@example.com>"
  437. r.object_store.add_object(tag1)
  438. # Create another tag pointing to the first tag
  439. tag2 = Tag()
  440. tag2.name = b"v1.0-release"
  441. tag2.message = b"Release tag"
  442. tag2.tag_time = 1234567900
  443. tag2.tag_timezone = 0
  444. tag2.object = (Tag, tag1.id)
  445. tag2.tagger = b"Test Tagger <test@example.com>"
  446. r.object_store.add_object(tag2)
  447. # parse_commit should follow both tags to the commit
  448. self.assertEqual(c1, parse_commit(r, tag2.id))
  449. def test_tag_to_missing_commit(self) -> None:
  450. r = MemoryRepo()
  451. # Create a tag pointing to a non-existent commit
  452. missing_sha = b"1234567890123456789012345678901234567890"
  453. tag = Tag()
  454. tag.name = b"v1.0"
  455. tag.message = b"Test tag"
  456. tag.tag_time = 1234567890
  457. tag.tag_timezone = 0
  458. tag.object = (Commit, missing_sha)
  459. tag.tagger = b"Test Tagger <test@example.com>"
  460. r.object_store.add_object(tag)
  461. # Should raise KeyError for missing commit
  462. self.assertRaises(KeyError, parse_commit, r, tag.id)
  463. def test_tag_to_blob(self) -> None:
  464. r = MemoryRepo()
  465. self.addCleanup(r.close)
  466. # Create a blob
  467. blob = Blob.from_string(b"Test content")
  468. r.object_store.add_object(blob)
  469. # Create a tag pointing to the blob
  470. tag = Tag()
  471. tag.name = b"blob-tag"
  472. tag.message = b"Tag pointing to blob"
  473. tag.tag_time = 1234567890
  474. tag.tag_timezone = 0
  475. tag.object = (Blob, blob.id)
  476. tag.tagger = b"Test Tagger <test@example.com>"
  477. r.object_store.add_object(tag)
  478. # Should raise ValueError as it's not a commit
  479. self.assertRaises(ValueError, parse_commit, r, tag.id)
  480. def test_commit_object(self) -> None:
  481. r = MemoryRepo()
  482. self.addCleanup(r.close)
  483. [c1] = build_commit_graph(r.object_store, [[1]])
  484. # Test that passing a Commit object directly returns the same object
  485. self.assertEqual(c1, parse_commit(r, c1))
  486. class ParseRefTests(TestCase):
  487. def test_nonexistent(self) -> None:
  488. r = {}
  489. self.assertRaises(KeyError, parse_ref, r, b"thisdoesnotexist")
  490. def test_ambiguous_ref(self) -> None:
  491. r = {
  492. b"ambig1": "bla",
  493. b"refs/ambig1": "bla",
  494. b"refs/tags/ambig1": "bla",
  495. b"refs/heads/ambig1": "bla",
  496. b"refs/remotes/ambig1": "bla",
  497. b"refs/remotes/ambig1/HEAD": "bla",
  498. }
  499. self.assertEqual(b"ambig1", parse_ref(r, b"ambig1"))
  500. def test_ambiguous_ref2(self) -> None:
  501. r = {
  502. b"refs/ambig2": "bla",
  503. b"refs/tags/ambig2": "bla",
  504. b"refs/heads/ambig2": "bla",
  505. b"refs/remotes/ambig2": "bla",
  506. b"refs/remotes/ambig2/HEAD": "bla",
  507. }
  508. self.assertEqual(b"refs/ambig2", parse_ref(r, b"ambig2"))
  509. def test_ambiguous_tag(self) -> None:
  510. r = {
  511. b"refs/tags/ambig3": "bla",
  512. b"refs/heads/ambig3": "bla",
  513. b"refs/remotes/ambig3": "bla",
  514. b"refs/remotes/ambig3/HEAD": "bla",
  515. }
  516. self.assertEqual(b"refs/tags/ambig3", parse_ref(r, b"ambig3"))
  517. def test_ambiguous_head(self) -> None:
  518. r = {
  519. b"refs/heads/ambig4": "bla",
  520. b"refs/remotes/ambig4": "bla",
  521. b"refs/remotes/ambig4/HEAD": "bla",
  522. }
  523. self.assertEqual(b"refs/heads/ambig4", parse_ref(r, b"ambig4"))
  524. def test_ambiguous_remote(self) -> None:
  525. r = {b"refs/remotes/ambig5": "bla", b"refs/remotes/ambig5/HEAD": "bla"}
  526. self.assertEqual(b"refs/remotes/ambig5", parse_ref(r, b"ambig5"))
  527. def test_ambiguous_remote_head(self) -> None:
  528. r = {b"refs/remotes/ambig6/HEAD": "bla"}
  529. self.assertEqual(b"refs/remotes/ambig6/HEAD", parse_ref(r, b"ambig6"))
  530. def test_heads_full(self) -> None:
  531. r = {b"refs/heads/foo": "bla"}
  532. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"refs/heads/foo"))
  533. def test_heads_partial(self) -> None:
  534. r = {b"refs/heads/foo": "bla"}
  535. self.assertEqual(b"refs/heads/foo", parse_ref(r, b"heads/foo"))
  536. def test_tags_partial(self) -> None:
  537. r = {b"refs/tags/foo": "bla"}
  538. self.assertEqual(b"refs/tags/foo", parse_ref(r, b"tags/foo"))
  539. class ParseRefsTests(TestCase):
  540. def test_nonexistent(self) -> None:
  541. r = {}
  542. self.assertRaises(KeyError, parse_refs, r, [b"thisdoesnotexist"])
  543. def test_head(self) -> None:
  544. r = {b"refs/heads/foo": "bla"}
  545. self.assertEqual([b"refs/heads/foo"], parse_refs(r, [b"foo"]))
  546. def test_full(self) -> None:
  547. r = {b"refs/heads/foo": "bla"}
  548. self.assertEqual([b"refs/heads/foo"], parse_refs(r, b"refs/heads/foo"))
  549. class ParseReftupleTests(TestCase):
  550. def test_nonexistent(self) -> None:
  551. r = {}
  552. self.assertRaises(KeyError, parse_reftuple, r, r, b"thisdoesnotexist")
  553. def test_head(self) -> None:
  554. r = {b"refs/heads/foo": "bla"}
  555. self.assertEqual(
  556. (b"refs/heads/foo", b"refs/heads/foo", False),
  557. parse_reftuple(r, r, b"foo"),
  558. )
  559. self.assertEqual(
  560. (b"refs/heads/foo", b"refs/heads/foo", True),
  561. parse_reftuple(r, r, b"+foo"),
  562. )
  563. self.assertEqual(
  564. (b"refs/heads/foo", b"refs/heads/foo", True),
  565. parse_reftuple(r, {}, b"+foo"),
  566. )
  567. self.assertEqual(
  568. (b"refs/heads/foo", b"refs/heads/foo", True),
  569. parse_reftuple(r, {}, b"foo", True),
  570. )
  571. def test_full(self) -> None:
  572. r = {b"refs/heads/foo": "bla"}
  573. self.assertEqual(
  574. (b"refs/heads/foo", b"refs/heads/foo", False),
  575. parse_reftuple(r, r, b"refs/heads/foo"),
  576. )
  577. def test_no_left_ref(self) -> None:
  578. r = {b"refs/heads/foo": "bla"}
  579. self.assertEqual(
  580. (None, b"refs/heads/foo", False),
  581. parse_reftuple(r, r, b":refs/heads/foo"),
  582. )
  583. def test_no_right_ref(self) -> None:
  584. r = {b"refs/heads/foo": "bla"}
  585. self.assertEqual(
  586. (b"refs/heads/foo", None, False),
  587. parse_reftuple(r, r, b"refs/heads/foo:"),
  588. )
  589. def test_default_with_string(self) -> None:
  590. r = {b"refs/heads/foo": "bla"}
  591. self.assertEqual(
  592. (b"refs/heads/foo", b"refs/heads/foo", False),
  593. parse_reftuple(r, r, "foo"),
  594. )
  595. class ParseReftuplesTests(TestCase):
  596. def test_nonexistent(self) -> None:
  597. r = {}
  598. self.assertRaises(KeyError, parse_reftuples, r, r, [b"thisdoesnotexist"])
  599. def test_head(self) -> None:
  600. r = {b"refs/heads/foo": "bla"}
  601. self.assertEqual(
  602. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  603. parse_reftuples(r, r, [b"foo"]),
  604. )
  605. def test_full(self) -> None:
  606. r = {b"refs/heads/foo": "bla"}
  607. self.assertEqual(
  608. [(b"refs/heads/foo", b"refs/heads/foo", False)],
  609. parse_reftuples(r, r, b"refs/heads/foo"),
  610. )
  611. r = {b"refs/heads/foo": "bla"}
  612. self.assertEqual(
  613. [(b"refs/heads/foo", b"refs/heads/foo", True)],
  614. parse_reftuples(r, r, b"refs/heads/foo", True),
  615. )
  616. class ParseTreeTests(TestCase):
  617. """Test parse_tree."""
  618. def test_nonexistent(self) -> None:
  619. r = MemoryRepo()
  620. self.addCleanup(r.close)
  621. self.assertRaises(KeyError, parse_tree, r, "thisdoesnotexist")
  622. def test_from_commit(self) -> None:
  623. r = MemoryRepo()
  624. self.addCleanup(r.close)
  625. c1, _c2, _c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  626. self.assertEqual(r[c1.tree], parse_tree(r, c1.id))
  627. self.assertEqual(r[c1.tree], parse_tree(r, c1.tree))
  628. def test_from_ref(self) -> None:
  629. r = MemoryRepo()
  630. self.addCleanup(r.close)
  631. c1, _c2, _c3 = build_commit_graph(r.object_store, [[1], [2, 1], [3, 1, 2]])
  632. r.refs[b"refs/heads/foo"] = c1.id
  633. self.assertEqual(r[c1.tree], parse_tree(r, b"foo"))
  634. def test_tree_object(self) -> None:
  635. r = MemoryRepo()
  636. self.addCleanup(r.close)
  637. [c1] = build_commit_graph(r.object_store, [[1]])
  638. tree = r[c1.tree]
  639. # Test that passing a Tree object directly returns the same object
  640. self.assertEqual(tree, parse_tree(r, tree))
  641. def test_commit_object(self) -> None:
  642. r = MemoryRepo()
  643. self.addCleanup(r.close)
  644. [c1] = build_commit_graph(r.object_store, [[1]])
  645. # Test that passing a Commit object returns its tree
  646. self.assertEqual(r[c1.tree], parse_tree(r, c1))
  647. def test_tag_object(self) -> None:
  648. r = MemoryRepo()
  649. self.addCleanup(r.close)
  650. [c1] = build_commit_graph(r.object_store, [[1]])
  651. # Create an annotated tag pointing to the commit
  652. tag = Tag()
  653. tag.name = b"v1.0"
  654. tag.message = b"Test tag"
  655. tag.tag_time = 1234567890
  656. tag.tag_timezone = 0
  657. tag.object = (Commit, c1.id)
  658. tag.tagger = b"Test Tagger <test@example.com>"
  659. r.object_store.add_object(tag)
  660. # parse_tree should follow the tag to the commit's tree
  661. self.assertEqual(r[c1.tree], parse_tree(r, tag))