test_index.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the index."""
  22. import os
  23. import shutil
  24. import stat
  25. import struct
  26. import sys
  27. import tempfile
  28. from io import BytesIO
  29. from dulwich.index import (
  30. Index,
  31. IndexEntry,
  32. SerializedIndexEntry,
  33. _fs_to_tree_path,
  34. _tree_to_fs_path,
  35. build_index_from_tree,
  36. cleanup_mode,
  37. commit_tree,
  38. get_unstaged_changes,
  39. index_entry_from_stat,
  40. read_index,
  41. read_index_dict,
  42. validate_path_element_default,
  43. validate_path_element_ntfs,
  44. write_cache_time,
  45. write_index,
  46. write_index_dict,
  47. )
  48. from dulwich.object_store import MemoryObjectStore
  49. from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
  50. from dulwich.repo import Repo
  51. from . import TestCase, skipIf
  52. def can_symlink() -> bool:
  53. """Return whether running process can create symlinks."""
  54. if sys.platform != "win32":
  55. # Platforms other than Windows should allow symlinks without issues.
  56. return True
  57. test_source = tempfile.mkdtemp()
  58. test_target = test_source + "can_symlink"
  59. try:
  60. os.symlink(test_source, test_target)
  61. except (NotImplementedError, OSError):
  62. return False
  63. return True
  64. class IndexTestCase(TestCase):
  65. datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")
  66. def get_simple_index(self, name):
  67. return Index(os.path.join(self.datadir, name))
  68. class SimpleIndexTestCase(IndexTestCase):
  69. def test_len(self) -> None:
  70. self.assertEqual(1, len(self.get_simple_index("index")))
  71. def test_iter(self) -> None:
  72. self.assertEqual([b"bla"], list(self.get_simple_index("index")))
  73. def test_iter_skip_hash(self) -> None:
  74. self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))
  75. def test_iterobjects(self) -> None:
  76. self.assertEqual(
  77. [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
  78. list(self.get_simple_index("index").iterobjects()),
  79. )
  80. def test_getitem(self) -> None:
  81. self.assertEqual(
  82. IndexEntry(
  83. (1230680220, 0),
  84. (1230680220, 0),
  85. 2050,
  86. 3761020,
  87. 33188,
  88. 1000,
  89. 1000,
  90. 0,
  91. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  92. ),
  93. self.get_simple_index("index")[b"bla"],
  94. )
  95. def test_empty(self) -> None:
  96. i = self.get_simple_index("notanindex")
  97. self.assertEqual(0, len(i))
  98. self.assertFalse(os.path.exists(i._filename))
  99. def test_against_empty_tree(self) -> None:
  100. i = self.get_simple_index("index")
  101. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  102. self.assertEqual(1, len(changes))
  103. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  104. self.assertEqual(b"bla", newname)
  105. self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)
  106. class SimpleIndexWriterTestCase(IndexTestCase):
  107. def setUp(self) -> None:
  108. IndexTestCase.setUp(self)
  109. self.tempdir = tempfile.mkdtemp()
  110. def tearDown(self) -> None:
  111. IndexTestCase.tearDown(self)
  112. shutil.rmtree(self.tempdir)
  113. def test_simple_write(self) -> None:
  114. entries = [
  115. (
  116. SerializedIndexEntry(
  117. b"barbla",
  118. (1230680220, 0),
  119. (1230680220, 0),
  120. 2050,
  121. 3761020,
  122. 33188,
  123. 1000,
  124. 1000,
  125. 0,
  126. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  127. 0,
  128. 0,
  129. )
  130. )
  131. ]
  132. filename = os.path.join(self.tempdir, "test-simple-write-index")
  133. with open(filename, "wb+") as x:
  134. write_index(x, entries)
  135. with open(filename, "rb") as x:
  136. self.assertEqual(entries, list(read_index(x)))
  137. class ReadIndexDictTests(IndexTestCase):
  138. def setUp(self) -> None:
  139. IndexTestCase.setUp(self)
  140. self.tempdir = tempfile.mkdtemp()
  141. def tearDown(self) -> None:
  142. IndexTestCase.tearDown(self)
  143. shutil.rmtree(self.tempdir)
  144. def test_simple_write(self) -> None:
  145. entries = {
  146. b"barbla": IndexEntry(
  147. (1230680220, 0),
  148. (1230680220, 0),
  149. 2050,
  150. 3761020,
  151. 33188,
  152. 1000,
  153. 1000,
  154. 0,
  155. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  156. )
  157. }
  158. filename = os.path.join(self.tempdir, "test-simple-write-index")
  159. with open(filename, "wb+") as x:
  160. write_index_dict(x, entries)
  161. with open(filename, "rb") as x:
  162. self.assertEqual(entries, read_index_dict(x))
  163. class CommitTreeTests(TestCase):
  164. def setUp(self) -> None:
  165. super().setUp()
  166. self.store = MemoryObjectStore()
  167. def test_single_blob(self) -> None:
  168. blob = Blob()
  169. blob.data = b"foo"
  170. self.store.add_object(blob)
  171. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  172. rootid = commit_tree(self.store, blobs)
  173. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  174. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  175. self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))
  176. def test_nested(self) -> None:
  177. blob = Blob()
  178. blob.data = b"foo"
  179. self.store.add_object(blob)
  180. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  181. rootid = commit_tree(self.store, blobs)
  182. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  183. dirid = self.store[rootid][b"bla"][1]
  184. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  185. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  186. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  187. self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))
  188. class CleanupModeTests(TestCase):
  189. def assertModeEqual(self, expected, got) -> None:
  190. self.assertEqual(expected, got, f"{expected:o} != {got:o}")
  191. def test_file(self) -> None:
  192. self.assertModeEqual(0o100644, cleanup_mode(0o100000))
  193. def test_executable(self) -> None:
  194. self.assertModeEqual(0o100755, cleanup_mode(0o100711))
  195. self.assertModeEqual(0o100755, cleanup_mode(0o100700))
  196. def test_symlink(self) -> None:
  197. self.assertModeEqual(0o120000, cleanup_mode(0o120711))
  198. def test_dir(self) -> None:
  199. self.assertModeEqual(0o040000, cleanup_mode(0o40531))
  200. def test_submodule(self) -> None:
  201. self.assertModeEqual(0o160000, cleanup_mode(0o160744))
  202. class WriteCacheTimeTests(TestCase):
  203. def test_write_string(self) -> None:
  204. f = BytesIO()
  205. self.assertRaises(TypeError, write_cache_time, f, "foo")
  206. def test_write_int(self) -> None:
  207. f = BytesIO()
  208. write_cache_time(f, 434343)
  209. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  210. def test_write_tuple(self) -> None:
  211. f = BytesIO()
  212. write_cache_time(f, (434343, 21))
  213. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  214. def test_write_float(self) -> None:
  215. f = BytesIO()
  216. write_cache_time(f, 434343.000000021)
  217. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  218. class IndexEntryFromStatTests(TestCase):
  219. def test_simple(self) -> None:
  220. st = os.stat_result(
  221. (
  222. 16877,
  223. 131078,
  224. 64769,
  225. 154,
  226. 1000,
  227. 1000,
  228. 12288,
  229. 1323629595,
  230. 1324180496,
  231. 1324180496,
  232. )
  233. )
  234. entry = index_entry_from_stat(st, b"22" * 20)
  235. self.assertEqual(
  236. entry,
  237. IndexEntry(
  238. 1324180496,
  239. 1324180496,
  240. 64769,
  241. 131078,
  242. 16384,
  243. 1000,
  244. 1000,
  245. 12288,
  246. b"2222222222222222222222222222222222222222",
  247. ),
  248. )
  249. def test_override_mode(self) -> None:
  250. st = os.stat_result(
  251. (
  252. stat.S_IFREG + 0o644,
  253. 131078,
  254. 64769,
  255. 154,
  256. 1000,
  257. 1000,
  258. 12288,
  259. 1323629595,
  260. 1324180496,
  261. 1324180496,
  262. )
  263. )
  264. entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
  265. self.assertEqual(
  266. entry,
  267. IndexEntry(
  268. 1324180496,
  269. 1324180496,
  270. 64769,
  271. 131078,
  272. 33261,
  273. 1000,
  274. 1000,
  275. 12288,
  276. b"2222222222222222222222222222222222222222",
  277. ),
  278. )
  279. class BuildIndexTests(TestCase):
  280. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
  281. self.assertEqual(index_entry.mode, mode) # mode
  282. self.assertEqual(index_entry.size, filesize) # filesize
  283. self.assertEqual(index_entry.sha, sha) # sha
  284. def assertFileContents(self, path, contents, symlink=False) -> None:
  285. if symlink:
  286. self.assertEqual(os.readlink(path), contents)
  287. else:
  288. with open(path, "rb") as f:
  289. self.assertEqual(f.read(), contents)
  290. def test_empty(self) -> None:
  291. repo_dir = tempfile.mkdtemp()
  292. self.addCleanup(shutil.rmtree, repo_dir)
  293. with Repo.init(repo_dir) as repo:
  294. tree = Tree()
  295. repo.object_store.add_object(tree)
  296. build_index_from_tree(
  297. repo.path, repo.index_path(), repo.object_store, tree.id
  298. )
  299. # Verify index entries
  300. index = repo.open_index()
  301. self.assertEqual(len(index), 0)
  302. # Verify no files
  303. self.assertEqual([".git"], os.listdir(repo.path))
  304. def test_git_dir(self) -> None:
  305. repo_dir = tempfile.mkdtemp()
  306. self.addCleanup(shutil.rmtree, repo_dir)
  307. with Repo.init(repo_dir) as repo:
  308. # Populate repo
  309. filea = Blob.from_string(b"file a")
  310. filee = Blob.from_string(b"d")
  311. tree = Tree()
  312. tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
  313. tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)
  314. repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])
  315. build_index_from_tree(
  316. repo.path, repo.index_path(), repo.object_store, tree.id
  317. )
  318. # Verify index entries
  319. index = repo.open_index()
  320. self.assertEqual(len(index), 1)
  321. # filea
  322. apath = os.path.join(repo.path, ".git", "a")
  323. self.assertFalse(os.path.exists(apath))
  324. # filee
  325. epath = os.path.join(repo.path, "c", "e")
  326. self.assertTrue(os.path.exists(epath))
  327. self.assertReasonableIndexEntry(
  328. index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
  329. )
  330. self.assertFileContents(epath, b"d")
  331. def test_nonempty(self) -> None:
  332. repo_dir = tempfile.mkdtemp()
  333. self.addCleanup(shutil.rmtree, repo_dir)
  334. with Repo.init(repo_dir) as repo:
  335. # Populate repo
  336. filea = Blob.from_string(b"file a")
  337. fileb = Blob.from_string(b"file b")
  338. filed = Blob.from_string(b"file d")
  339. tree = Tree()
  340. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  341. tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
  342. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  343. repo.object_store.add_objects(
  344. [(o, None) for o in [filea, fileb, filed, tree]]
  345. )
  346. build_index_from_tree(
  347. repo.path, repo.index_path(), repo.object_store, tree.id
  348. )
  349. # Verify index entries
  350. index = repo.open_index()
  351. self.assertEqual(len(index), 3)
  352. # filea
  353. apath = os.path.join(repo.path, "a")
  354. self.assertTrue(os.path.exists(apath))
  355. self.assertReasonableIndexEntry(
  356. index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
  357. )
  358. self.assertFileContents(apath, b"file a")
  359. # fileb
  360. bpath = os.path.join(repo.path, "b")
  361. self.assertTrue(os.path.exists(bpath))
  362. self.assertReasonableIndexEntry(
  363. index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
  364. )
  365. self.assertFileContents(bpath, b"file b")
  366. # filed
  367. dpath = os.path.join(repo.path, "c", "d")
  368. self.assertTrue(os.path.exists(dpath))
  369. self.assertReasonableIndexEntry(
  370. index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
  371. )
  372. self.assertFileContents(dpath, b"file d")
  373. # Verify no extra files
  374. self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
  375. self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))
  376. @skipIf(not getattr(os, "sync", None), "Requires sync support")
  377. def test_norewrite(self) -> None:
  378. repo_dir = tempfile.mkdtemp()
  379. self.addCleanup(shutil.rmtree, repo_dir)
  380. with Repo.init(repo_dir) as repo:
  381. # Populate repo
  382. filea = Blob.from_string(b"file a")
  383. filea_path = os.path.join(repo_dir, "a")
  384. tree = Tree()
  385. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  386. repo.object_store.add_objects([(o, None) for o in [filea, tree]])
  387. # First Write
  388. build_index_from_tree(
  389. repo.path, repo.index_path(), repo.object_store, tree.id
  390. )
  391. # Use sync as metadata can be cached on some FS
  392. os.sync()
  393. mtime = os.stat(filea_path).st_mtime
  394. # Test Rewrite
  395. build_index_from_tree(
  396. repo.path, repo.index_path(), repo.object_store, tree.id
  397. )
  398. os.sync()
  399. self.assertEqual(mtime, os.stat(filea_path).st_mtime)
  400. # Modify content
  401. with open(filea_path, "wb") as fh:
  402. fh.write(b"test a")
  403. os.sync()
  404. mtime = os.stat(filea_path).st_mtime
  405. # Test rewrite
  406. build_index_from_tree(
  407. repo.path, repo.index_path(), repo.object_store, tree.id
  408. )
  409. os.sync()
  410. with open(filea_path, "rb") as fh:
  411. self.assertEqual(b"file a", fh.read())
  412. @skipIf(not can_symlink(), "Requires symlink support")
  413. def test_symlink(self) -> None:
  414. repo_dir = tempfile.mkdtemp()
  415. self.addCleanup(shutil.rmtree, repo_dir)
  416. with Repo.init(repo_dir) as repo:
  417. # Populate repo
  418. filed = Blob.from_string(b"file d")
  419. filee = Blob.from_string(b"d")
  420. tree = Tree()
  421. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  422. tree[b"c/e"] = (stat.S_IFLNK, filee.id) # symlink
  423. repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])
  424. build_index_from_tree(
  425. repo.path, repo.index_path(), repo.object_store, tree.id
  426. )
  427. # Verify index entries
  428. index = repo.open_index()
  429. # symlink to d
  430. epath = os.path.join(repo.path, "c", "e")
  431. self.assertTrue(os.path.exists(epath))
  432. self.assertReasonableIndexEntry(
  433. index[b"c/e"],
  434. stat.S_IFLNK,
  435. 0 if sys.platform == "win32" else 1,
  436. filee.id,
  437. )
  438. self.assertFileContents(epath, "d", symlink=True)
  439. def test_no_decode_encode(self) -> None:
  440. repo_dir = tempfile.mkdtemp()
  441. repo_dir_bytes = os.fsencode(repo_dir)
  442. self.addCleanup(shutil.rmtree, repo_dir)
  443. with Repo.init(repo_dir) as repo:
  444. # Populate repo
  445. file = Blob.from_string(b"foo")
  446. tree = Tree()
  447. latin1_name = "À".encode("latin1")
  448. try:
  449. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  450. except UnicodeDecodeError:
  451. self.skipTest("can not decode as latin1")
  452. utf8_name = "À".encode()
  453. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  454. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  455. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  456. repo.object_store.add_objects([(o, None) for o in [file, tree]])
  457. try:
  458. build_index_from_tree(
  459. repo.path, repo.index_path(), repo.object_store, tree.id
  460. )
  461. except OSError as e:
  462. if e.errno == 92 and sys.platform == "darwin":
  463. # Our filename isn't supported by the platform :(
  464. self.skipTest(f"can not write filename {e.filename!r}")
  465. else:
  466. raise
  467. except UnicodeDecodeError:
  468. # This happens e.g. with python3.6 on Windows.
  469. # It implicitly decodes using utf8, which doesn't work.
  470. self.skipTest("can not implicitly convert as utf8")
  471. # Verify index entries
  472. index = repo.open_index()
  473. self.assertIn(latin1_name, index)
  474. self.assertIn(utf8_name, index)
  475. self.assertTrue(os.path.exists(latin1_path))
  476. self.assertTrue(os.path.exists(utf8_path))
  477. def test_git_submodule(self) -> None:
  478. repo_dir = tempfile.mkdtemp()
  479. self.addCleanup(shutil.rmtree, repo_dir)
  480. with Repo.init(repo_dir) as repo:
  481. filea = Blob.from_string(b"file alalala")
  482. subtree = Tree()
  483. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  484. c = Commit()
  485. c.tree = subtree.id
  486. c.committer = c.author = b"Somebody <somebody@example.com>"
  487. c.commit_time = c.author_time = 42342
  488. c.commit_timezone = c.author_timezone = 0
  489. c.parents = []
  490. c.message = b"Subcommit"
  491. tree = Tree()
  492. tree[b"c"] = (S_IFGITLINK, c.id)
  493. repo.object_store.add_objects([(o, None) for o in [tree]])
  494. build_index_from_tree(
  495. repo.path, repo.index_path(), repo.object_store, tree.id
  496. )
  497. # Verify index entries
  498. index = repo.open_index()
  499. self.assertEqual(len(index), 1)
  500. # filea
  501. apath = os.path.join(repo.path, "c/a")
  502. self.assertFalse(os.path.exists(apath))
  503. # dir c
  504. cpath = os.path.join(repo.path, "c")
  505. self.assertTrue(os.path.isdir(cpath))
  506. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  507. self.assertEqual(index[b"c"].sha, c.id) # sha
  508. def test_git_submodule_exists(self) -> None:
  509. repo_dir = tempfile.mkdtemp()
  510. self.addCleanup(shutil.rmtree, repo_dir)
  511. with Repo.init(repo_dir) as repo:
  512. filea = Blob.from_string(b"file alalala")
  513. subtree = Tree()
  514. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  515. c = Commit()
  516. c.tree = subtree.id
  517. c.committer = c.author = b"Somebody <somebody@example.com>"
  518. c.commit_time = c.author_time = 42342
  519. c.commit_timezone = c.author_timezone = 0
  520. c.parents = []
  521. c.message = b"Subcommit"
  522. tree = Tree()
  523. tree[b"c"] = (S_IFGITLINK, c.id)
  524. os.mkdir(os.path.join(repo_dir, "c"))
  525. repo.object_store.add_objects([(o, None) for o in [tree]])
  526. build_index_from_tree(
  527. repo.path, repo.index_path(), repo.object_store, tree.id
  528. )
  529. # Verify index entries
  530. index = repo.open_index()
  531. self.assertEqual(len(index), 1)
  532. # filea
  533. apath = os.path.join(repo.path, "c/a")
  534. self.assertFalse(os.path.exists(apath))
  535. # dir c
  536. cpath = os.path.join(repo.path, "c")
  537. self.assertTrue(os.path.isdir(cpath))
  538. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  539. self.assertEqual(index[b"c"].sha, c.id) # sha
  540. class GetUnstagedChangesTests(TestCase):
  541. def test_get_unstaged_changes(self) -> None:
  542. """Unit test for get_unstaged_changes."""
  543. repo_dir = tempfile.mkdtemp()
  544. self.addCleanup(shutil.rmtree, repo_dir)
  545. with Repo.init(repo_dir) as repo:
  546. # Commit a dummy file then modify it
  547. foo1_fullpath = os.path.join(repo_dir, "foo1")
  548. with open(foo1_fullpath, "wb") as f:
  549. f.write(b"origstuff")
  550. foo2_fullpath = os.path.join(repo_dir, "foo2")
  551. with open(foo2_fullpath, "wb") as f:
  552. f.write(b"origstuff")
  553. repo.stage(["foo1", "foo2"])
  554. repo.do_commit(
  555. b"test status",
  556. author=b"author <email>",
  557. committer=b"committer <email>",
  558. )
  559. with open(foo1_fullpath, "wb") as f:
  560. f.write(b"newstuff")
  561. # modify access and modify time of path
  562. os.utime(foo1_fullpath, (0, 0))
  563. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  564. self.assertEqual(list(changes), [b"foo1"])
  565. def test_get_unstaged_deleted_changes(self) -> None:
  566. """Unit test for get_unstaged_changes."""
  567. repo_dir = tempfile.mkdtemp()
  568. self.addCleanup(shutil.rmtree, repo_dir)
  569. with Repo.init(repo_dir) as repo:
  570. # Commit a dummy file then remove it
  571. foo1_fullpath = os.path.join(repo_dir, "foo1")
  572. with open(foo1_fullpath, "wb") as f:
  573. f.write(b"origstuff")
  574. repo.stage(["foo1"])
  575. repo.do_commit(
  576. b"test status",
  577. author=b"author <email>",
  578. committer=b"committer <email>",
  579. )
  580. os.unlink(foo1_fullpath)
  581. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  582. self.assertEqual(list(changes), [b"foo1"])
  583. def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
  584. """Unit test for get_unstaged_changes."""
  585. repo_dir = tempfile.mkdtemp()
  586. self.addCleanup(shutil.rmtree, repo_dir)
  587. with Repo.init(repo_dir) as repo:
  588. # Commit a dummy file then modify it
  589. foo1_fullpath = os.path.join(repo_dir, "foo1")
  590. with open(foo1_fullpath, "wb") as f:
  591. f.write(b"origstuff")
  592. repo.stage(["foo1"])
  593. repo.do_commit(
  594. b"test status",
  595. author=b"author <email>",
  596. committer=b"committer <email>",
  597. )
  598. os.remove(foo1_fullpath)
  599. os.mkdir(foo1_fullpath)
  600. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  601. self.assertEqual(list(changes), [b"foo1"])
  602. @skipIf(not can_symlink(), "Requires symlink support")
  603. def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
  604. """Unit test for get_unstaged_changes."""
  605. repo_dir = tempfile.mkdtemp()
  606. self.addCleanup(shutil.rmtree, repo_dir)
  607. with Repo.init(repo_dir) as repo:
  608. # Commit a dummy file then modify it
  609. foo1_fullpath = os.path.join(repo_dir, "foo1")
  610. with open(foo1_fullpath, "wb") as f:
  611. f.write(b"origstuff")
  612. repo.stage(["foo1"])
  613. repo.do_commit(
  614. b"test status",
  615. author=b"author <email>",
  616. committer=b"committer <email>",
  617. )
  618. os.remove(foo1_fullpath)
  619. os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)
  620. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  621. self.assertEqual(list(changes), [b"foo1"])
  622. class TestValidatePathElement(TestCase):
  623. def test_default(self) -> None:
  624. self.assertTrue(validate_path_element_default(b"bla"))
  625. self.assertTrue(validate_path_element_default(b".bla"))
  626. self.assertFalse(validate_path_element_default(b".git"))
  627. self.assertFalse(validate_path_element_default(b".giT"))
  628. self.assertFalse(validate_path_element_default(b".."))
  629. self.assertTrue(validate_path_element_default(b"git~1"))
  630. def test_ntfs(self) -> None:
  631. self.assertTrue(validate_path_element_ntfs(b"bla"))
  632. self.assertTrue(validate_path_element_ntfs(b".bla"))
  633. self.assertFalse(validate_path_element_ntfs(b".git"))
  634. self.assertFalse(validate_path_element_ntfs(b".giT"))
  635. self.assertFalse(validate_path_element_ntfs(b".."))
  636. self.assertFalse(validate_path_element_ntfs(b"git~1"))
  637. class TestTreeFSPathConversion(TestCase):
  638. def test_tree_to_fs_path(self) -> None:
  639. tree_path = "délwíçh/foo".encode()
  640. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  641. self.assertEqual(
  642. fs_path,
  643. os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
  644. )
  645. def test_fs_to_tree_path_str(self) -> None:
  646. fs_path = os.path.join(os.path.join("délwíçh", "foo"))
  647. tree_path = _fs_to_tree_path(fs_path)
  648. self.assertEqual(tree_path, "délwíçh/foo".encode())
  649. def test_fs_to_tree_path_bytes(self) -> None:
  650. fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
  651. tree_path = _fs_to_tree_path(fs_path)
  652. self.assertEqual(tree_path, "délwíçh/foo".encode())