mbox.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. # mbox.py -- For dealing with mbox files
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Functions for dealing with mbox files and Maildir.
  22. This module provides functionality to split mbox files and Maildir
  23. into individual message files, similar to git mailsplit, and to extract
  24. patch information from email messages, similar to git mailinfo.
  25. """
# Public API of this module: the names exported by a star-import.
# Underscore-prefixed helpers defined below are intentionally left out.
__all__ = [
    "mailinfo",
    "split_maildir",
    "split_mbox",
]
  31. import mailbox
  32. import os
  33. from collections.abc import Iterable, Iterator
  34. from pathlib import Path
  35. from typing import TYPE_CHECKING, BinaryIO, TextIO
  36. if TYPE_CHECKING:
  37. from .patch import MailinfoResult
  38. def split_mbox(
  39. input_file: str | bytes | BinaryIO,
  40. output_dir: str | bytes | Path,
  41. start_number: int = 1,
  42. precision: int = 4,
  43. keep_cr: bool = False,
  44. mboxrd: bool = False,
  45. ) -> list[str]:
  46. r"""Split an mbox file into individual message files.
  47. Args:
  48. input_file: Path to mbox file or file-like object. If None, reads from stdin.
  49. output_dir: Directory where individual messages will be written
  50. start_number: Starting number for output files (default: 1)
  51. precision: Number of digits for output filenames (default: 4)
  52. keep_cr: If True, preserve \r in lines ending with \r\n (default: False)
  53. mboxrd: If True, treat input as mboxrd format and reverse escaping (default: False)
  54. Returns:
  55. List of output file paths that were created
  56. Raises:
  57. ValueError: If output_dir doesn't exist or isn't a directory
  58. OSError: If there are issues reading/writing files
  59. """
  60. # Convert output_dir to Path for easier manipulation
  61. if isinstance(output_dir, bytes):
  62. output_dir = output_dir.decode("utf-8")
  63. output_path = Path(output_dir)
  64. if not output_path.exists():
  65. raise ValueError(f"Output directory does not exist: {output_dir}")
  66. if not output_path.is_dir():
  67. raise ValueError(f"Output path is not a directory: {output_dir}")
  68. # Open the mbox file
  69. mbox_iter: Iterable[mailbox.mboxMessage]
  70. if isinstance(input_file, (str, bytes)):
  71. if isinstance(input_file, bytes):
  72. input_file = input_file.decode("utf-8")
  73. mbox_iter = mailbox.mbox(input_file)
  74. else:
  75. # For file-like objects, we need to read and parse manually
  76. mbox_iter = _parse_mbox_from_file(input_file)
  77. output_files = []
  78. msg_number = start_number
  79. for message in mbox_iter:
  80. # Format the output filename with the specified precision
  81. output_filename = f"{msg_number:0{precision}d}"
  82. output_file_path = output_path / output_filename
  83. # Write the message to the output file
  84. with open(output_file_path, "wb") as f:
  85. message_bytes = bytes(message)
  86. # Handle mboxrd format - reverse the escaping
  87. if mboxrd:
  88. message_bytes = _reverse_mboxrd_escaping(message_bytes)
  89. # Handle CR/LF if needed
  90. if not keep_cr:
  91. message_bytes = message_bytes.replace(b"\r\n", b"\n")
  92. # Strip trailing newlines (mailbox module adds separator newlines)
  93. message_bytes = message_bytes.rstrip(b"\n")
  94. if message_bytes:
  95. message_bytes += b"\n"
  96. f.write(message_bytes)
  97. output_files.append(str(output_file_path))
  98. msg_number += 1
  99. return output_files
  100. def split_maildir(
  101. maildir_path: str | bytes | Path,
  102. output_dir: str | bytes | Path,
  103. start_number: int = 1,
  104. precision: int = 4,
  105. keep_cr: bool = False,
  106. ) -> list[str]:
  107. r"""Split a Maildir into individual message files.
  108. Maildir splitting relies upon filenames being sorted to output
  109. patches in the correct order.
  110. Args:
  111. maildir_path: Path to the Maildir directory (should contain cur, tmp, new subdirectories)
  112. output_dir: Directory where individual messages will be written
  113. start_number: Starting number for output files (default: 1)
  114. precision: Number of digits for output filenames (default: 4)
  115. keep_cr: If True, preserve \r in lines ending with \r\n (default: False)
  116. Returns:
  117. List of output file paths that were created
  118. Raises:
  119. ValueError: If maildir_path or output_dir don't exist or aren't valid
  120. OSError: If there are issues reading/writing files
  121. """
  122. # Convert paths to Path objects
  123. if isinstance(maildir_path, bytes):
  124. maildir_path = maildir_path.decode("utf-8")
  125. if isinstance(output_dir, bytes):
  126. output_dir = output_dir.decode("utf-8")
  127. maildir = Path(maildir_path)
  128. output_path = Path(output_dir)
  129. if not maildir.exists():
  130. raise ValueError(f"Maildir does not exist: {maildir_path}")
  131. if not maildir.is_dir():
  132. raise ValueError(f"Maildir path is not a directory: {maildir_path}")
  133. if not output_path.exists():
  134. raise ValueError(f"Output directory does not exist: {output_dir}")
  135. if not output_path.is_dir():
  136. raise ValueError(f"Output path is not a directory: {output_dir}")
  137. # Open the Maildir
  138. md = mailbox.Maildir(str(maildir), factory=None)
  139. # Get all messages and sort by their keys to ensure consistent ordering
  140. sorted_keys = sorted(md.keys())
  141. output_files = []
  142. msg_number = start_number
  143. for key in sorted_keys:
  144. message = md[key]
  145. # Format the output filename with the specified precision
  146. output_filename = f"{msg_number:0{precision}d}"
  147. output_file_path = output_path / output_filename
  148. # Write the message to the output file
  149. with open(output_file_path, "wb") as f:
  150. message_bytes = bytes(message)
  151. # Handle CR/LF if needed
  152. if not keep_cr:
  153. message_bytes = message_bytes.replace(b"\r\n", b"\n")
  154. f.write(message_bytes)
  155. output_files.append(str(output_file_path))
  156. msg_number += 1
  157. return output_files
  158. def _parse_mbox_from_file(file_obj: BinaryIO) -> Iterator[mailbox.mboxMessage]:
  159. """Parse mbox format from a file-like object.
  160. Args:
  161. file_obj: Binary file-like object containing mbox data
  162. Yields:
  163. Individual mboxMessage objects
  164. """
  165. import tempfile
  166. # Create a temporary file to hold the mbox data
  167. with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
  168. tmp.write(file_obj.read())
  169. tmp_path = tmp.name
  170. mbox = mailbox.mbox(tmp_path)
  171. try:
  172. yield from mbox
  173. finally:
  174. mbox.close()
  175. os.unlink(tmp_path)
  176. def _reverse_mboxrd_escaping(message_bytes: bytes) -> bytes:
  177. """Reverse mboxrd escaping (^>+From lines).
  178. In mboxrd format, lines matching ^>+From have one leading ">" removed.
  179. Args:
  180. message_bytes: Message content with mboxrd escaping
  181. Returns:
  182. Message content with escaping reversed
  183. """
  184. lines = message_bytes.split(b"\n")
  185. result_lines = []
  186. for line in lines:
  187. # Check if line matches the pattern ^>+From (one or more > followed by From)
  188. if line.startswith(b">") and line.lstrip(b">").startswith(b"From "):
  189. # Remove one leading ">"
  190. result_lines.append(line[1:])
  191. else:
  192. result_lines.append(line)
  193. return b"\n".join(result_lines)
  194. def mailinfo(
  195. input_file: str | bytes | BinaryIO | TextIO,
  196. keep_subject: bool = False,
  197. keep_non_patch: bool = False,
  198. encoding: str | None = None,
  199. scissors: bool = False,
  200. message_id: bool = False,
  201. ) -> "MailinfoResult":
  202. """Extract patch information from an email message.
  203. High-level wrapper around patch.mailinfo() that handles file I/O.
  204. Args:
  205. input_file: Path to email file or file-like object (binary or text)
  206. keep_subject: If True, keep subject intact without munging (-k)
  207. keep_non_patch: If True, only strip [PATCH] from brackets (-b)
  208. encoding: Character encoding to use (default: detect from message)
  209. scissors: If True, remove everything before scissors line
  210. message_id: If True, include Message-ID in commit message (-m)
  211. Returns:
  212. MailinfoResult with parsed information (from patch.mailinfo)
  213. Raises:
  214. ValueError: If message is malformed or missing required fields
  215. OSError: If there are issues reading the file
  216. """
  217. from .patch import mailinfo as patch_mailinfo
  218. # Handle file path input
  219. if isinstance(input_file, (str, bytes)):
  220. if isinstance(input_file, bytes):
  221. input_file = input_file.decode("utf-8")
  222. with open(input_file, "rb") as f:
  223. return patch_mailinfo(
  224. f,
  225. keep_subject=keep_subject,
  226. keep_non_patch=keep_non_patch,
  227. encoding=encoding,
  228. scissors=scissors,
  229. message_id=message_id,
  230. )
  231. # Handle file-like objects
  232. return patch_mailinfo(
  233. input_file,
  234. keep_subject=keep_subject,
  235. keep_non_patch=keep_non_patch,
  236. encoding=encoding,
  237. scissors=scissors,
  238. message_id=message_id,
  239. )