test_repository.py 47 KB


  1. # test_repository.py -- tests for repository.py
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # of the License or (at your option) any later version of
  8. # the License.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for the repository."""
  20. from cStringIO import StringIO
  21. import os
  22. import stat
  23. import shutil
  24. import tempfile
  25. import warnings
  26. from dulwich import errors
  27. from dulwich.file import (
  28. GitFile,
  29. )
  30. from dulwich.object_store import (
  31. tree_lookup_path,
  32. )
  33. from dulwich import objects
  34. from dulwich.config import Config
  35. from dulwich.refs import (
  36. _split_ref_line,
  37. )
  38. from dulwich.repo import (
  39. check_ref_format,
  40. DictRefsContainer,
  41. InfoRefsContainer,
  42. Repo,
  43. MemoryRepo,
  44. read_packed_refs,
  45. read_packed_refs_with_peeled,
  46. write_packed_refs,
  47. )
  48. from dulwich.tests import (
  49. TestCase,
  50. )
  51. from dulwich.tests.utils import (
  52. open_repo,
  53. tear_down_repo,
  54. setup_warning_catcher,
  55. )
  56. missing_sha = 'b91fa4d900e17e99b433218e988c4eb4a3e9a097'
  57. class CreateRepositoryTests(TestCase):
  58. def assertFileContentsEqual(self, expected, repo, path):
  59. f = repo.get_named_file(path)
  60. if not f:
  61. self.assertEqual(expected, None)
  62. else:
  63. try:
  64. self.assertEqual(expected, f.read())
  65. finally:
  66. f.close()
  67. def _check_repo_contents(self, repo, expect_bare):
  68. self.assertEqual(expect_bare, repo.bare)
  69. self.assertFileContentsEqual('Unnamed repository', repo, 'description')
  70. self.assertFileContentsEqual('', repo, os.path.join('info', 'exclude'))
  71. self.assertFileContentsEqual(None, repo, 'nonexistent file')
  72. barestr = 'bare = %s' % str(expect_bare).lower()
  73. config_text = repo.get_named_file('config').read()
  74. self.assertTrue(barestr in config_text, "%r" % config_text)
  75. def test_create_disk_bare(self):
  76. tmp_dir = tempfile.mkdtemp()
  77. self.addCleanup(shutil.rmtree, tmp_dir)
  78. repo = Repo.init_bare(tmp_dir)
  79. self.assertEqual(tmp_dir, repo._controldir)
  80. self._check_repo_contents(repo, True)
  81. def test_create_disk_non_bare(self):
  82. tmp_dir = tempfile.mkdtemp()
  83. self.addCleanup(shutil.rmtree, tmp_dir)
  84. repo = Repo.init(tmp_dir)
  85. self.assertEqual(os.path.join(tmp_dir, '.git'), repo._controldir)
  86. self._check_repo_contents(repo, False)
  87. def test_create_memory(self):
  88. repo = MemoryRepo.init_bare([], {})
  89. self._check_repo_contents(repo, True)
  90. class RepositoryTests(TestCase):
  91. def setUp(self):
  92. super(RepositoryTests, self).setUp()
  93. self._repo = None
  94. def tearDown(self):
  95. if self._repo is not None:
  96. tear_down_repo(self._repo)
  97. super(RepositoryTests, self).tearDown()
  98. def test_simple_props(self):
  99. r = self._repo = open_repo('a.git')
  100. self.assertEqual(r.controldir(), r.path)
  101. def test_ref(self):
  102. r = self._repo = open_repo('a.git')
  103. warnings.simplefilter("ignore", DeprecationWarning)
  104. self.assertEqual(r.ref('refs/heads/master'),
  105. 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
  106. def test_setitem(self):
  107. r = self._repo = open_repo('a.git')
  108. r["refs/tags/foo"] = 'a90fa2d900a17e99b433217e988c4eb4a2e9a097'
  109. self.assertEqual('a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  110. r["refs/tags/foo"].id)
  111. def test_delitem(self):
  112. r = self._repo = open_repo('a.git')
  113. del r['refs/heads/master']
  114. self.assertRaises(KeyError, lambda: r['refs/heads/master'])
  115. del r['HEAD']
  116. self.assertRaises(KeyError, lambda: r['HEAD'])
  117. self.assertRaises(ValueError, r.__delitem__, 'notrefs/foo')
  118. def test_get_refs(self):
  119. r = self._repo = open_repo('a.git')
  120. self.assertEqual({
  121. 'HEAD': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  122. 'refs/heads/master': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  123. 'refs/tags/mytag': '28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
  124. 'refs/tags/mytag-packed': 'b0931cadc54336e78a1d980420e3268903b57a50',
  125. }, r.get_refs())
  126. def test_head(self):
  127. r = self._repo = open_repo('a.git')
  128. self.assertEqual(r.head(), 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
  129. def test_get_object(self):
  130. r = self._repo = open_repo('a.git')
  131. obj = r.get_object(r.head())
  132. self.assertEqual(obj.type_name, 'commit')
  133. def test_get_object_non_existant(self):
  134. r = self._repo = open_repo('a.git')
  135. self.assertRaises(KeyError, r.get_object, missing_sha)
  136. def test_contains_object(self):
  137. r = self._repo = open_repo('a.git')
  138. self.assertTrue(r.head() in r)
  139. def test_contains_ref(self):
  140. r = self._repo = open_repo('a.git')
  141. self.assertTrue("HEAD" in r)
  142. def test_get_no_description(self):
  143. r = self._repo = open_repo('a.git')
  144. self.assertIs(None, r.get_description())
  145. def test_get_description(self):
  146. r = self._repo = open_repo('a.git')
  147. f = open(os.path.join(r.path, 'description'), 'w')
  148. try:
  149. f.write("Some description")
  150. finally:
  151. f.close()
  152. self.assertEquals("Some description", r.get_description())
  153. def test_set_description(self):
  154. r = self._repo = open_repo('a.git')
  155. description = "Some description"
  156. r.set_description(description)
  157. self.assertEquals(description, r.get_description())
  158. def test_contains_missing(self):
  159. r = self._repo = open_repo('a.git')
  160. self.assertFalse("bar" in r)
  161. def test_commit(self):
  162. r = self._repo = open_repo('a.git')
  163. warnings.simplefilter("ignore", DeprecationWarning)
  164. self.addCleanup(warnings.resetwarnings)
  165. obj = r.commit(r.head())
  166. self.assertEqual(obj.type_name, 'commit')
  167. def test_commit_not_commit(self):
  168. r = self._repo = open_repo('a.git')
  169. warnings.simplefilter("ignore", DeprecationWarning)
  170. self.addCleanup(warnings.resetwarnings)
  171. self.assertRaises(errors.NotCommitError,
  172. r.commit, '4f2e6529203aa6d44b5af6e3292c837ceda003f9')
  173. def test_tree(self):
  174. r = self._repo = open_repo('a.git')
  175. commit = r[r.head()]
  176. warnings.simplefilter("ignore", DeprecationWarning)
  177. self.addCleanup(warnings.resetwarnings)
  178. tree = r.tree(commit.tree)
  179. self.assertEqual(tree.type_name, 'tree')
  180. self.assertEqual(tree.sha().hexdigest(), commit.tree)
  181. def test_tree_not_tree(self):
  182. r = self._repo = open_repo('a.git')
  183. warnings.simplefilter("ignore", DeprecationWarning)
  184. self.addCleanup(warnings.resetwarnings)
  185. self.assertRaises(errors.NotTreeError, r.tree, r.head())
  186. def test_tag(self):
  187. r = self._repo = open_repo('a.git')
  188. tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
  189. warnings.simplefilter("ignore", DeprecationWarning)
  190. self.addCleanup(warnings.resetwarnings)
  191. tag = r.tag(tag_sha)
  192. self.assertEqual(tag.type_name, 'tag')
  193. self.assertEqual(tag.sha().hexdigest(), tag_sha)
  194. obj_class, obj_sha = tag.object
  195. self.assertEqual(obj_class, objects.Commit)
  196. self.assertEqual(obj_sha, r.head())
  197. def test_tag_not_tag(self):
  198. r = self._repo = open_repo('a.git')
  199. warnings.simplefilter("ignore", DeprecationWarning)
  200. self.addCleanup(warnings.resetwarnings)
  201. self.assertRaises(errors.NotTagError, r.tag, r.head())
  202. def test_get_peeled(self):
  203. # unpacked ref
  204. r = self._repo = open_repo('a.git')
  205. tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
  206. self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
  207. self.assertEqual(r.get_peeled('refs/tags/mytag'), r.head())
  208. # packed ref with cached peeled value
  209. packed_tag_sha = 'b0931cadc54336e78a1d980420e3268903b57a50'
  210. parent_sha = r[r.head()].parents[0]
  211. self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
  212. self.assertEqual(r.get_peeled('refs/tags/mytag-packed'), parent_sha)
  213. # TODO: add more corner cases to test repo
  214. def test_get_peeled_not_tag(self):
  215. r = self._repo = open_repo('a.git')
  216. self.assertEqual(r.get_peeled('HEAD'), r.head())
  217. def test_get_blob(self):
  218. r = self._repo = open_repo('a.git')
  219. commit = r[r.head()]
  220. tree = r[commit.tree]
  221. blob_sha = tree.items()[0][2]
  222. warnings.simplefilter("ignore", DeprecationWarning)
  223. self.addCleanup(warnings.resetwarnings)
  224. blob = r.get_blob(blob_sha)
  225. self.assertEqual(blob.type_name, 'blob')
  226. self.assertEqual(blob.sha().hexdigest(), blob_sha)
  227. def test_get_blob_notblob(self):
  228. r = self._repo = open_repo('a.git')
  229. warnings.simplefilter("ignore", DeprecationWarning)
  230. self.addCleanup(warnings.resetwarnings)
  231. self.assertRaises(errors.NotBlobError, r.get_blob, r.head())
  232. def test_get_walker(self):
  233. r = self._repo = open_repo('a.git')
  234. # include defaults to [r.head()]
  235. self.assertEqual([e.commit.id for e in r.get_walker()],
  236. [r.head(), '2a72d929692c41d8554c07f6301757ba18a65d91'])
  237. self.assertEqual(
  238. [e.commit.id for e in r.get_walker(['2a72d929692c41d8554c07f6301757ba18a65d91'])],
  239. ['2a72d929692c41d8554c07f6301757ba18a65d91'])
  240. self.assertEqual(
  241. [e.commit.id for e in r.get_walker('2a72d929692c41d8554c07f6301757ba18a65d91')],
  242. ['2a72d929692c41d8554c07f6301757ba18a65d91'])
  243. def test_linear_history(self):
  244. r = self._repo = open_repo('a.git')
  245. warnings.simplefilter("ignore", DeprecationWarning)
  246. self.addCleanup(warnings.resetwarnings)
  247. history = r.revision_history(r.head())
  248. shas = [c.sha().hexdigest() for c in history]
  249. self.assertEqual(shas, [r.head(),
  250. '2a72d929692c41d8554c07f6301757ba18a65d91'])
  251. def test_clone(self):
  252. r = self._repo = open_repo('a.git')
  253. tmp_dir = tempfile.mkdtemp()
  254. self.addCleanup(shutil.rmtree, tmp_dir)
  255. t = r.clone(tmp_dir, mkdir=False)
  256. self.assertEqual({
  257. 'HEAD': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  258. 'refs/remotes/origin/master':
  259. 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  260. 'refs/heads/master': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
  261. 'refs/tags/mytag': '28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
  262. 'refs/tags/mytag-packed':
  263. 'b0931cadc54336e78a1d980420e3268903b57a50',
  264. }, t.refs.as_dict())
  265. shas = [e.commit.id for e in r.get_walker()]
  266. self.assertEqual(shas, [t.head(),
  267. '2a72d929692c41d8554c07f6301757ba18a65d91'])
  268. def test_clone_no_head(self):
  269. temp_dir = tempfile.mkdtemp()
  270. self.addCleanup(shutil.rmtree, temp_dir)
  271. repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
  272. dest_dir = os.path.join(temp_dir, 'a.git')
  273. shutil.copytree(os.path.join(repo_dir, 'a.git'),
  274. dest_dir, symlinks=True)
  275. r = Repo(dest_dir)
  276. del r.refs["refs/heads/master"]
  277. del r.refs["HEAD"]
  278. t = r.clone(os.path.join(temp_dir, 'b.git'), mkdir=True)
  279. self.assertEqual({
  280. 'refs/tags/mytag': '28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
  281. 'refs/tags/mytag-packed':
  282. 'b0931cadc54336e78a1d980420e3268903b57a50',
  283. }, t.refs.as_dict())
  284. def test_clone_empty(self):
  285. """Test clone() doesn't crash if HEAD points to a non-existing ref.
  286. This simulates cloning server-side bare repository either when it is
  287. still empty or if user renames master branch and pushes private repo
  288. to the server.
  289. Non-bare repo HEAD always points to an existing ref.
  290. """
  291. r = self._repo = open_repo('empty.git')
  292. tmp_dir = tempfile.mkdtemp()
  293. self.addCleanup(shutil.rmtree, tmp_dir)
  294. r.clone(tmp_dir, mkdir=False, bare=True)
  295. def test_merge_history(self):
  296. r = self._repo = open_repo('simple_merge.git')
  297. shas = [e.commit.id for e in r.get_walker()]
  298. self.assertEqual(shas, ['5dac377bdded4c9aeb8dff595f0faeebcc8498cc',
  299. 'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
  300. '4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',
  301. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  302. '0d89f20333fbb1d2f3a94da77f4981373d8f4310'])
  303. def test_revision_history_missing_commit(self):
  304. r = self._repo = open_repo('simple_merge.git')
  305. warnings.simplefilter("ignore", DeprecationWarning)
  306. self.addCleanup(warnings.resetwarnings)
  307. self.assertRaises(errors.MissingCommitError, r.revision_history,
  308. missing_sha)
  309. def test_out_of_order_merge(self):
  310. """Test that revision history is ordered by date, not parent order."""
  311. r = self._repo = open_repo('ooo_merge.git')
  312. shas = [e.commit.id for e in r.get_walker()]
  313. self.assertEqual(shas, ['7601d7f6231db6a57f7bbb79ee52e4d462fd44d1',
  314. 'f507291b64138b875c28e03469025b1ea20bc614',
  315. 'fb5b0425c7ce46959bec94d54b9a157645e114f5',
  316. 'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
  317. def test_get_tags_empty(self):
  318. r = self._repo = open_repo('ooo_merge.git')
  319. self.assertEqual({}, r.refs.as_dict('refs/tags'))
  320. def test_get_config(self):
  321. r = self._repo = open_repo('ooo_merge.git')
  322. self.assertIsInstance(r.get_config(), Config)
  323. def test_get_config_stack(self):
  324. r = self._repo = open_repo('ooo_merge.git')
  325. self.assertIsInstance(r.get_config_stack(), Config)
  326. def test_submodule(self):
  327. temp_dir = tempfile.mkdtemp()
  328. repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
  329. shutil.copytree(os.path.join(repo_dir, 'a.git'),
  330. os.path.join(temp_dir, 'a.git'), symlinks=True)
  331. rel = os.path.relpath(os.path.join(repo_dir, 'submodule'), temp_dir)
  332. os.symlink(os.path.join(rel, 'dotgit'), os.path.join(temp_dir, '.git'))
  333. r = Repo(temp_dir)
  334. self.assertEqual(r.head(), 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
  335. def test_common_revisions(self):
  336. """
  337. This test demonstrates that ``find_common_revisions()`` actually returns
  338. common heads, not revisions; dulwich already uses
  339. ``find_common_revisions()`` in such a manner (see
  340. ``Repo.fetch_objects()``).
  341. """
  342. expected_shas = set(['60dacdc733de308bb77bb76ce0fb0f9b44c9769e'])
  343. # Source for objects.
  344. r_base = open_repo('simple_merge.git')
  345. # Re-create each-side of the merge in simple_merge.git.
  346. #
  347. # Since the trees and blobs are missing, the repository created is
  348. # corrupted, but we're only checking for commits for the purpose of this
  349. # test, so it's immaterial.
  350. r1_dir = tempfile.mkdtemp()
  351. r1_commits = ['ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD
  352. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  353. '0d89f20333fbb1d2f3a94da77f4981373d8f4310']
  354. r2_dir = tempfile.mkdtemp()
  355. r2_commits = ['4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD
  356. '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
  357. '0d89f20333fbb1d2f3a94da77f4981373d8f4310']
  358. try:
  359. r1 = Repo.init_bare(r1_dir)
  360. map(lambda c: r1.object_store.add_object(r_base.get_object(c)), \
  361. r1_commits)
  362. r1.refs['HEAD'] = r1_commits[0]
  363. r2 = Repo.init_bare(r2_dir)
  364. map(lambda c: r2.object_store.add_object(r_base.get_object(c)), \
  365. r2_commits)
  366. r2.refs['HEAD'] = r2_commits[0]
  367. # Finally, the 'real' testing!
  368. shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
  369. self.assertEqual(set(shas), expected_shas)
  370. shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
  371. self.assertEqual(set(shas), expected_shas)
  372. finally:
  373. shutil.rmtree(r1_dir)
  374. shutil.rmtree(r2_dir)
  375. def test_shell_hook_pre_commit(self):
  376. if os.name != 'posix':
  377. self.skipTest('shell hook tests requires POSIX shell')
  378. pre_commit_fail = """#!/bin/sh
  379. exit 1
  380. """
  381. pre_commit_success = """#!/bin/sh
  382. exit 0
  383. """
  384. repo_dir = os.path.join(tempfile.mkdtemp())
  385. r = Repo.init(repo_dir)
  386. self.addCleanup(shutil.rmtree, repo_dir)
  387. pre_commit = os.path.join(r.controldir(), 'hooks', 'pre-commit')
  388. f = open(pre_commit, 'wb')
  389. try:
  390. f.write(pre_commit_fail)
  391. finally:
  392. f.close()
  393. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  394. self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
  395. committer='Test Committer <test@nodomain.com>',
  396. author='Test Author <test@nodomain.com>',
  397. commit_timestamp=12345, commit_timezone=0,
  398. author_timestamp=12345, author_timezone=0)
  399. f = open(pre_commit, 'wb')
  400. try:
  401. f.write(pre_commit_success)
  402. finally:
  403. f.close()
  404. os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  405. commit_sha = r.do_commit(
  406. 'empty commit',
  407. committer='Test Committer <test@nodomain.com>',
  408. author='Test Author <test@nodomain.com>',
  409. commit_timestamp=12395, commit_timezone=0,
  410. author_timestamp=12395, author_timezone=0)
  411. self.assertEqual([], r[commit_sha].parents)
  412. def test_shell_hook_commit_msg(self):
  413. if os.name != 'posix':
  414. self.skipTest('shell hook tests requires POSIX shell')
  415. commit_msg_fail = """#!/bin/sh
  416. exit 1
  417. """
  418. commit_msg_success = """#!/bin/sh
  419. exit 0
  420. """
  421. repo_dir = os.path.join(tempfile.mkdtemp())
  422. r = Repo.init(repo_dir)
  423. self.addCleanup(shutil.rmtree, repo_dir)
  424. commit_msg = os.path.join(r.controldir(), 'hooks', 'commit-msg')
  425. f = open(commit_msg, 'wb')
  426. try:
  427. f.write(commit_msg_fail)
  428. finally:
  429. f.close()
  430. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  431. self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
  432. committer='Test Committer <test@nodomain.com>',
  433. author='Test Author <test@nodomain.com>',
  434. commit_timestamp=12345, commit_timezone=0,
  435. author_timestamp=12345, author_timezone=0)
  436. f = open(commit_msg, 'wb')
  437. try:
  438. f.write(commit_msg_success)
  439. finally:
  440. f.close()
  441. os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  442. commit_sha = r.do_commit(
  443. 'empty commit',
  444. committer='Test Committer <test@nodomain.com>',
  445. author='Test Author <test@nodomain.com>',
  446. commit_timestamp=12395, commit_timezone=0,
  447. author_timestamp=12395, author_timezone=0)
  448. self.assertEqual([], r[commit_sha].parents)
  449. def test_shell_hook_post_commit(self):
  450. if os.name != 'posix':
  451. self.skipTest('shell hook tests requires POSIX shell')
  452. repo_dir = os.path.join(tempfile.mkdtemp())
  453. r = Repo.init(repo_dir)
  454. self.addCleanup(shutil.rmtree, repo_dir)
  455. (fd, path) = tempfile.mkstemp(dir=repo_dir)
  456. post_commit_msg = """#!/bin/sh
  457. unlink %(file)s
  458. """ % {'file': path}
  459. root_sha = r.do_commit(
  460. 'empty commit',
  461. committer='Test Committer <test@nodomain.com>',
  462. author='Test Author <test@nodomain.com>',
  463. commit_timestamp=12345, commit_timezone=0,
  464. author_timestamp=12345, author_timezone=0)
  465. self.assertEqual([], r[root_sha].parents)
  466. post_commit = os.path.join(r.controldir(), 'hooks', 'post-commit')
  467. f = open(post_commit, 'wb')
  468. try:
  469. f.write(post_commit_msg)
  470. finally:
  471. f.close()
  472. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  473. commit_sha = r.do_commit(
  474. 'empty commit',
  475. committer='Test Committer <test@nodomain.com>',
  476. author='Test Author <test@nodomain.com>',
  477. commit_timestamp=12345, commit_timezone=0,
  478. author_timestamp=12345, author_timezone=0)
  479. self.assertEqual([root_sha], r[commit_sha].parents)
  480. self.assertFalse(os.path.exists(path))
  481. post_commit_msg_fail = """#!/bin/sh
  482. exit 1
  483. """
  484. f = open(post_commit, 'wb')
  485. try:
  486. f.write(post_commit_msg_fail)
  487. finally:
  488. f.close()
  489. os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
  490. warnings.simplefilter("always", UserWarning)
  491. self.addCleanup(warnings.resetwarnings)
  492. warnings_list = setup_warning_catcher()
  493. commit_sha2 = r.do_commit(
  494. 'empty commit',
  495. committer='Test Committer <test@nodomain.com>',
  496. author='Test Author <test@nodomain.com>',
  497. commit_timestamp=12345, commit_timezone=0,
  498. author_timestamp=12345, author_timezone=0)
  499. self.assertEqual(len(warnings_list), 1)
  500. self.assertIsInstance(warnings_list[-1], UserWarning)
  501. self.assertTrue("post-commit hook failed: " in str(warnings_list[-1]))
  502. self.assertEqual([commit_sha], r[commit_sha2].parents)
  503. class BuildRepoTests(TestCase):
  504. """Tests that build on-disk repos from scratch.
  505. Repos live in a temp dir and are torn down after each test. They start with
  506. a single commit in master having single file named 'a'.
  507. """
  508. def setUp(self):
  509. super(BuildRepoTests, self).setUp()
  510. repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
  511. os.makedirs(repo_dir)
  512. r = self._repo = Repo.init(repo_dir)
  513. self.assertFalse(r.bare)
  514. self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
  515. self.assertRaises(KeyError, lambda: r.refs['refs/heads/master'])
  516. f = open(os.path.join(r.path, 'a'), 'wb')
  517. try:
  518. f.write('file contents')
  519. finally:
  520. f.close()
  521. r.stage(['a'])
  522. commit_sha = r.do_commit('msg',
  523. committer='Test Committer <test@nodomain.com>',
  524. author='Test Author <test@nodomain.com>',
  525. commit_timestamp=12345, commit_timezone=0,
  526. author_timestamp=12345, author_timezone=0)
  527. self.assertEqual([], r[commit_sha].parents)
  528. self._root_commit = commit_sha
  529. def tearDown(self):
  530. tear_down_repo(self._repo)
  531. super(BuildRepoTests, self).tearDown()
  532. def test_build_repo(self):
  533. r = self._repo
  534. self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
  535. self.assertEqual(self._root_commit, r.refs['refs/heads/master'])
  536. expected_blob = objects.Blob.from_string('file contents')
  537. self.assertEqual(expected_blob.data, r[expected_blob.id].data)
  538. actual_commit = r[self._root_commit]
  539. self.assertEqual('msg', actual_commit.message)
  540. def test_commit_modified(self):
  541. r = self._repo
  542. f = open(os.path.join(r.path, 'a'), 'wb')
  543. try:
  544. f.write('new contents')
  545. finally:
  546. f.close()
  547. r.stage(['a'])
  548. commit_sha = r.do_commit('modified a',
  549. committer='Test Committer <test@nodomain.com>',
  550. author='Test Author <test@nodomain.com>',
  551. commit_timestamp=12395, commit_timezone=0,
  552. author_timestamp=12395, author_timezone=0)
  553. self.assertEqual([self._root_commit], r[commit_sha].parents)
  554. _, blob_id = tree_lookup_path(r.get_object, r[commit_sha].tree, 'a')
  555. self.assertEqual('new contents', r[blob_id].data)
  556. def test_commit_deleted(self):
  557. r = self._repo
  558. os.remove(os.path.join(r.path, 'a'))
  559. r.stage(['a'])
  560. commit_sha = r.do_commit('deleted a',
  561. committer='Test Committer <test@nodomain.com>',
  562. author='Test Author <test@nodomain.com>',
  563. commit_timestamp=12395, commit_timezone=0,
  564. author_timestamp=12395, author_timezone=0)
  565. self.assertEqual([self._root_commit], r[commit_sha].parents)
  566. self.assertEqual([], list(r.open_index()))
  567. tree = r[r[commit_sha].tree]
  568. self.assertEqual([], list(tree.iteritems()))
  569. def test_commit_encoding(self):
  570. r = self._repo
  571. commit_sha = r.do_commit('commit with strange character \xee',
  572. committer='Test Committer <test@nodomain.com>',
  573. author='Test Author <test@nodomain.com>',
  574. commit_timestamp=12395, commit_timezone=0,
  575. author_timestamp=12395, author_timezone=0,
  576. encoding="iso8859-1")
  577. self.assertEqual("iso8859-1", r[commit_sha].encoding)
  578. def test_commit_config_identity(self):
  579. # commit falls back to the users' identity if it wasn't specified
  580. r = self._repo
  581. c = r.get_config()
  582. c.set(("user", ), "name", "Jelmer")
  583. c.set(("user", ), "email", "jelmer@apache.org")
  584. c.write_to_path()
  585. commit_sha = r.do_commit('message')
  586. self.assertEqual(
  587. "Jelmer <jelmer@apache.org>",
  588. r[commit_sha].author)
  589. self.assertEqual(
  590. "Jelmer <jelmer@apache.org>",
  591. r[commit_sha].committer)
  592. def test_commit_fail_ref(self):
  593. r = self._repo
  594. def set_if_equals(name, old_ref, new_ref):
  595. return False
  596. r.refs.set_if_equals = set_if_equals
  597. def add_if_new(name, new_ref):
  598. self.fail('Unexpected call to add_if_new')
  599. r.refs.add_if_new = add_if_new
  600. old_shas = set(r.object_store)
  601. self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
  602. committer='Test Committer <test@nodomain.com>',
  603. author='Test Author <test@nodomain.com>',
  604. commit_timestamp=12345, commit_timezone=0,
  605. author_timestamp=12345, author_timezone=0)
  606. new_shas = set(r.object_store) - old_shas
  607. self.assertEqual(1, len(new_shas))
  608. # Check that the new commit (now garbage) was added.
  609. new_commit = r[new_shas.pop()]
  610. self.assertEqual(r[self._root_commit].tree, new_commit.tree)
  611. self.assertEqual('failed commit', new_commit.message)
  612. def test_commit_branch(self):
  613. r = self._repo
  614. commit_sha = r.do_commit('commit to branch',
  615. committer='Test Committer <test@nodomain.com>',
  616. author='Test Author <test@nodomain.com>',
  617. commit_timestamp=12395, commit_timezone=0,
  618. author_timestamp=12395, author_timezone=0,
  619. ref="refs/heads/new_branch")
  620. self.assertEqual(self._root_commit, r["HEAD"].id)
  621. self.assertEqual(commit_sha, r["refs/heads/new_branch"].id)
  622. self.assertEqual([], r[commit_sha].parents)
  623. self.assertTrue("refs/heads/new_branch" in r)
  624. new_branch_head = commit_sha
  625. commit_sha = r.do_commit('commit to branch 2',
  626. committer='Test Committer <test@nodomain.com>',
  627. author='Test Author <test@nodomain.com>',
  628. commit_timestamp=12395, commit_timezone=0,
  629. author_timestamp=12395, author_timezone=0,
  630. ref="refs/heads/new_branch")
  631. self.assertEqual(self._root_commit, r["HEAD"].id)
  632. self.assertEqual(commit_sha, r["refs/heads/new_branch"].id)
  633. self.assertEqual([new_branch_head], r[commit_sha].parents)
  634. def test_commit_merge_heads(self):
  635. r = self._repo
  636. merge_1 = r.do_commit('commit to branch 2',
  637. committer='Test Committer <test@nodomain.com>',
  638. author='Test Author <test@nodomain.com>',
  639. commit_timestamp=12395, commit_timezone=0,
  640. author_timestamp=12395, author_timezone=0,
  641. ref="refs/heads/new_branch")
  642. commit_sha = r.do_commit('commit with merge',
  643. committer='Test Committer <test@nodomain.com>',
  644. author='Test Author <test@nodomain.com>',
  645. commit_timestamp=12395, commit_timezone=0,
  646. author_timestamp=12395, author_timezone=0,
  647. merge_heads=[merge_1])
  648. self.assertEqual(
  649. [self._root_commit, merge_1],
  650. r[commit_sha].parents)
  651. def test_stage_deleted(self):
  652. r = self._repo
  653. os.remove(os.path.join(r.path, 'a'))
  654. r.stage(['a'])
  655. r.stage(['a']) # double-stage a deleted path
  656. class CheckRefFormatTests(TestCase):
  657. """Tests for the check_ref_format function.
  658. These are the same tests as in the git test suite.
  659. """
  660. def test_valid(self):
  661. self.assertTrue(check_ref_format('heads/foo'))
  662. self.assertTrue(check_ref_format('foo/bar/baz'))
  663. self.assertTrue(check_ref_format('refs///heads/foo'))
  664. self.assertTrue(check_ref_format('foo./bar'))
  665. self.assertTrue(check_ref_format('heads/foo@bar'))
  666. self.assertTrue(check_ref_format('heads/fix.lock.error'))
  667. def test_invalid(self):
  668. self.assertFalse(check_ref_format('foo'))
  669. self.assertFalse(check_ref_format('heads/foo/'))
  670. self.assertFalse(check_ref_format('./foo'))
  671. self.assertFalse(check_ref_format('.refs/foo'))
  672. self.assertFalse(check_ref_format('heads/foo..bar'))
  673. self.assertFalse(check_ref_format('heads/foo?bar'))
  674. self.assertFalse(check_ref_format('heads/foo.lock'))
  675. self.assertFalse(check_ref_format('heads/v@{ation'))
  676. self.assertFalse(check_ref_format('heads/foo\bar'))
  677. ONES = "1" * 40
  678. TWOS = "2" * 40
  679. THREES = "3" * 40
  680. FOURS = "4" * 40
  681. class PackedRefsFileTests(TestCase):
  682. def test_split_ref_line_errors(self):
  683. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  684. 'singlefield')
  685. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  686. 'badsha name')
  687. self.assertRaises(errors.PackedRefsException, _split_ref_line,
  688. '%s bad/../refname' % ONES)
  689. def test_read_without_peeled(self):
  690. f = StringIO('# comment\n%s ref/1\n%s ref/2' % (ONES, TWOS))
  691. self.assertEqual([(ONES, 'ref/1'), (TWOS, 'ref/2')],
  692. list(read_packed_refs(f)))
  693. def test_read_without_peeled_errors(self):
  694. f = StringIO('%s ref/1\n^%s' % (ONES, TWOS))
  695. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  696. def test_read_with_peeled(self):
  697. f = StringIO('%s ref/1\n%s ref/2\n^%s\n%s ref/4' % (
  698. ONES, TWOS, THREES, FOURS))
  699. self.assertEqual([
  700. (ONES, 'ref/1', None),
  701. (TWOS, 'ref/2', THREES),
  702. (FOURS, 'ref/4', None),
  703. ], list(read_packed_refs_with_peeled(f)))
  704. def test_read_with_peeled_errors(self):
  705. f = StringIO('^%s\n%s ref/1' % (TWOS, ONES))
  706. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  707. f = StringIO('%s ref/1\n^%s\n^%s' % (ONES, TWOS, THREES))
  708. self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
  709. def test_write_with_peeled(self):
  710. f = StringIO()
  711. write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS},
  712. {'ref/1': THREES})
  713. self.assertEqual(
  714. "# pack-refs with: peeled\n%s ref/1\n^%s\n%s ref/2\n" % (
  715. ONES, THREES, TWOS), f.getvalue())
  716. def test_write_without_peeled(self):
  717. f = StringIO()
  718. write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS})
  719. self.assertEqual("%s ref/1\n%s ref/2\n" % (ONES, TWOS), f.getvalue())
# Dict of refs that we expect all RefsContainerTests subclasses to define.
# Maps each ref name to the sha it is expected to resolve to; HEAD and the
# three refs/heads/* entries all share one sha, the two tags have their own.
_TEST_REFS = {
    'HEAD': '42d06bd4b77fed026b154d16493e5deab78f02ec',
    'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa': '42d06bd4b77fed026b154d16493e5deab78f02ec',
    'refs/heads/master': '42d06bd4b77fed026b154d16493e5deab78f02ec',
    'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
    'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
    'refs/tags/refs-0.2': '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
    }
  729. class RefsContainerTests(object):
  730. def test_keys(self):
  731. actual_keys = set(self._refs.keys())
  732. self.assertEqual(set(self._refs.allkeys()), actual_keys)
  733. # ignore the symref loop if it exists
  734. actual_keys.discard('refs/heads/loop')
  735. self.assertEqual(set(_TEST_REFS.iterkeys()), actual_keys)
  736. actual_keys = self._refs.keys('refs/heads')
  737. actual_keys.discard('loop')
  738. self.assertEqual(
  739. ['40-char-ref-aaaaaaaaaaaaaaaaaa', 'master', 'packed'],
  740. sorted(actual_keys))
  741. self.assertEqual(['refs-0.1', 'refs-0.2'],
  742. sorted(self._refs.keys('refs/tags')))
  743. def test_as_dict(self):
  744. # refs/heads/loop does not show up even if it exists
  745. self.assertEqual(_TEST_REFS, self._refs.as_dict())
  746. def test_setitem(self):
  747. self._refs['refs/some/ref'] = '42d06bd4b77fed026b154d16493e5deab78f02ec'
  748. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  749. self._refs['refs/some/ref'])
  750. self.assertRaises(errors.RefFormatError, self._refs.__setitem__,
  751. 'notrefs/foo', '42d06bd4b77fed026b154d16493e5deab78f02ec')
  752. def test_set_if_equals(self):
  753. nines = '9' * 40
  754. self.assertFalse(self._refs.set_if_equals('HEAD', 'c0ffee', nines))
  755. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  756. self._refs['HEAD'])
  757. self.assertTrue(self._refs.set_if_equals(
  758. 'HEAD', '42d06bd4b77fed026b154d16493e5deab78f02ec', nines))
  759. self.assertEqual(nines, self._refs['HEAD'])
  760. self.assertTrue(self._refs.set_if_equals('refs/heads/master', None,
  761. nines))
  762. self.assertEqual(nines, self._refs['refs/heads/master'])
  763. def test_add_if_new(self):
  764. nines = '9' * 40
  765. self.assertFalse(self._refs.add_if_new('refs/heads/master', nines))
  766. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  767. self._refs['refs/heads/master'])
  768. self.assertTrue(self._refs.add_if_new('refs/some/ref', nines))
  769. self.assertEqual(nines, self._refs['refs/some/ref'])
  770. def test_set_symbolic_ref(self):
  771. self._refs.set_symbolic_ref('refs/heads/symbolic', 'refs/heads/master')
  772. self.assertEqual('ref: refs/heads/master',
  773. self._refs.read_loose_ref('refs/heads/symbolic'))
  774. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  775. self._refs['refs/heads/symbolic'])
  776. def test_set_symbolic_ref_overwrite(self):
  777. nines = '9' * 40
  778. self.assertFalse('refs/heads/symbolic' in self._refs)
  779. self._refs['refs/heads/symbolic'] = nines
  780. self.assertEqual(nines, self._refs.read_loose_ref('refs/heads/symbolic'))
  781. self._refs.set_symbolic_ref('refs/heads/symbolic', 'refs/heads/master')
  782. self.assertEqual('ref: refs/heads/master',
  783. self._refs.read_loose_ref('refs/heads/symbolic'))
  784. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  785. self._refs['refs/heads/symbolic'])
  786. def test_check_refname(self):
  787. self._refs._check_refname('HEAD')
  788. self._refs._check_refname('refs/stash')
  789. self._refs._check_refname('refs/heads/foo')
  790. self.assertRaises(errors.RefFormatError, self._refs._check_refname,
  791. 'refs')
  792. self.assertRaises(errors.RefFormatError, self._refs._check_refname,
  793. 'notrefs/foo')
  794. def test_contains(self):
  795. self.assertTrue('refs/heads/master' in self._refs)
  796. self.assertFalse('refs/heads/bar' in self._refs)
  797. def test_delitem(self):
  798. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  799. self._refs['refs/heads/master'])
  800. del self._refs['refs/heads/master']
  801. self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])
  802. def test_remove_if_equals(self):
  803. self.assertFalse(self._refs.remove_if_equals('HEAD', 'c0ffee'))
  804. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  805. self._refs['HEAD'])
  806. self.assertTrue(self._refs.remove_if_equals(
  807. 'refs/tags/refs-0.2', '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8'))
  808. self.assertFalse('refs/tags/refs-0.2' in self._refs)
  809. class DictRefsContainerTests(RefsContainerTests, TestCase):
  810. def setUp(self):
  811. TestCase.setUp(self)
  812. self._refs = DictRefsContainer(dict(_TEST_REFS))
  813. def test_invalid_refname(self):
  814. # FIXME: Move this test into RefsContainerTests, but requires
  815. # some way of injecting invalid refs.
  816. self._refs._refs["refs/stash"] = "00" * 20
  817. expected_refs = dict(_TEST_REFS)
  818. expected_refs["refs/stash"] = "00" * 20
  819. self.assertEqual(expected_refs, self._refs.as_dict())
  820. class DiskRefsContainerTests(RefsContainerTests, TestCase):
  821. def setUp(self):
  822. TestCase.setUp(self)
  823. self._repo = open_repo('refs.git')
  824. self._refs = self._repo.refs
  825. def tearDown(self):
  826. tear_down_repo(self._repo)
  827. TestCase.tearDown(self)
  828. def test_get_packed_refs(self):
  829. self.assertEqual({
  830. 'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  831. 'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
  832. }, self._refs.get_packed_refs())
  833. def test_get_peeled_not_packed(self):
  834. # not packed
  835. self.assertEqual(None, self._refs.get_peeled('refs/tags/refs-0.2'))
  836. self.assertEqual('3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
  837. self._refs['refs/tags/refs-0.2'])
  838. # packed, known not peelable
  839. self.assertEqual(self._refs['refs/heads/packed'],
  840. self._refs.get_peeled('refs/heads/packed'))
  841. # packed, peeled
  842. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  843. self._refs.get_peeled('refs/tags/refs-0.1'))
  844. def test_setitem(self):
  845. RefsContainerTests.test_setitem(self)
  846. f = open(os.path.join(self._refs.path, 'refs', 'some', 'ref'), 'rb')
  847. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  848. f.read()[:40])
  849. f.close()
  850. def test_setitem_symbolic(self):
  851. ones = '1' * 40
  852. self._refs['HEAD'] = ones
  853. self.assertEqual(ones, self._refs['HEAD'])
  854. # ensure HEAD was not modified
  855. f = open(os.path.join(self._refs.path, 'HEAD'), 'rb')
  856. self.assertEqual('ref: refs/heads/master', iter(f).next().rstrip('\n'))
  857. f.close()
  858. # ensure the symbolic link was written through
  859. f = open(os.path.join(self._refs.path, 'refs', 'heads', 'master'), 'rb')
  860. self.assertEqual(ones, f.read()[:40])
  861. f.close()
  862. def test_set_if_equals(self):
  863. RefsContainerTests.test_set_if_equals(self)
  864. # ensure symref was followed
  865. self.assertEqual('9' * 40, self._refs['refs/heads/master'])
  866. # ensure lockfile was deleted
  867. self.assertFalse(os.path.exists(
  868. os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
  869. self.assertFalse(os.path.exists(
  870. os.path.join(self._refs.path, 'HEAD.lock')))
  871. def test_add_if_new_packed(self):
  872. # don't overwrite packed ref
  873. self.assertFalse(self._refs.add_if_new('refs/tags/refs-0.1', '9' * 40))
  874. self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
  875. self._refs['refs/tags/refs-0.1'])
  876. def test_add_if_new_symbolic(self):
  877. # Use an empty repo instead of the default.
  878. tear_down_repo(self._repo)
  879. repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
  880. os.makedirs(repo_dir)
  881. self._repo = Repo.init(repo_dir)
  882. refs = self._repo.refs
  883. nines = '9' * 40
  884. self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
  885. self.assertFalse('refs/heads/master' in refs)
  886. self.assertTrue(refs.add_if_new('HEAD', nines))
  887. self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
  888. self.assertEqual(nines, refs['HEAD'])
  889. self.assertEqual(nines, refs['refs/heads/master'])
  890. self.assertFalse(refs.add_if_new('HEAD', '1' * 40))
  891. self.assertEqual(nines, refs['HEAD'])
  892. self.assertEqual(nines, refs['refs/heads/master'])
  893. def test_follow(self):
  894. self.assertEqual(
  895. ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
  896. self._refs._follow('HEAD'))
  897. self.assertEqual(
  898. ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
  899. self._refs._follow('refs/heads/master'))
  900. self.assertRaises(KeyError, self._refs._follow, 'refs/heads/loop')
  901. def test_delitem(self):
  902. RefsContainerTests.test_delitem(self)
  903. ref_file = os.path.join(self._refs.path, 'refs', 'heads', 'master')
  904. self.assertFalse(os.path.exists(ref_file))
  905. self.assertFalse('refs/heads/master' in self._refs.get_packed_refs())
  906. def test_delitem_symbolic(self):
  907. self.assertEqual('ref: refs/heads/master',
  908. self._refs.read_loose_ref('HEAD'))
  909. del self._refs['HEAD']
  910. self.assertRaises(KeyError, lambda: self._refs['HEAD'])
  911. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  912. self._refs['refs/heads/master'])
  913. self.assertFalse(os.path.exists(os.path.join(self._refs.path, 'HEAD')))
  914. def test_remove_if_equals_symref(self):
  915. # HEAD is a symref, so shouldn't equal its dereferenced value
  916. self.assertFalse(self._refs.remove_if_equals(
  917. 'HEAD', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
  918. self.assertTrue(self._refs.remove_if_equals(
  919. 'refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
  920. self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])
  921. # HEAD is now a broken symref
  922. self.assertRaises(KeyError, lambda: self._refs['HEAD'])
  923. self.assertEqual('ref: refs/heads/master',
  924. self._refs.read_loose_ref('HEAD'))
  925. self.assertFalse(os.path.exists(
  926. os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
  927. self.assertFalse(os.path.exists(
  928. os.path.join(self._refs.path, 'HEAD.lock')))
  929. def test_remove_packed_without_peeled(self):
  930. refs_file = os.path.join(self._repo.path, 'packed-refs')
  931. f = GitFile(refs_file)
  932. refs_data = f.read()
  933. f.close()
  934. f = GitFile(refs_file, 'wb')
  935. f.write('\n'.join(l for l in refs_data.split('\n')
  936. if not l or l[0] not in '#^'))
  937. f.close()
  938. self._repo = Repo(self._repo.path)
  939. refs = self._repo.refs
  940. self.assertTrue(refs.remove_if_equals(
  941. 'refs/heads/packed', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
  942. def test_remove_if_equals_packed(self):
  943. # test removing ref that is only packed
  944. self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
  945. self._refs['refs/tags/refs-0.1'])
  946. self.assertTrue(
  947. self._refs.remove_if_equals('refs/tags/refs-0.1',
  948. 'df6800012397fb85c56e7418dd4eb9405dee075c'))
  949. self.assertRaises(KeyError, lambda: self._refs['refs/tags/refs-0.1'])
  950. def test_read_ref(self):
  951. self.assertEqual('ref: refs/heads/master', self._refs.read_ref("HEAD"))
  952. self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
  953. self._refs.read_ref("refs/heads/packed"))
  954. self.assertEqual(None,
  955. self._refs.read_ref("nonexistant"))
# The _TEST_REFS entries other than HEAD, written as one "<sha>\t<refname>\n"
# line per ref. The InfoRefsContainer tests below wrap this text in a StringIO
# and pass it to the container.
_TEST_REFS_SERIALIZED = (
'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n'
'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n'
'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n'
'df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n'
'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8\trefs/tags/refs-0.2\n')
  962. class InfoRefsContainerTests(TestCase):
  963. def test_invalid_refname(self):
  964. text = _TEST_REFS_SERIALIZED + '00' * 20 + '\trefs/stash\n'
  965. refs = InfoRefsContainer(StringIO(text))
  966. expected_refs = dict(_TEST_REFS)
  967. del expected_refs['HEAD']
  968. expected_refs["refs/stash"] = "00" * 20
  969. self.assertEqual(expected_refs, refs.as_dict())
  970. def test_keys(self):
  971. refs = InfoRefsContainer(StringIO(_TEST_REFS_SERIALIZED))
  972. actual_keys = set(refs.keys())
  973. self.assertEqual(set(refs.allkeys()), actual_keys)
  974. # ignore the symref loop if it exists
  975. actual_keys.discard('refs/heads/loop')
  976. expected_refs = dict(_TEST_REFS)
  977. del expected_refs['HEAD']
  978. self.assertEqual(set(expected_refs.iterkeys()), actual_keys)
  979. actual_keys = refs.keys('refs/heads')
  980. actual_keys.discard('loop')
  981. self.assertEqual(
  982. ['40-char-ref-aaaaaaaaaaaaaaaaaa', 'master', 'packed'],
  983. sorted(actual_keys))
  984. self.assertEqual(['refs-0.1', 'refs-0.2'],
  985. sorted(refs.keys('refs/tags')))
  986. def test_as_dict(self):
  987. refs = InfoRefsContainer(StringIO(_TEST_REFS_SERIALIZED))
  988. # refs/heads/loop does not show up even if it exists
  989. expected_refs = dict(_TEST_REFS)
  990. del expected_refs['HEAD']
  991. self.assertEqual(expected_refs, refs.as_dict())
  992. def test_contains(self):
  993. refs = InfoRefsContainer(StringIO(_TEST_REFS_SERIALIZED))
  994. self.assertTrue('refs/heads/master' in refs)
  995. self.assertFalse('refs/heads/bar' in refs)
    def test_get_peeled(self):
        refs = InfoRefsContainer(StringIO(_TEST_REFS_SERIALIZED))
        # For refs/heads/master, get_peeled() is expected to return the same
        # sha that _TEST_REFS records for the ref itself.
        self.assertEqual(
            _TEST_REFS['refs/heads/master'],
            refs.get_peeled('refs/heads/master'))