|
|
@@ -78,46 +78,52 @@ def split_mbox(
|
|
|
raise ValueError(f"Output path is not a directory: {output_dir}")
|
|
|
|
|
|
# Open the mbox file
|
|
|
+ mbox_obj: mailbox.mbox | None = None
|
|
|
mbox_iter: Iterable[mailbox.mboxMessage]
|
|
|
if isinstance(input_file, (str, bytes)):
|
|
|
if isinstance(input_file, bytes):
|
|
|
input_file = input_file.decode("utf-8")
|
|
|
- mbox_iter = mailbox.mbox(input_file)
|
|
|
+ mbox_obj = mailbox.mbox(input_file)
|
|
|
+ mbox_iter = mbox_obj
|
|
|
else:
|
|
|
# For file-like objects, we need to read and parse manually
|
|
|
mbox_iter = _parse_mbox_from_file(input_file)
|
|
|
|
|
|
- output_files = []
|
|
|
- msg_number = start_number
|
|
|
+ try:
|
|
|
+ output_files = []
|
|
|
+ msg_number = start_number
|
|
|
|
|
|
- for message in mbox_iter:
|
|
|
- # Format the output filename with the specified precision
|
|
|
- output_filename = f"{msg_number:0{precision}d}"
|
|
|
- output_file_path = output_path / output_filename
|
|
|
+ for message in mbox_iter:
|
|
|
+ # Format the output filename with the specified precision
|
|
|
+ output_filename = f"{msg_number:0{precision}d}"
|
|
|
+ output_file_path = output_path / output_filename
|
|
|
|
|
|
- # Write the message to the output file
|
|
|
- with open(output_file_path, "wb") as f:
|
|
|
- message_bytes = bytes(message)
|
|
|
+ # Write the message to the output file
|
|
|
+ with open(output_file_path, "wb") as f:
|
|
|
+ message_bytes = bytes(message)
|
|
|
|
|
|
- # Handle mboxrd format - reverse the escaping
|
|
|
- if mboxrd:
|
|
|
- message_bytes = _reverse_mboxrd_escaping(message_bytes)
|
|
|
+ # Handle mboxrd format - reverse the escaping
|
|
|
+ if mboxrd:
|
|
|
+ message_bytes = _reverse_mboxrd_escaping(message_bytes)
|
|
|
|
|
|
- # Handle CR/LF if needed
|
|
|
- if not keep_cr:
|
|
|
- message_bytes = message_bytes.replace(b"\r\n", b"\n")
|
|
|
+ # Handle CR/LF if needed
|
|
|
+ if not keep_cr:
|
|
|
+ message_bytes = message_bytes.replace(b"\r\n", b"\n")
|
|
|
|
|
|
- # Strip trailing newlines (mailbox module adds separator newlines)
|
|
|
- message_bytes = message_bytes.rstrip(b"\n")
|
|
|
- if message_bytes:
|
|
|
- message_bytes += b"\n"
|
|
|
+ # Strip trailing newlines (mailbox module adds separator newlines)
|
|
|
+ message_bytes = message_bytes.rstrip(b"\n")
|
|
|
+ if message_bytes:
|
|
|
+ message_bytes += b"\n"
|
|
|
|
|
|
- f.write(message_bytes)
|
|
|
+ f.write(message_bytes)
|
|
|
|
|
|
- output_files.append(str(output_file_path))
|
|
|
- msg_number += 1
|
|
|
+ output_files.append(str(output_file_path))
|
|
|
+ msg_number += 1
|
|
|
|
|
|
- return output_files
|
|
|
+ return output_files
|
|
|
+ finally:
|
|
|
+ if mbox_obj is not None:
|
|
|
+ mbox_obj.close()
|
|
|
|
|
|
|
|
|
def split_maildir(
|
|
|
@@ -167,33 +173,36 @@ def split_maildir(
|
|
|
# Open the Maildir
|
|
|
md = mailbox.Maildir(str(maildir), factory=None)
|
|
|
|
|
|
- # Get all messages and sort by their keys to ensure consistent ordering
|
|
|
- sorted_keys = sorted(md.keys())
|
|
|
+ try:
|
|
|
+ # Get all messages and sort by their keys to ensure consistent ordering
|
|
|
+ sorted_keys = sorted(md.keys())
|
|
|
|
|
|
- output_files = []
|
|
|
- msg_number = start_number
|
|
|
+ output_files = []
|
|
|
+ msg_number = start_number
|
|
|
|
|
|
- for key in sorted_keys:
|
|
|
- message = md[key]
|
|
|
+ for key in sorted_keys:
|
|
|
+ message = md[key]
|
|
|
|
|
|
- # Format the output filename with the specified precision
|
|
|
- output_filename = f"{msg_number:0{precision}d}"
|
|
|
- output_file_path = output_path / output_filename
|
|
|
+ # Format the output filename with the specified precision
|
|
|
+ output_filename = f"{msg_number:0{precision}d}"
|
|
|
+ output_file_path = output_path / output_filename
|
|
|
|
|
|
- # Write the message to the output file
|
|
|
- with open(output_file_path, "wb") as f:
|
|
|
- message_bytes = bytes(message)
|
|
|
+ # Write the message to the output file
|
|
|
+ with open(output_file_path, "wb") as f:
|
|
|
+ message_bytes = bytes(message)
|
|
|
|
|
|
- # Handle CR/LF if needed
|
|
|
- if not keep_cr:
|
|
|
- message_bytes = message_bytes.replace(b"\r\n", b"\n")
|
|
|
+ # Handle CR/LF if needed
|
|
|
+ if not keep_cr:
|
|
|
+ message_bytes = message_bytes.replace(b"\r\n", b"\n")
|
|
|
|
|
|
- f.write(message_bytes)
|
|
|
+ f.write(message_bytes)
|
|
|
|
|
|
- output_files.append(str(output_file_path))
|
|
|
- msg_number += 1
|
|
|
+ output_files.append(str(output_file_path))
|
|
|
+ msg_number += 1
|
|
|
|
|
|
- return output_files
|
|
|
+ return output_files
|
|
|
+ finally:
|
|
|
+ md.close()
|
|
|
|
|
|
|
|
|
def _parse_mbox_from_file(file_obj: BinaryIO) -> Iterator[mailbox.mboxMessage]:
|