@@ -1,11 +1,9 @@
-import asyncore
import mimetypes
import os
import shutil
-import smtpd
+import socket
import sys
import tempfile
-import threading
from email import charset, message_from_binary_file, message_from_bytes
from email.header import Header
from email.mime.text import MIMEText
@@ -14,7 +12,7 @@ from io import StringIO
from pathlib import Path
from smtplib import SMTP, SMTPException
from ssl import SSLError
-from unittest import mock
+from unittest import mock, skipUnless
from django.core import mail
from django.core.mail import (
@@ -27,6 +25,12 @@ from django.test import SimpleTestCase, override_settings
from django.test.utils import requires_tz_support
from django.utils.translation import gettext_lazy
+ from aiosmtpd.controller import Controller
+except ImportError:
class HeadersCheckMixin:
@@ -1336,109 +1340,78 @@ class ConsoleBackendTests(BaseEmailBackendTests, SimpleTestCase):
self.assertIn(b'\nDate: ', message)
-class FakeSMTPChannel(smtpd.SMTPChannel):
- def collect_incoming_data(self, data):
- try:
- smtpd.SMTPChannel.collect_incoming_data(self, data)
- except UnicodeDecodeError:
- pass
+class SMTPHandler:
+ def __init__(self, *args, **kwargs):
+ self.mailbox = []
+ async def handle_DATA(self, server, session, envelope):
+ data = envelope.content
+ mail_from = envelope.mail_from
-class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
- """
- Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from:
- http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup
- """
- channel_class = FakeSMTPChannel
- def __init__(self, *args, **kwargs):
- threading.Thread.__init__(self)
- smtpd.SMTPServer.__init__(self, *args, decode_data=True, **kwargs)
- self._sink = []
- self.active = False
- self.active_lock = threading.Lock()
- self.sink_lock = threading.Lock()
- def process_message(self, peer, mailfrom, rcpttos, data):
- data = data.encode()
- m = message_from_bytes(data)
- maddr = parseaddr(m.get('from'))[1]
- if mailfrom != maddr:
+ message = message_from_bytes(data.rstrip())
+ message_addr = parseaddr(message.get('from'))[1]
+ if mail_from != message_addr:
- lp, domain = mailfrom.split('@', 1)
+ lp, domain = mail_from.split('@', 1)
lp = Header(lp, 'utf-8').encode()
- mailfrom = '@'.join([lp, domain])
- if mailfrom != maddr:
- return "553 '%s' != '%s'" % (mailfrom, maddr)
- with self.sink_lock:
- self._sink.append(m)
- def get_sink(self):
- with self.sink_lock:
- return self._sink[:]
- def flush_sink(self):
- with self.sink_lock:
- self._sink[:] = []
+ mail_from = '@'.join([lp, domain])
- def start(self):
- assert not self.active
- self.__flag = threading.Event()
- threading.Thread.start(self)
- self.__flag.wait()
+ if mail_from != message_addr:
+ return f"553 '{mail_from}' != '{message_addr}'"
+ self.mailbox.append(message)
+ return '250 OK'
- def run(self):
- self.active = True
- self.__flag.set()
- while self.active and asyncore.socket_map:
- with self.active_lock:
- asyncore.loop(timeout=0.1, count=1)
- asyncore.close_all()
- def stop(self):
- if self.active:
- self.active = False
- self.join()
+ def flush_mailbox(self):
+ self.mailbox[:] = []
+@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.')
class SMTPBackendTestsBase(SimpleTestCase):
def setUpClass(cls):
- cls.server = FakeSMTPServer(('', 0), None)
+ with socket.socket() as s:
+ s.bind(('', 0))
+ port = s.getsockname()[1]
+ cls.smtp_handler = SMTPHandler()
+ cls.smtp_controller = Controller(
+ cls.smtp_handler, hostname='', port=port,
+ )
cls._settings_override = override_settings(
- EMAIL_PORT=cls.server.socket.getsockname()[1])
+ EMAIL_HOST=cls.smtp_controller.hostname,
+ EMAIL_PORT=cls.smtp_controller.port,
+ )
- cls.server.start()
- cls.addClassCleanup(cls.server.stop)
+ cls.smtp_controller.start()
+ cls.addClassCleanup(cls.stop_smtp)
+ @classmethod
+ def stop_smtp(cls):
+ cls.smtp_controller.stop()
+@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.')
class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase):
email_backend = 'django.core.mail.backends.smtp.EmailBackend'
def setUp(self):
- self.server.flush_sink()
+ self.smtp_handler.flush_mailbox()
def tearDown(self):
- self.server.flush_sink()
+ self.smtp_handler.flush_mailbox()
def flush_mailbox(self):
- self.server.flush_sink()
+ self.smtp_handler.flush_mailbox()
def get_mailbox_content(self):
- return self.server.get_sink()
+ return self.smtp_handler.mailbox
EMAIL_HOST_USER="not empty username",
@@ -1657,17 +1630,18 @@ class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase):
self.assertEqual(sent, 0)
+@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.')
class SMTPBackendStoppedServerTests(SMTPBackendTestsBase):
- """
- These tests require a separate class, because the FakeSMTPServer is shut
- down in setUpClass(), and it cannot be restarted ("RuntimeError: threads
- can only be started once").
- """
def setUpClass(cls):
cls.backend = smtp.EmailBackend(username='', password='')
- cls.server.stop()
+ cls.smtp_controller.stop()
+ @classmethod
+ def stop_smtp(cls):
+ pass
def test_server_stopped(self):