| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- # mbox.py -- For dealing with mbox files
- # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Classes for dealing with mbox files and Maildir.
- This module provides functionality to split mbox files and Maildir
- into individual message files, similar to git mailsplit, and to extract
- patch information from email messages, similar to git mailinfo.
- """
- __all__ = [
- "mailinfo",
- "split_maildir",
- "split_mbox",
- ]
- import mailbox
- import os
- from collections.abc import Iterable, Iterator
- from pathlib import Path
- from typing import TYPE_CHECKING, BinaryIO, TextIO
- if TYPE_CHECKING:
- from .patch import MailinfoResult
- def split_mbox(
- input_file: str | bytes | BinaryIO,
- output_dir: str | bytes | Path,
- start_number: int = 1,
- precision: int = 4,
- keep_cr: bool = False,
- mboxrd: bool = False,
- ) -> list[str]:
- r"""Split an mbox file into individual message files.
- Args:
- input_file: Path to mbox file or file-like object. If None, reads from stdin.
- output_dir: Directory where individual messages will be written
- start_number: Starting number for output files (default: 1)
- precision: Number of digits for output filenames (default: 4)
- keep_cr: If True, preserve \r in lines ending with \r\n (default: False)
- mboxrd: If True, treat input as mboxrd format and reverse escaping (default: False)
- Returns:
- List of output file paths that were created
- Raises:
- ValueError: If output_dir doesn't exist or isn't a directory
- OSError: If there are issues reading/writing files
- """
- # Convert output_dir to Path for easier manipulation
- if isinstance(output_dir, bytes):
- output_dir = output_dir.decode("utf-8")
- output_path = Path(output_dir)
- if not output_path.exists():
- raise ValueError(f"Output directory does not exist: {output_dir}")
- if not output_path.is_dir():
- raise ValueError(f"Output path is not a directory: {output_dir}")
- # Open the mbox file
- mbox_obj: mailbox.mbox | None = None
- mbox_iter: Iterable[mailbox.mboxMessage]
- if isinstance(input_file, (str, bytes)):
- if isinstance(input_file, bytes):
- input_file = input_file.decode("utf-8")
- mbox_obj = mailbox.mbox(input_file)
- mbox_iter = mbox_obj
- else:
- # For file-like objects, we need to read and parse manually
- mbox_iter = _parse_mbox_from_file(input_file)
- try:
- output_files = []
- msg_number = start_number
- for message in mbox_iter:
- # Format the output filename with the specified precision
- output_filename = f"{msg_number:0{precision}d}"
- output_file_path = output_path / output_filename
- # Write the message to the output file
- with open(output_file_path, "wb") as f:
- message_bytes = bytes(message)
- # Handle mboxrd format - reverse the escaping
- if mboxrd:
- message_bytes = _reverse_mboxrd_escaping(message_bytes)
- # Handle CR/LF if needed
- if not keep_cr:
- message_bytes = message_bytes.replace(b"\r\n", b"\n")
- # Strip trailing newlines (mailbox module adds separator newlines)
- message_bytes = message_bytes.rstrip(b"\n")
- if message_bytes:
- message_bytes += b"\n"
- f.write(message_bytes)
- output_files.append(str(output_file_path))
- msg_number += 1
- return output_files
- finally:
- if mbox_obj is not None:
- mbox_obj.close()
- def split_maildir(
- maildir_path: str | bytes | Path,
- output_dir: str | bytes | Path,
- start_number: int = 1,
- precision: int = 4,
- keep_cr: bool = False,
- ) -> list[str]:
- r"""Split a Maildir into individual message files.
- Maildir splitting relies upon filenames being sorted to output
- patches in the correct order.
- Args:
- maildir_path: Path to the Maildir directory (should contain cur, tmp, new subdirectories)
- output_dir: Directory where individual messages will be written
- start_number: Starting number for output files (default: 1)
- precision: Number of digits for output filenames (default: 4)
- keep_cr: If True, preserve \r in lines ending with \r\n (default: False)
- Returns:
- List of output file paths that were created
- Raises:
- ValueError: If maildir_path or output_dir don't exist or aren't valid
- OSError: If there are issues reading/writing files
- """
- # Convert paths to Path objects
- if isinstance(maildir_path, bytes):
- maildir_path = maildir_path.decode("utf-8")
- if isinstance(output_dir, bytes):
- output_dir = output_dir.decode("utf-8")
- maildir = Path(maildir_path)
- output_path = Path(output_dir)
- if not maildir.exists():
- raise ValueError(f"Maildir does not exist: {maildir_path}")
- if not maildir.is_dir():
- raise ValueError(f"Maildir path is not a directory: {maildir_path}")
- if not output_path.exists():
- raise ValueError(f"Output directory does not exist: {output_dir}")
- if not output_path.is_dir():
- raise ValueError(f"Output path is not a directory: {output_dir}")
- # Open the Maildir
- md = mailbox.Maildir(str(maildir), factory=None)
- try:
- # Get all messages and sort by their keys to ensure consistent ordering
- sorted_keys = sorted(md.keys())
- output_files = []
- msg_number = start_number
- for key in sorted_keys:
- message = md[key]
- # Format the output filename with the specified precision
- output_filename = f"{msg_number:0{precision}d}"
- output_file_path = output_path / output_filename
- # Write the message to the output file
- with open(output_file_path, "wb") as f:
- message_bytes = bytes(message)
- # Handle CR/LF if needed
- if not keep_cr:
- message_bytes = message_bytes.replace(b"\r\n", b"\n")
- f.write(message_bytes)
- output_files.append(str(output_file_path))
- msg_number += 1
- return output_files
- finally:
- md.close()
- def _parse_mbox_from_file(file_obj: BinaryIO) -> Iterator[mailbox.mboxMessage]:
- """Parse mbox format from a file-like object.
- Args:
- file_obj: Binary file-like object containing mbox data
- Yields:
- Individual mboxMessage objects
- """
- import tempfile
- # Create a temporary file to hold the mbox data
- with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
- tmp.write(file_obj.read())
- tmp_path = tmp.name
- mbox = mailbox.mbox(tmp_path)
- try:
- yield from mbox
- finally:
- mbox.close()
- os.unlink(tmp_path)
- def _reverse_mboxrd_escaping(message_bytes: bytes) -> bytes:
- """Reverse mboxrd escaping (^>+From lines).
- In mboxrd format, lines matching ^>+From have one leading ">" removed.
- Args:
- message_bytes: Message content with mboxrd escaping
- Returns:
- Message content with escaping reversed
- """
- lines = message_bytes.split(b"\n")
- result_lines = []
- for line in lines:
- # Check if line matches the pattern ^>+From (one or more > followed by From)
- if line.startswith(b">") and line.lstrip(b">").startswith(b"From "):
- # Remove one leading ">"
- result_lines.append(line[1:])
- else:
- result_lines.append(line)
- return b"\n".join(result_lines)
- def mailinfo(
- input_file: str | bytes | BinaryIO | TextIO,
- keep_subject: bool = False,
- keep_non_patch: bool = False,
- encoding: str | None = None,
- scissors: bool = False,
- message_id: bool = False,
- ) -> "MailinfoResult":
- """Extract patch information from an email message.
- High-level wrapper around patch.mailinfo() that handles file I/O.
- Args:
- input_file: Path to email file or file-like object (binary or text)
- keep_subject: If True, keep subject intact without munging (-k)
- keep_non_patch: If True, only strip [PATCH] from brackets (-b)
- encoding: Character encoding to use (default: detect from message)
- scissors: If True, remove everything before scissors line
- message_id: If True, include Message-ID in commit message (-m)
- Returns:
- MailinfoResult with parsed information (from patch.mailinfo)
- Raises:
- ValueError: If message is malformed or missing required fields
- OSError: If there are issues reading the file
- """
- from .patch import mailinfo as patch_mailinfo
- # Handle file path input
- if isinstance(input_file, (str, bytes)):
- if isinstance(input_file, bytes):
- input_file = input_file.decode("utf-8")
- with open(input_file, "rb") as f:
- return patch_mailinfo(
- f,
- keep_subject=keep_subject,
- keep_non_patch=keep_non_patch,
- encoding=encoding,
- scissors=scissors,
- message_id=message_id,
- )
- # Handle file-like objects
- return patch_mailinfo(
- input_file,
- keep_subject=keep_subject,
- keep_non_patch=keep_non_patch,
- encoding=encoding,
- scissors=scissors,
- message_id=message_id,
- )
|