# test_mbox.py -- tests for mbox.py
# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for mbox.py."""

import mailbox
import os
import tempfile
from io import BytesIO

from dulwich import porcelain
from dulwich.mbox import mailinfo, split_maildir, split_mbox

from . import TestCase


  29. class SplitMboxTests(TestCase):
  30. """Tests for split_mbox function."""
  31. def test_split_simple_mbox(self) -> None:
  32. """Test splitting a simple mbox with two messages."""
  33. mbox_content = b"""\
  34. From alice@example.com Mon Jan 01 00:00:00 2025
  35. From: Alice <alice@example.com>
  36. To: Bob <bob@example.com>
  37. Subject: First message
  38. This is the first message.
  39. From bob@example.com Mon Jan 01 00:01:00 2025
  40. From: Bob <bob@example.com>
  41. To: Alice <alice@example.com>
  42. Subject: Second message
  43. This is the second message.
  44. """
  45. with tempfile.TemporaryDirectory() as tmpdir:
  46. # Create temporary mbox file
  47. mbox_path = os.path.join(tmpdir, "test.mbox")
  48. with open(mbox_path, "wb") as f:
  49. f.write(mbox_content)
  50. output_dir = os.path.join(tmpdir, "output")
  51. os.makedirs(output_dir)
  52. # Split the mbox
  53. output_files = split_mbox(mbox_path, output_dir)
  54. # Verify output
  55. self.assertEqual(len(output_files), 2)
  56. self.assertEqual(output_files[0], os.path.join(output_dir, "0001"))
  57. self.assertEqual(output_files[1], os.path.join(output_dir, "0002"))
  58. # Check first message
  59. with open(output_files[0], "rb") as f:
  60. content = f.read()
  61. expected = b"""\
  62. From: Alice <alice@example.com>
  63. To: Bob <bob@example.com>
  64. Subject: First message
  65. This is the first message.
  66. """
  67. self.assertEqual(content, expected)
  68. # Check second message
  69. with open(output_files[1], "rb") as f:
  70. content = f.read()
  71. expected = b"""\
  72. From: Bob <bob@example.com>
  73. To: Alice <alice@example.com>
  74. Subject: Second message
  75. This is the second message.
  76. """
  77. self.assertEqual(content, expected)
  78. def test_split_mbox_with_precision(self) -> None:
  79. """Test splitting mbox with custom precision."""
  80. mbox_content = b"""\
  81. From test@example.com Mon Jan 01 00:00:00 2025
  82. From: Test <test@example.com>
  83. Subject: Test
  84. Test message.
  85. """
  86. with tempfile.TemporaryDirectory() as tmpdir:
  87. mbox_path = os.path.join(tmpdir, "test.mbox")
  88. with open(mbox_path, "wb") as f:
  89. f.write(mbox_content)
  90. output_dir = os.path.join(tmpdir, "output")
  91. os.makedirs(output_dir)
  92. # Split with precision=2
  93. output_files = split_mbox(mbox_path, output_dir, precision=2)
  94. self.assertEqual(len(output_files), 1)
  95. self.assertEqual(output_files[0], os.path.join(output_dir, "01"))
  96. def test_split_mbox_with_start_number(self) -> None:
  97. """Test splitting mbox with custom start number."""
  98. mbox_content = b"""\
  99. From test@example.com Mon Jan 01 00:00:00 2025
  100. From: Test <test@example.com>
  101. Subject: Test
  102. Test message.
  103. """
  104. with tempfile.TemporaryDirectory() as tmpdir:
  105. mbox_path = os.path.join(tmpdir, "test.mbox")
  106. with open(mbox_path, "wb") as f:
  107. f.write(mbox_content)
  108. output_dir = os.path.join(tmpdir, "output")
  109. os.makedirs(output_dir)
  110. # Split starting at message 10
  111. output_files = split_mbox(mbox_path, output_dir, start_number=10)
  112. self.assertEqual(len(output_files), 1)
  113. self.assertEqual(output_files[0], os.path.join(output_dir, "0010"))
  114. def test_split_mbox_keep_cr(self) -> None:
  115. """Test splitting mbox with keep_cr option."""
  116. # Note: Python's mailbox module normalizes line endings, so this test
  117. # verifies that keep_cr=False removes CR while keep_cr=True preserves
  118. # whatever the mailbox module outputs
  119. mbox_content = b"""\
  120. From test@example.com Mon Jan 01 00:00:00 2025
  121. From: Test <test@example.com>
  122. Subject: Test
  123. Test message.
  124. """
  125. with tempfile.TemporaryDirectory() as tmpdir:
  126. mbox_path = os.path.join(tmpdir, "test.mbox")
  127. with open(mbox_path, "wb") as f:
  128. f.write(mbox_content)
  129. output_dir = os.path.join(tmpdir, "output")
  130. os.makedirs(output_dir)
  131. # Split without keep_cr (default removes \r\n)
  132. output_files_no_cr = split_mbox(mbox_path, output_dir, keep_cr=False)
  133. with open(output_files_no_cr[0], "rb") as f:
  134. content_no_cr = f.read()
  135. # Verify the output
  136. self.assertEqual(len(output_files_no_cr), 1)
  137. expected = b"""\
  138. From: Test <test@example.com>
  139. Subject: Test
  140. Test message.
  141. """
  142. self.assertEqual(content_no_cr, expected)
  143. def test_split_mbox_from_file_object(self) -> None:
  144. """Test splitting mbox from a file-like object."""
  145. mbox_content = b"""\
  146. From test@example.com Mon Jan 01 00:00:00 2025
  147. From: Test <test@example.com>
  148. Subject: Test
  149. Test message.
  150. """
  151. with tempfile.TemporaryDirectory() as tmpdir:
  152. output_dir = os.path.join(tmpdir, "output")
  153. os.makedirs(output_dir)
  154. # Split from BytesIO
  155. output_files = split_mbox(BytesIO(mbox_content), output_dir)
  156. self.assertEqual(len(output_files), 1)
  157. self.assertTrue(os.path.exists(output_files[0]))
  158. def test_split_mbox_output_dir_not_exists(self) -> None:
  159. """Test that split_mbox raises ValueError if output_dir doesn't exist."""
  160. mbox_content = b"From test@example.com Mon Jan 01 00:00:00 2025\n"
  161. with tempfile.TemporaryDirectory() as tmpdir:
  162. mbox_path = os.path.join(tmpdir, "test.mbox")
  163. with open(mbox_path, "wb") as f:
  164. f.write(mbox_content)
  165. nonexistent_dir = os.path.join(tmpdir, "nonexistent")
  166. with self.assertRaises(ValueError) as cm:
  167. split_mbox(mbox_path, nonexistent_dir)
  168. self.assertIn("does not exist", str(cm.exception))
  169. def test_split_mboxrd(self) -> None:
  170. """Test splitting mboxrd format with >From escaping.
  171. In mboxrd format, lines starting with ">From " have one leading ">" removed.
  172. So ">From " becomes "From " and ">>From " becomes ">From ".
  173. """
  174. mbox_content = b"""\
  175. From test@example.com Mon Jan 01 00:00:00 2025
  176. From: Test <test@example.com>
  177. Subject: Test
  178. >From the beginning...
  179. >>From the middle...
  180. """
  181. with tempfile.TemporaryDirectory() as tmpdir:
  182. mbox_path = os.path.join(tmpdir, "test.mbox")
  183. with open(mbox_path, "wb") as f:
  184. f.write(mbox_content)
  185. output_dir = os.path.join(tmpdir, "output")
  186. os.makedirs(output_dir)
  187. # Split with mboxrd=True
  188. output_files = split_mbox(mbox_path, output_dir, mboxrd=True)
  189. self.assertEqual(len(output_files), 1)
  190. # Check that >From escaping was reversed (one ">" removed per line)
  191. with open(output_files[0], "rb") as f:
  192. content = f.read()
  193. expected = b"""\
  194. From: Test <test@example.com>
  195. Subject: Test
  196. From the beginning...
  197. >From the middle...
  198. """
  199. self.assertEqual(content, expected)
  200. class SplitMaildirTests(TestCase):
  201. """Tests for split_maildir function."""
  202. def test_split_maildir(self) -> None:
  203. """Test splitting a Maildir."""
  204. with tempfile.TemporaryDirectory() as tmpdir:
  205. # Create a Maildir
  206. maildir_path = os.path.join(tmpdir, "maildir")
  207. md = mailbox.Maildir(maildir_path)
  208. # Add two messages
  209. msg1 = mailbox.MaildirMessage()
  210. msg1.set_payload(b"First message")
  211. msg1["From"] = "alice@example.com"
  212. msg1["Subject"] = "First"
  213. md.add(msg1)
  214. msg2 = mailbox.MaildirMessage()
  215. msg2.set_payload(b"Second message")
  216. msg2["From"] = "bob@example.com"
  217. msg2["Subject"] = "Second"
  218. md.add(msg2)
  219. output_dir = os.path.join(tmpdir, "output")
  220. os.makedirs(output_dir)
  221. # Split the Maildir
  222. output_files = split_maildir(maildir_path, output_dir)
  223. # Verify output
  224. self.assertEqual(len(output_files), 2)
  225. self.assertTrue(all(os.path.exists(f) for f in output_files))
  226. # Check that files are numbered correctly
  227. self.assertEqual(output_files[0], os.path.join(output_dir, "0001"))
  228. self.assertEqual(output_files[1], os.path.join(output_dir, "0002"))
  229. def test_split_maildir_not_exists(self) -> None:
  230. """Test that split_maildir raises ValueError if Maildir doesn't exist."""
  231. with tempfile.TemporaryDirectory() as tmpdir:
  232. nonexistent_dir = os.path.join(tmpdir, "nonexistent")
  233. output_dir = os.path.join(tmpdir, "output")
  234. os.makedirs(output_dir)
  235. with self.assertRaises(ValueError) as cm:
  236. split_maildir(nonexistent_dir, output_dir)
  237. self.assertIn("does not exist", str(cm.exception))
  238. def test_split_maildir_with_precision(self) -> None:
  239. """Test splitting Maildir with custom precision."""
  240. with tempfile.TemporaryDirectory() as tmpdir:
  241. maildir_path = os.path.join(tmpdir, "maildir")
  242. md = mailbox.Maildir(maildir_path)
  243. msg = mailbox.MaildirMessage()
  244. msg.set_payload(b"Test message")
  245. msg["From"] = "test@example.com"
  246. md.add(msg)
  247. output_dir = os.path.join(tmpdir, "output")
  248. os.makedirs(output_dir)
  249. # Split with precision=2
  250. output_files = split_maildir(maildir_path, output_dir, precision=2)
  251. self.assertEqual(len(output_files), 1)
  252. self.assertEqual(output_files[0], os.path.join(output_dir, "01"))
  253. class PorcelainMailsplitTests(TestCase):
  254. """Tests for porcelain.mailsplit function."""
  255. def test_mailsplit_mbox(self) -> None:
  256. """Test porcelain mailsplit with mbox file."""
  257. mbox_content = b"""\
  258. From alice@example.com Mon Jan 01 00:00:00 2025
  259. From: Alice <alice@example.com>
  260. Subject: Test
  261. Test message.
  262. """
  263. with tempfile.TemporaryDirectory() as tmpdir:
  264. mbox_path = os.path.join(tmpdir, "test.mbox")
  265. with open(mbox_path, "wb") as f:
  266. f.write(mbox_content)
  267. output_dir = os.path.join(tmpdir, "output")
  268. os.makedirs(output_dir)
  269. # Split using porcelain function
  270. output_files = porcelain.mailsplit(
  271. input_path=mbox_path, output_dir=output_dir
  272. )
  273. self.assertEqual(len(output_files), 1)
  274. self.assertEqual(output_files[0], os.path.join(output_dir, "0001"))
  275. def test_mailsplit_maildir(self) -> None:
  276. """Test porcelain mailsplit with Maildir."""
  277. with tempfile.TemporaryDirectory() as tmpdir:
  278. # Create a Maildir
  279. maildir_path = os.path.join(tmpdir, "maildir")
  280. md = mailbox.Maildir(maildir_path)
  281. msg = mailbox.MaildirMessage()
  282. msg.set_payload(b"Test message")
  283. msg["From"] = "test@example.com"
  284. md.add(msg)
  285. output_dir = os.path.join(tmpdir, "output")
  286. os.makedirs(output_dir)
  287. # Split using porcelain function with is_maildir=True
  288. output_files = porcelain.mailsplit(
  289. input_path=maildir_path, output_dir=output_dir, is_maildir=True
  290. )
  291. self.assertEqual(len(output_files), 1)
  292. self.assertTrue(os.path.exists(output_files[0]))
  293. def test_mailsplit_with_options(self) -> None:
  294. """Test porcelain mailsplit with various options."""
  295. mbox_content = b"""\
  296. From test@example.com Mon Jan 01 00:00:00 2025
  297. From: Test <test@example.com>
  298. Subject: Test
  299. Test message.
  300. """
  301. with tempfile.TemporaryDirectory() as tmpdir:
  302. mbox_path = os.path.join(tmpdir, "test.mbox")
  303. with open(mbox_path, "wb") as f:
  304. f.write(mbox_content)
  305. output_dir = os.path.join(tmpdir, "output")
  306. os.makedirs(output_dir)
  307. # Split with custom options
  308. output_files = porcelain.mailsplit(
  309. input_path=mbox_path,
  310. output_dir=output_dir,
  311. start_number=5,
  312. precision=3,
  313. keep_cr=True,
  314. )
  315. self.assertEqual(len(output_files), 1)
  316. self.assertEqual(output_files[0], os.path.join(output_dir, "005"))
  317. def test_mailsplit_mboxrd(self) -> None:
  318. """Test porcelain mailsplit with mboxrd format."""
  319. mbox_content = b"""\
  320. From test@example.com Mon Jan 01 00:00:00 2025
  321. From: Test <test@example.com>
  322. Subject: Test
  323. >From quoted text
  324. """
  325. with tempfile.TemporaryDirectory() as tmpdir:
  326. mbox_path = os.path.join(tmpdir, "test.mbox")
  327. with open(mbox_path, "wb") as f:
  328. f.write(mbox_content)
  329. output_dir = os.path.join(tmpdir, "output")
  330. os.makedirs(output_dir)
  331. # Split with mboxrd=True
  332. output_files = porcelain.mailsplit(
  333. input_path=mbox_path, output_dir=output_dir, mboxrd=True
  334. )
  335. self.assertEqual(len(output_files), 1)
  336. # Verify >From escaping was reversed
  337. with open(output_files[0], "rb") as f:
  338. content = f.read()
  339. expected = b"""\
  340. From: Test <test@example.com>
  341. Subject: Test
  342. From quoted text
  343. """
  344. self.assertEqual(content, expected)
  345. def test_mailsplit_maildir_requires_path(self) -> None:
  346. """Test that mailsplit raises ValueError when is_maildir=True but no input_path."""
  347. with tempfile.TemporaryDirectory() as tmpdir:
  348. output_dir = os.path.join(tmpdir, "output")
  349. os.makedirs(output_dir)
  350. with self.assertRaises(ValueError) as cm:
  351. porcelain.mailsplit(
  352. input_path=None, output_dir=output_dir, is_maildir=True
  353. )
  354. self.assertIn("required", str(cm.exception).lower())
  355. class MailinfoTests(TestCase):
  356. """Tests for mbox.mailinfo function."""
  357. def test_mailinfo_from_file_path(self) -> None:
  358. """Test mailinfo with file path."""
  359. from dulwich.mbox import mailinfo
  360. email_content = b"""From: Test User <test@example.com>
  361. Subject: [PATCH] Test patch
  362. Date: Mon, 1 Jan 2024 12:00:00 +0000
  363. This is the commit message.
  364. ---
  365. diff --git a/test.txt b/test.txt
  366. """
  367. with tempfile.TemporaryDirectory() as tmpdir:
  368. email_path = os.path.join(tmpdir, "email.txt")
  369. with open(email_path, "wb") as f:
  370. f.write(email_content)
  371. # Test with file path
  372. result = mailinfo(email_path)
  373. self.assertEqual("Test User", result.author_name)
  374. self.assertEqual("test@example.com", result.author_email)
  375. self.assertEqual("Test patch", result.subject)
  376. self.assertIn("This is the commit message.", result.message)
  377. self.assertIn("diff --git", result.patch)
  378. def test_mailinfo_from_file_object(self) -> None:
  379. """Test mailinfo with file-like object."""
  380. from dulwich.mbox import mailinfo
  381. email_content = b"""From: Test User <test@example.com>
  382. Subject: Test subject
  383. Body text
  384. """
  385. result = mailinfo(BytesIO(email_content))
  386. self.assertEqual("Test User", result.author_name)
  387. self.assertEqual("test@example.com", result.author_email)
  388. self.assertEqual("Test subject", result.subject)
  389. def test_mailinfo_with_options(self) -> None:
  390. """Test mailinfo with various options."""
  391. from dulwich.mbox import mailinfo
  392. email_content = b"""From: Test <test@example.com>
  393. Subject: [PATCH] Feature
  394. Message-ID: <test123@example.com>
  395. Ignore this
  396. -- >8 --
  397. Keep this
  398. """
  399. # Test with scissors and message_id
  400. result = mailinfo(
  401. BytesIO(email_content), scissors=True, message_id=True, keep_subject=False
  402. )
  403. self.assertEqual("Feature", result.subject)
  404. self.assertIn("Keep this", result.message)
  405. self.assertNotIn("Ignore this", result.message)
  406. self.assertIn("Message-ID:", result.message)