test_repository.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. # test_repository.py -- tests for repository.py
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # of the License or (at your option) any later version of
  8. # the License.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for the repository."""
  20. from cStringIO import StringIO
  21. import os
  22. import shutil
  23. import tempfile
  24. import unittest
  25. import warnings
  26. from dulwich import errors
  27. from dulwich.object_store import (
  28. tree_lookup_path,
  29. )
  30. from dulwich import objects
  31. from dulwich.repo import (
  32. check_ref_format,
  33. DictRefsContainer,
  34. Repo,
  35. read_packed_refs,
  36. read_packed_refs_with_peeled,
  37. write_packed_refs,
  38. _split_ref_line,
  39. )
  40. from dulwich.tests.utils import (
  41. open_repo,
  42. tear_down_repo,
  43. )
  44. missing_sha = 'b91fa4d900e17e99b433218e988c4eb4a3e9a097'
  45. class CreateRepositoryTests(unittest.TestCase):
  46. def test_create(self):
  47. tmp_dir = tempfile.mkdtemp()
  48. try:
  49. repo = Repo.init_bare(tmp_dir)
  50. self.assertEquals(tmp_dir, repo._controldir)
  51. finally:
  52. shutil.rmtree(tmp_dir)
  53. class RepositoryTests(unittest.TestCase):
  54. def setUp(self):
  55. self._repo = None
  56. def tearDown(self):
  57. if self._repo is not None:
  58. tear_down_repo(self._repo)
  59. def test_simple_props(self):
  60. r = self._repo = open_repo('a.git')
  61. self.assertEqual(r.controldir(), r.path)
  62. def test_ref(self):
  63. r = self._repo = open_repo('a.git')
  64. self.assertEqual(r.ref('refs/heads/master'),
  65. 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
  66. def test_setitem(self):
  67. r = self._repo = open_repo('a.git')
  68. r["refs/tags/foo"] = 'a90fa2d900a17e99b433217e988c4eb4a2e9a097'
  69. self.assertEquals('a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  70. r["refs/tags/foo"].id)
  71. def test_get_refs(self):
  72. r = self._repo = open_repo('a.git')
  73. self.assertEqual({
  74. 'HEAD': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  75. 'refs/heads/master': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  76. 'refs/tags/mytag': '28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
  77. 'refs/tags/mytag-packed': 'b0931cadc54336e78a1d980420e3268903b57a50',
  78. }, r.get_refs())
  79. def test_head(self):
  80. r = self._repo = open_repo('a.git')
  81. self.assertEqual(r.head(), 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
  82. def test_get_object(self):
  83. r = self._repo = open_repo('a.git')
  84. obj = r.get_object(r.head())
  85. self.assertEqual(obj.type_name, 'commit')
  86. def test_get_object_non_existant(self):
  87. r = self._repo = open_repo('a.git')
  88. self.assertRaises(KeyError, r.get_object, missing_sha)
  89. def test_contains_object(self):
  90. r = self._repo = open_repo('a.git')
  91. self.assertTrue(r.head() in r)
  92. def test_contains_ref(self):
  93. r = self._repo = open_repo('a.git')
  94. self.assertTrue("HEAD" in r)
  95. def test_contains_missing(self):
  96. r = self._repo = open_repo('a.git')
  97. self.assertFalse("bar" in r)
  98. def test_commit(self):
  99. r = self._repo = open_repo('a.git')
  100. warnings.simplefilter("ignore", DeprecationWarning)
  101. try:
  102. obj = r.commit(r.head())
  103. finally:
  104. warnings.resetwarnings()
  105. self.assertEqual(obj.type_name, 'commit')
  106. def test_commit_not_commit(self):
  107. r = self._repo = open_repo('a.git')
  108. warnings.simplefilter("ignore", DeprecationWarning)
  109. try:
  110. self.assertRaises(errors.NotCommitError,
  111. r.commit, '4f2e6529203aa6d44b5af6e3292c837ceda003f9')
  112. finally:
  113. warnings.resetwarnings()
  114. def test_tree(self):
  115. r = self._repo = open_repo('a.git')
  116. commit = r[r.head()]
  117. warnings.simplefilter("ignore", DeprecationWarning)
  118. try:
  119. tree = r.tree(commit.tree)
  120. finally:
  121. warnings.resetwarnings()
  122. self.assertEqual(tree.type_name, 'tree')
  123. self.assertEqual(tree.sha().hexdigest(), commit.tree)
  124. def test_tree_not_tree(self):
  125. r = self._repo = open_repo('a.git')
  126. warnings.simplefilter("ignore", DeprecationWarning)
  127. try:
  128. self.assertRaises(errors.NotTreeError, r.tree, r.head())
  129. finally:
  130. warnings.resetwarnings()
  131. def test_tag(self):
  132. r = self._repo = open_repo('a.git')
  133. tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
  134. warnings.simplefilter("ignore", DeprecationWarning)
  135. try:
  136. tag = r.tag(tag_sha)
  137. finally:
  138. warnings.resetwarnings()
  139. self.assertEqual(tag.type_name, 'tag')
  140. self.assertEqual(tag.sha().hexdigest(), tag_sha)
  141. obj_class, obj_sha = tag.object
  142. self.assertEqual(obj_class, objects.Commit)
  143. self.assertEqual(obj_sha, r.head())
  144. def test_tag_not_tag(self):
  145. r = self._repo = open_repo('a.git')
  146. warnings.simplefilter("ignore", DeprecationWarning)
  147. try:
  148. self.assertRaises(errors.NotTagError, r.tag, r.head())
  149. finally:
  150. warnings.resetwarnings()
  151. def test_get_peeled(self):
  152. # unpacked ref
  153. r = self._repo = open_repo('a.git')
  154. tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
  155. self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
  156. self.assertEqual(r.get_peeled('refs/tags/mytag'), r.head())
  157. # packed ref with cached peeled value
  158. packed_tag_sha = 'b0931cadc54336e78a1d980420e3268903b57a50'
  159. parent_sha = r[r.head()].parents[0]
  160. self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
  161. self.assertEqual(r.get_peeled('refs/tags/mytag-packed'), parent_sha)
  162. # TODO: add more corner cases to test repo
  163. def test_get_peeled_not_tag(self):
  164. r = self._repo = open_repo('a.git')
  165. self.assertEqual(r.get_peeled('HEAD'), r.head())
  166. def test_get_blob(self):
  167. r = self._repo = open_repo('a.git')
  168. commit = r[r.head()]
  169. tree = r[commit.tree]
  170. blob_sha = tree.entries()[0][2]
  171. warnings.simplefilter("ignore", DeprecationWarning)
  172. try:
  173. blob = r.get_blob(blob_sha)
  174. finally:
  175. warnings.resetwarnings()
  176. self.assertEqual(blob.type_name, 'blob')
  177. self.assertEqual(blob.sha().hexdigest(), blob_sha)
  178. def test_get_blob_notblob(self):
  179. r = self._repo = open_repo('a.git')
  180. warnings.simplefilter("ignore", DeprecationWarning)
  181. try:
  182. self.assertRaises(errors.NotBlobError, r.get_blob, r.head())
  183. finally:
  184. warnings.resetwarnings()
  185. def test_linear_history(self):
  186. r = self._repo = open_repo('a.git')
  187. history = r.revision_history(r.head())
  188. shas = [c.sha().hexdigest() for c in history]
  189. self.assertEqual(shas, [r.head(),
  190. '2a72d929692c41d8554c07f6301757ba18a65d91'])
  191. def test_merge_history(self):
  192. r = self._repo = open_repo('simple_merge.git')
  193. history = r.revision_history(r.head())
  194. shas = [c.sha().hexdigest() for c in history]
  195. self.assertEqual(shas, ['5dac377bdded4c9aeb8dff595f0faeebcc8498cc',
  196. 'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
  197. '4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',
  198. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  199. '0d89f20333fbb1d2f3a94da77f4981373d8f4310'])
  200. def test_revision_history_missing_commit(self):
  201. r = self._repo = open_repo('simple_merge.git')
  202. self.assertRaises(errors.MissingCommitError, r.revision_history,
  203. missing_sha)
  204. def test_out_of_order_merge(self):
  205. """Test that revision history is ordered by date, not parent order."""
  206. r = self._repo = open_repo('ooo_merge.git')
  207. history = r.revision_history(r.head())
  208. shas = [c.sha().hexdigest() for c in history]
  209. self.assertEqual(shas, ['7601d7f6231db6a57f7bbb79ee52e4d462fd44d1',
  210. 'f507291b64138b875c28e03469025b1ea20bc614',
  211. 'fb5b0425c7ce46959bec94d54b9a157645e114f5',
  212. 'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
  213. def test_get_tags_empty(self):
  214. r = self._repo = open_repo('ooo_merge.git')
  215. self.assertEqual({}, r.refs.as_dict('refs/tags'))
  216. def test_get_config(self):
  217. r = self._repo = open_repo('ooo_merge.git')
  218. self.assertEquals({}, r.get_config())
  219. def test_common_revisions(self):
  220. """
  221. This test demonstrates that ``find_common_revisions()`` actually returns
  222. common heads, not revisions; dulwich already uses
  223. ``find_common_revisions()`` in such a manner (see
  224. ``Repo.fetch_objects()``).
  225. """
  226. expected_shas = set(['60dacdc733de308bb77bb76ce0fb0f9b44c9769e'])
  227. # Source for objects.
  228. r_base = open_repo('simple_merge.git')
  229. # Re-create each-side of the merge in simple_merge.git.
  230. #
  231. # Since the trees and blobs are missing, the repository created is
  232. # corrupted, but we're only checking for commits for the purpose of this
  233. # test, so it's immaterial.
  234. r1_dir = tempfile.mkdtemp()
  235. r1_commits = ['ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD
  236. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  237. '0d89f20333fbb1d2f3a94da77f4981373d8f4310']
  238. r2_dir = tempfile.mkdtemp()
  239. r2_commits = ['4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD
  240. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  241. '0d89f20333fbb1d2f3a94da77f4981373d8f4310']
  242. try:
  243. r1 = Repo.init_bare(r1_dir)
  244. map(lambda c: r1.object_store.add_object(r_base.get_object(c)), \
  245. r1_commits)
  246. r1.refs['HEAD'] = r1_commits[0]
  247. r2 = Repo.init_bare(r2_dir)
  248. map(lambda c: r2.object_store.add_object(r_base.get_object(c)), \
  249. r2_commits)
  250. r2.refs['HEAD'] = r2_commits[0]
  251. # Finally, the 'real' testing!
  252. shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
  253. self.assertEqual(set(shas), expected_shas)
  254. shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
  255. self.assertEqual(set(shas), expected_shas)
  256. finally:
  257. shutil.rmtree(r1_dir)
  258. shutil.rmtree(r2_dir)
  259. class BuildRepoTests(unittest.TestCase):
  260. """Tests that build on-disk repos from scratch.
  261. Repos live in a temp dir and are torn down after each test. They start with
  262. a single commit in master having single file named 'a'.
  263. """
  264. def setUp(self):
  265. repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
  266. os.makedirs(repo_dir)
  267. r = self._repo = Repo.init(repo_dir)
  268. self.assertFalse(r.bare)
  269. self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
  270. self.assertRaises(KeyError, lambda: r.refs['refs/heads/master'])
  271. f = open(os.path.join(r.path, 'a'), 'wb')
  272. try:
  273. f.write('file contents')
  274. finally:
  275. f.close()
  276. r.stage(['a'])
  277. commit_sha = r.do_commit('msg',
  278. committer='Test Committer <test@nodomain.com>',
  279. author='Test Author <test@nodomain.com>',
  280. commit_timestamp=12345, commit_timezone=0,
  281. author_timestamp=12345, author_timezone=0)
  282. self.assertEqual([], r[commit_sha].parents)
  283. self._root_commit = commit_sha
  284. def tearDown(self):
  285. tear_down_repo(self._repo)
  286. def test_build_repo(self):
  287. r = self._repo
  288. self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
  289. self.assertEqual(self._root_commit, r.refs['refs/heads/master'])
  290. expected_blob = objects.Blob.from_string('file contents')
  291. self.assertEqual(expected_blob.data, r[expected_blob.id].data)
  292. actual_commit = r[self._root_commit]
  293. self.assertEqual('msg', actual_commit.message)
  294. def test_commit_modified(self):
  295. r = self._repo
  296. f = open(os.path.join(r.path, 'a'), 'wb')
  297. try:
  298. f.write('new contents')
  299. finally:
  300. f.close()
  301. r.stage(['a'])
  302. commit_sha = r.do_commit('modified a',
  303. committer='Test Committer <test@nodomain.com>',
  304. author='Test Author <test@nodomain.com>',
  305. commit_timestamp=12395, commit_timezone=0,
  306. author_timestamp=12395, author_timezone=0)
  307. self.assertEqual([self._root_commit], r[commit_sha].parents)
  308. _, blob_id = tree_lookup_path(r.get_object, r[commit_sha].tree, 'a')
  309. self.assertEqual('new contents', r[blob_id].data)
  310. def test_commit_deleted(self):
  311. r = self._repo
  312. os.remove(os.path.join(r.path, 'a'))
  313. r.stage(['a'])
  314. commit_sha = r.do_commit('deleted a',
  315. committer='Test Committer <test@nodomain.com>',
  316. author='Test Author <test@nodomain.com>',
  317. commit_timestamp=12395, commit_timezone=0,
  318. author_timestamp=12395, author_timezone=0)
  319. self.assertEqual([self._root_commit], r[commit_sha].parents)
  320. self.assertEqual([], list(r.open_index()))
  321. tree = r[r[commit_sha].tree]
  322. self.assertEqual([], tree.iteritems())
  323. def test_commit_fail_ref(self):
  324. r = self._repo
  325. def set_if_equals(name, old_ref, new_ref):
  326. return False
  327. r.refs.set_if_equals = set_if_equals
  328. def add_if_new(name, new_ref):
  329. self.fail('Unexpected call to add_if_new')
  330. r.refs.add_if_new = add_if_new
  331. old_shas = set(r.object_store)
  332. self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
  333. committer='Test Committer <test@nodomain.com>',
  334. author='Test Author <test@nodomain.com>',
  335. commit_timestamp=12345, commit_timezone=0,
  336. author_timestamp=12345, author_timezone=0)
  337. new_shas = set(r.object_store) - old_shas
  338. self.assertEqual(1, len(new_shas))
  339. # Check that the new commit (now garbage) was added.
  340. new_commit = r[new_shas.pop()]
  341. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  342. self.assertEqual('failed commit', new_commit.message)
  343. def test_stage_deleted(self):
  344. r = self._repo
  345. os.remove(os.path.join(r.path, 'a'))
  346. r.stage(['a'])
  347. r.stage(['a']) # double-stage a deleted path
  348. class CheckRefFormatTests(unittest.TestCase):
  349. """Tests for the check_ref_format function.
  350. These are the same tests as in the git test suite.
  351. """
  352. def test_valid(self):
  353. self.assertTrue(check_ref_format('heads/foo'))
  354. self.assertTrue(check_ref_format('foo/bar/baz'))
  355. self.assertTrue(check_ref_format('refs///heads/foo'))
  356. self.assertTrue(check_ref_format('foo./bar'))
  357. self.assertTrue(check_ref_format('heads/foo@bar'))
  358. self.assertTrue(check_ref_format('heads/fix.lock.error'))
  359. def test_invalid(self):
  360. self.assertFalse(check_ref_format('foo'))
  361. self.assertFalse(check_ref_format('heads/foo/'))
  362. self.assertFalse(check_ref_format('./foo'))
  363. self.assertFalse(check_ref_format('.refs/foo'))
  364. self.assertFalse(check_ref_format('heads/foo..bar'))
  365. self.assertFalse(check_ref_format('heads/foo?bar'))
  366. self.assertFalse(check_ref_format('heads/foo.lock'))
  367. self.assertFalse(check_ref_format('heads/v@{ation'))
  368. self.assertFalse(check_ref_format('heads/foo\bar'))
  369. ONES = "1" * 40
  370. TWOS = "2" * 40
  371. THREES = "3" * 40
  372. FOURS = "4" * 40
  373. class PackedRefsFileTests(unittest.TestCase):
  374. def test_split_ref_line_errors(self):
  375. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  376. 'singlefield')
  377. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  378. 'badsha name')
  379. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  380. '%s bad/../refname' % ONES)
  381. def test_read_without_peeled(self):
  382. f = StringIO('# comment\n%s ref/1\n%s ref/2' % (ONES, TWOS))
  383. self.assertEqual([(ONES, 'ref/1'), (TWOS, 'ref/2')],
  384. list(read_packed_refs(f)))
  385. def test_read_without_peeled_errors(self):
  386. f = StringIO('%s ref/1\n^%s' % (ONES, TWOS))
  387. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  388. def test_read_with_peeled(self):
  389. f = StringIO('%s ref/1\n%s ref/2\n^%s\n%s ref/4' % (
  390. ONES, TWOS, THREES, FOURS))
  391. self.assertEqual([
  392. (ONES, 'ref/1', None),
  393. (TWOS, 'ref/2', THREES),
  394. (FOURS, 'ref/4', None),
  395. ], list(read_packed_refs_with_peeled(f)))
  396. def test_read_with_peeled_errors(self):
  397. f = StringIO('^%s\n%s ref/1' % (TWOS, ONES))
  398. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  399. f = StringIO('%s ref/1\n^%s\n^%s' % (ONES, TWOS, THREES))
  400. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  401. def test_write_with_peeled(self):
  402. f = StringIO()
  403. write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS},
  404. {'ref/1': THREES})
  405. self.assertEqual(
  406. "# pack-refs with: peeled\n%s ref/1\n^%s\n%s ref/2\n" % (
  407. ONES, THREES, TWOS), f.getvalue())
  408. def test_write_without_peeled(self):
  409. f = StringIO()
  410. write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS})
  411. self.assertEqual("%s ref/1\n%s ref/2\n" % (ONES, TWOS), f.getvalue())
  412. # Dict of refs that we expect all RefsContainerTests subclasses to define.
  413. _TEST_REFS = {
  414. 'HEAD': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  415. 'refs/heads/master': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  416. 'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  417. 'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
  418. 'refs/tags/refs-0.2': '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
  419. }
  420. class RefsContainerTests(object):
  421. def test_keys(self):
  422. actual_keys = set(self._refs.keys())
  423. self.assertEqual(set(self._refs.allkeys()), actual_keys)
  424. # ignore the symref loop if it exists
  425. actual_keys.discard('refs/heads/loop')
  426. self.assertEqual(set(_TEST_REFS.iterkeys()), actual_keys)
  427. actual_keys = self._refs.keys('refs/heads')
  428. actual_keys.discard('loop')
  429. self.assertEqual(['master', 'packed'], sorted(actual_keys))
  430. self.assertEqual(['refs-0.1', 'refs-0.2'],
  431. sorted(self._refs.keys('refs/tags')))
  432. def test_as_dict(self):
  433. # refs/heads/loop does not show up even if it exists
  434. self.assertEqual(_TEST_REFS, self._refs.as_dict())
  435. def test_setitem(self):
  436. self._refs['refs/some/ref'] = '42d06bd4b77fed026b154d16493e5deab78f02ec'
  437. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  438. self._refs['refs/some/ref'])
  439. def test_set_if_equals(self):
  440. nines = '9' * 40
  441. self.assertFalse(self._refs.set_if_equals('HEAD', 'c0ffee', nines))
  442. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  443. self._refs['HEAD'])
  444. self.assertTrue(self._refs.set_if_equals(
  445. 'HEAD', '42d06bd4b77fed026b154d16493e5deab78f02ec', nines))
  446. self.assertEqual(nines, self._refs['HEAD'])
  447. self.assertTrue(self._refs.set_if_equals('refs/heads/master', None,
  448. nines))
  449. self.assertEqual(nines, self._refs['refs/heads/master'])
  450. def test_add_if_new(self):
  451. nines = '9' * 40
  452. self.assertFalse(self._refs.add_if_new('refs/heads/master', nines))
  453. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  454. self._refs['refs/heads/master'])
  455. self.assertTrue(self._refs.add_if_new('refs/some/ref', nines))
  456. self.assertEqual(nines, self._refs['refs/some/ref'])
  457. def test_set_symbolic_ref(self):
  458. self._refs.set_symbolic_ref('refs/heads/symbolic', 'refs/heads/master')
  459. self.assertEqual('ref: refs/heads/master',
  460. self._refs.read_loose_ref('refs/heads/symbolic'))
  461. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  462. self._refs['refs/heads/symbolic'])
  463. def test_set_symbolic_ref_overwrite(self):
  464. nines = '9' * 40
  465. self.assertFalse('refs/heads/symbolic' in self._refs)
  466. self._refs['refs/heads/symbolic'] = nines
  467. self.assertEqual(nines, self._refs.read_loose_ref('refs/heads/symbolic'))
  468. self._refs.set_symbolic_ref('refs/heads/symbolic', 'refs/heads/master')
  469. self.assertEqual('ref: refs/heads/master',
  470. self._refs.read_loose_ref('refs/heads/symbolic'))
  471. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  472. self._refs['refs/heads/symbolic'])
  473. def test_check_refname(self):
  474. try:
  475. self._refs._check_refname('HEAD')
  476. except KeyError:
  477. self.fail()
  478. try:
  479. self._refs._check_refname('refs/heads/foo')
  480. except KeyError:
  481. self.fail()
  482. self.assertRaises(KeyError, self._refs._check_refname, 'refs')
  483. self.assertRaises(KeyError, self._refs._check_refname, 'notrefs/foo')
  484. def test_contains(self):
  485. self.assertTrue('refs/heads/master' in self._refs)
  486. self.assertFalse('refs/heads/bar' in self._refs)
  487. def test_delitem(self):
  488. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  489. self._refs['refs/heads/master'])
  490. del self._refs['refs/heads/master']
  491. self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])
  492. def test_remove_if_equals(self):
  493. self.assertFalse(self._refs.remove_if_equals('HEAD', 'c0ffee'))
  494. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  495. self._refs['HEAD'])
  496. self.assertTrue(self._refs.remove_if_equals(
  497. 'refs/tags/refs-0.2', '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8'))
  498. self.assertFalse('refs/tags/refs-0.2' in self._refs)
  499. class DictRefsContainerTests(RefsContainerTests, unittest.TestCase):
  500. def setUp(self):
  501. self._refs = DictRefsContainer(dict(_TEST_REFS))
  502. class DiskRefsContainerTests(RefsContainerTests, unittest.TestCase):
  503. def setUp(self):
  504. self._repo = open_repo('refs.git')
  505. self._refs = self._repo.refs
  506. def tearDown(self):
  507. tear_down_repo(self._repo)
  508. def test_get_packed_refs(self):
  509. self.assertEqual({
  510. 'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  511. 'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
  512. }, self._refs.get_packed_refs())
  513. def test_get_peeled_not_packed(self):
  514. # not packed
  515. self.assertEqual(None, self._refs.get_peeled('refs/tags/refs-0.2'))
  516. self.assertEqual('3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
  517. self._refs['refs/tags/refs-0.2'])
  518. # packed, known not peelable
  519. self.assertEqual(self._refs['refs/heads/packed'],
  520. self._refs.get_peeled('refs/heads/packed'))
  521. # packed, peeled
  522. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  523. self._refs.get_peeled('refs/tags/refs-0.1'))
  524. def test_setitem(self):
  525. RefsContainerTests.test_setitem(self)
  526. f = open(os.path.join(self._refs.path, 'refs', 'some', 'ref'), 'rb')
  527. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  528. f.read()[:40])
  529. f.close()
  530. def test_setitem_symbolic(self):
  531. ones = '1' * 40
  532. self._refs['HEAD'] = ones
  533. self.assertEqual(ones, self._refs['HEAD'])
  534. # ensure HEAD was not modified
  535. f = open(os.path.join(self._refs.path, 'HEAD'), 'rb')
  536. self.assertEqual('ref: refs/heads/master', iter(f).next().rstrip('\n'))
  537. f.close()
  538. # ensure the symbolic link was written through
  539. f = open(os.path.join(self._refs.path, 'refs', 'heads', 'master'), 'rb')
  540. self.assertEqual(ones, f.read()[:40])
  541. f.close()
  542. def test_set_if_equals(self):
  543. RefsContainerTests.test_set_if_equals(self)
  544. # ensure symref was followed
  545. self.assertEqual('9' * 40, self._refs['refs/heads/master'])
  546. # ensure lockfile was deleted
  547. self.assertFalse(os.path.exists(
  548. os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
  549. self.assertFalse(os.path.exists(
  550. os.path.join(self._refs.path, 'HEAD.lock')))
  551. def test_add_if_new_packed(self):
  552. # don't overwrite packed ref
  553. self.assertFalse(self._refs.add_if_new('refs/tags/refs-0.1', '9' * 40))
  554. self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
  555. self._refs['refs/tags/refs-0.1'])
  556. def test_add_if_new_symbolic(self):
  557. # Use an empty repo instead of the default.
  558. tear_down_repo(self._repo)
  559. repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
  560. os.makedirs(repo_dir)
  561. self._repo = Repo.init(repo_dir)
  562. refs = self._repo.refs
  563. nines = '9' * 40
  564. self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
  565. self.assertFalse('refs/heads/master' in refs)
  566. self.assertTrue(refs.add_if_new('HEAD', nines))
  567. self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
  568. self.assertEqual(nines, refs['HEAD'])
  569. self.assertEqual(nines, refs['refs/heads/master'])
  570. self.assertFalse(refs.add_if_new('HEAD', '1' * 40))
  571. self.assertEqual(nines, refs['HEAD'])
  572. self.assertEqual(nines, refs['refs/heads/master'])
  573. def test_follow(self):
  574. self.assertEquals(
  575. ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
  576. self._refs._follow('HEAD'))
  577. self.assertEquals(
  578. ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
  579. self._refs._follow('refs/heads/master'))
  580. self.assertRaises(KeyError, self._refs._follow, 'notrefs/foo')
  581. self.assertRaises(KeyError, self._refs._follow, 'refs/heads/loop')
  582. def test_delitem(self):
  583. RefsContainerTests.test_delitem(self)
  584. ref_file = os.path.join(self._refs.path, 'refs', 'heads', 'master')
  585. self.assertFalse(os.path.exists(ref_file))
  586. self.assertFalse('refs/heads/master' in self._refs.get_packed_refs())
  587. def test_delitem_symbolic(self):
  588. self.assertEqual('ref: refs/heads/master',
  589. self._refs.read_loose_ref('HEAD'))
  590. del self._refs['HEAD']
  591. self.assertRaises(KeyError, lambda: self._refs['HEAD'])
  592. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  593. self._refs['refs/heads/master'])
  594. self.assertFalse(os.path.exists(os.path.join(self._refs.path, 'HEAD')))
  595. def test_remove_if_equals_symref(self):
  596. # HEAD is a symref, so shouldn't equal its dereferenced value
  597. self.assertFalse(self._refs.remove_if_equals(
  598. 'HEAD', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
  599. self.assertTrue(self._refs.remove_if_equals(
  600. 'refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
  601. self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])
  602. # HEAD is now a broken symref
  603. self.assertRaises(KeyError, lambda: self._refs['HEAD'])
  604. self.assertEqual('ref: refs/heads/master',
  605. self._refs.read_loose_ref('HEAD'))
  606. self.assertFalse(os.path.exists(
  607. os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
  608. self.assertFalse(os.path.exists(
  609. os.path.join(self._refs.path, 'HEAD.lock')))
  610. def test_remove_if_equals_packed(self):
  611. # test removing ref that is only packed
  612. self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
  613. self._refs['refs/tags/refs-0.1'])
  614. self.assertTrue(
  615. self._refs.remove_if_equals('refs/tags/refs-0.1',
  616. 'df6800012397fb85c56e7418dd4eb9405dee075c'))
  617. self.assertRaises(KeyError, lambda: self._refs['refs/tags/refs-0.1'])
  618. def test_read_ref(self):
  619. self.assertEqual('ref: refs/heads/master', self._refs.read_ref("HEAD"))
  620. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  621. self._refs.read_ref("refs/heads/packed"))
  622. self.assertEqual(None,
  623. self._refs.read_ref("nonexistant"))