tests.py 29 KB


  1. # coding: utf-8
  2. import asyncore
  3. import email
  4. import os
  5. import shutil
  6. import smtpd
  7. import sys
  8. from StringIO import StringIO
  9. import tempfile
  10. import threading
  11. from django.conf import settings
  12. from django.core import mail
  13. from django.core.mail import (EmailMessage, mail_admins, mail_managers,
  14. EmailMultiAlternatives, send_mail, send_mass_mail)
  15. from django.core.mail.backends import console, dummy, locmem, filebased, smtp
  16. from django.core.mail.message import BadHeaderError
  17. from django.test import TestCase
  18. from django.utils.translation import ugettext_lazy
  19. from django.utils.functional import wraps
  20. def alter_django_settings(**kwargs):
  21. oldvalues = {}
  22. nonexistant = []
  23. for setting, newvalue in kwargs.iteritems():
  24. try:
  25. oldvalues[setting] = getattr(settings, setting)
  26. except AttributeError:
  27. nonexistant.append(setting)
  28. setattr(settings, setting, newvalue)
  29. return oldvalues, nonexistant
  30. def restore_django_settings(state):
  31. oldvalues, nonexistant = state
  32. for setting, oldvalue in oldvalues.iteritems():
  33. setattr(settings, setting, oldvalue)
  34. for setting in nonexistant:
  35. delattr(settings, setting)
  36. def with_django_settings(**kwargs):
  37. def decorator(test):
  38. @wraps(test)
  39. def decorated_test(self):
  40. state = alter_django_settings(**kwargs)
  41. try:
  42. return test(self)
  43. finally:
  44. restore_django_settings(state)
  45. return decorated_test
  46. return decorator
  47. class MailTests(TestCase):
  48. """
  49. Non-backend specific tests.
  50. """
  51. def test_ascii(self):
  52. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
  53. message = email.message()
  54. self.assertEqual(message['Subject'].encode(), 'Subject')
  55. self.assertEqual(message.get_payload(), 'Content')
  56. self.assertEqual(message['From'], 'from@example.com')
  57. self.assertEqual(message['To'], 'to@example.com')
  58. def test_multiple_recipients(self):
  59. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'])
  60. message = email.message()
  61. self.assertEqual(message['Subject'].encode(), 'Subject')
  62. self.assertEqual(message.get_payload(), 'Content')
  63. self.assertEqual(message['From'], 'from@example.com')
  64. self.assertEqual(message['To'], 'to@example.com, other@example.com')
  65. def test_cc(self):
  66. """Regression test for #7722"""
  67. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com'])
  68. message = email.message()
  69. self.assertEqual(message['Cc'], 'cc@example.com')
  70. self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com'])
  71. # Test multiple CC with multiple To
  72. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'])
  73. message = email.message()
  74. self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com')
  75. self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com'])
  76. # Testing with Bcc
  77. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'], bcc=['bcc@example.com'])
  78. message = email.message()
  79. self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com')
  80. self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com', 'bcc@example.com'])
  81. def test_header_injection(self):
  82. email = EmailMessage('Subject\nInjection Test', 'Content', 'from@example.com', ['to@example.com'])
  83. self.assertRaises(BadHeaderError, email.message)
  84. email = EmailMessage(ugettext_lazy('Subject\nInjection Test'), 'Content', 'from@example.com', ['to@example.com'])
  85. self.assertRaises(BadHeaderError, email.message)
  86. def test_space_continuation(self):
  87. """
  88. Test for space continuation character in long (ascii) subject headers (#7747)
  89. """
  90. email = EmailMessage('Long subject lines that get wrapped should use a space continuation character to get expected behaviour in Outlook and Thunderbird', 'Content', 'from@example.com', ['to@example.com'])
  91. message = email.message()
  92. self.assertEqual(message['Subject'], 'Long subject lines that get wrapped should use a space continuation\n character to get expected behaviour in Outlook and Thunderbird')
  93. def test_message_header_overrides(self):
  94. """
  95. Specifying dates or message-ids in the extra headers overrides the
  96. default values (#9233)
  97. """
  98. headers = {"date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"}
  99. email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers)
  100. self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent')
  101. def test_from_header(self):
  102. """
  103. Make sure we can manually set the From header (#9214)
  104. """
  105. email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  106. message = email.message()
  107. self.assertEqual(message['From'], 'from@example.com')
  108. def test_multiple_message_call(self):
  109. """
  110. Regression for #13259 - Make sure that headers are not changed when
  111. calling EmailMessage.message()
  112. """
  113. email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  114. message = email.message()
  115. self.assertEqual(message['From'], 'from@example.com')
  116. message = email.message()
  117. self.assertEqual(message['From'], 'from@example.com')
  118. def test_unicode_address_header(self):
  119. """
  120. Regression for #11144 - When a to/from/cc header contains unicode,
  121. make sure the email addresses are parsed correctly (especially with
  122. regards to commas)
  123. """
  124. email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com'])
  125. self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com')
  126. email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com'])
  127. self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com')
  128. def test_unicode_headers(self):
  129. email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"],
  130. headers={"Sender": '"Firstname Sürname" <sender@example.com>',
  131. "Comments": 'My Sürname is non-ASCII'})
  132. message = email.message()
  133. self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=')
  134. self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>')
  135. self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=')
  136. def test_safe_mime_multipart(self):
  137. """
  138. Make sure headers can be set with a different encoding than utf-8 in
  139. SafeMIMEMultipart as well
  140. """
  141. headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"}
  142. subject, from_email, to = 'hello', 'from@example.com', '"Sürname, Firstname" <to@example.com>'
  143. text_content = 'This is an important message.'
  144. html_content = '<p>This is an <strong>important</strong> message.</p>'
  145. msg = EmailMultiAlternatives('Message from Firstname Sürname', text_content, from_email, [to], headers=headers)
  146. msg.attach_alternative(html_content, "text/html")
  147. msg.encoding = 'iso-8859-1'
  148. self.assertEqual(msg.message()['To'], '=?iso-8859-1?q?S=FCrname=2C_Firstname?= <to@example.com>')
  149. self.assertEqual(msg.message()['Subject'].encode(), u'=?iso-8859-1?q?Message_from_Firstname_S=FCrname?=')
  150. def test_encoding(self):
  151. """
  152. Regression for #12791 - Encode body correctly with other encodings
  153. than utf-8
  154. """
  155. email = EmailMessage('Subject', 'Firstname Sürname is a great guy.', 'from@example.com', ['other@example.com'])
  156. email.encoding = 'iso-8859-1'
  157. message = email.message()
  158. self.assertTrue(message.as_string().startswith('Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: other@example.com'))
  159. self.assertEqual(message.get_payload(), 'Firstname S=FCrname is a great guy.')
  160. # Make sure MIME attachments also works correctly with other encodings than utf-8
  161. text_content = 'Firstname Sürname is a great guy.'
  162. html_content = '<p>Firstname Sürname is a <strong>great</strong> guy.</p>'
  163. msg = EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com'])
  164. msg.encoding = 'iso-8859-1'
  165. msg.attach_alternative(html_content, "text/html")
  166. self.assertEqual(msg.message().get_payload(0).as_string(), 'Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\nFirstname S=FCrname is a great guy.')
  167. self.assertEqual(msg.message().get_payload(1).as_string(), 'Content-Type: text/html; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\n<p>Firstname S=FCrname is a <strong>great</strong> guy.</p>')
  168. def test_attachments(self):
  169. """Regression test for #9367"""
  170. headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"}
  171. subject, from_email, to = 'hello', 'from@example.com', 'to@example.com'
  172. text_content = 'This is an important message.'
  173. html_content = '<p>This is an <strong>important</strong> message.</p>'
  174. msg = EmailMultiAlternatives(subject, text_content, from_email, [to], headers=headers)
  175. msg.attach_alternative(html_content, "text/html")
  176. msg.attach("an attachment.pdf", "%PDF-1.4.%...", mimetype="application/pdf")
  177. msg_str = msg.message().as_string()
  178. message = email.message_from_string(msg_str)
  179. self.assertTrue(message.is_multipart())
  180. self.assertEqual(message.get_content_type(), 'multipart/mixed')
  181. self.assertEqual(message.get_default_type(), 'text/plain')
  182. payload = message.get_payload()
  183. self.assertEqual(payload[0].get_content_type(), 'multipart/alternative')
  184. self.assertEqual(payload[1].get_content_type(), 'application/pdf')
  185. def test_dummy_backend(self):
  186. """
  187. Make sure that dummy backends returns correct number of sent messages
  188. """
  189. connection = dummy.EmailBackend()
  190. email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  191. self.assertEqual(connection.send_messages([email, email, email]), 3)
  192. def test_arbitrary_keyword(self):
  193. """
  194. Make sure that get_connection() accepts arbitrary keyword that might be
  195. used with custom backends.
  196. """
  197. c = mail.get_connection(fail_silently=True, foo='bar')
  198. self.assertTrue(c.fail_silently)
  199. def test_custom_backend(self):
  200. """Test custom backend defined in this suite."""
  201. conn = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  202. self.assertTrue(hasattr(conn, 'test_outbox'))
  203. email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  204. conn.send_messages([email])
  205. self.assertEqual(len(conn.test_outbox), 1)
  206. def test_backend_arg(self):
  207. """Test backend argument of mail.get_connection()"""
  208. self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.smtp.EmailBackend'), smtp.EmailBackend))
  209. self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.locmem.EmailBackend'), locmem.EmailBackend))
  210. self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend))
  211. self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend))
  212. tmp_dir = tempfile.mkdtemp()
  213. try:
  214. self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
  215. finally:
  216. shutil.rmtree(tmp_dir)
  217. self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend))
  218. @with_django_settings(
  219. EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend',
  220. ADMINS=[('nobody', 'nobody@example.com')],
  221. MANAGERS=[('nobody', 'nobody@example.com')])
  222. def test_connection_arg(self):
  223. """Test connection argument to send_mail(), et. al."""
  224. mail.outbox = []
  225. # Send using non-default connection
  226. connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  227. send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
  228. self.assertEqual(mail.outbox, [])
  229. self.assertEqual(len(connection.test_outbox), 1)
  230. self.assertEqual(connection.test_outbox[0].subject, 'Subject')
  231. connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  232. send_mass_mail([
  233. ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']),
  234. ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']),
  235. ], connection=connection)
  236. self.assertEqual(mail.outbox, [])
  237. self.assertEqual(len(connection.test_outbox), 2)
  238. self.assertEqual(connection.test_outbox[0].subject, 'Subject1')
  239. self.assertEqual(connection.test_outbox[1].subject, 'Subject2')
  240. connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  241. mail_admins('Admin message', 'Content', connection=connection)
  242. self.assertEqual(mail.outbox, [])
  243. self.assertEqual(len(connection.test_outbox), 1)
  244. self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message')
  245. connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  246. mail_managers('Manager message', 'Content', connection=connection)
  247. self.assertEqual(mail.outbox, [])
  248. self.assertEqual(len(connection.test_outbox), 1)
  249. self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message')
  250. def test_dont_mangle_from_in_body(self):
  251. # Regression for #13433 - Make sure that EmailMessage doesn't mangle
  252. # 'From ' in message body.
  253. email = EmailMessage('Subject', 'From the future', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  254. self.assertFalse('>From the future' in email.message().as_string())
  255. class BaseEmailBackendTests(object):
  256. email_backend = None
  257. def setUp(self):
  258. self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend)
  259. def tearDown(self):
  260. restore_django_settings(self.__settings_state)
  261. def assertStartsWith(self, first, second):
  262. if not first.startswith(second):
  263. self.longMessage = True
  264. self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.")
  265. def get_mailbox_content(self):
  266. raise NotImplementedError
  267. def flush_mailbox(self):
  268. raise NotImplementedError
  269. def get_the_message(self):
  270. mailbox = self.get_mailbox_content()
  271. self.assertEqual(len(mailbox), 1,
  272. "Expected exactly one message, got %d.\n%r" % (len(mailbox), [
  273. m.as_string() for m in mailbox]))
  274. return mailbox[0]
  275. def test_send(self):
  276. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
  277. num_sent = mail.get_connection().send_messages([email])
  278. self.assertEqual(num_sent, 1)
  279. message = self.get_the_message()
  280. self.assertEqual(message["subject"], "Subject")
  281. self.assertEqual(message.get_payload(), "Content")
  282. self.assertEqual(message["from"], "from@example.com")
  283. self.assertEqual(message.get_all("to"), ["to@example.com"])
  284. def test_send_many(self):
  285. email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com'])
  286. email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com'])
  287. num_sent = mail.get_connection().send_messages([email1, email2])
  288. self.assertEqual(num_sent, 2)
  289. messages = self.get_mailbox_content()
  290. self.assertEqual(len(messages), 2)
  291. self.assertEqual(messages[0].get_payload(), "Content1")
  292. self.assertEqual(messages[1].get_payload(), "Content2")
  293. def test_send_verbose_name(self):
  294. email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>',
  295. ["to@example.com"])
  296. email.send()
  297. message = self.get_the_message()
  298. self.assertEqual(message["subject"], "Subject")
  299. self.assertEqual(message.get_payload(), "Content")
  300. self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>")
  301. @with_django_settings(MANAGERS=[('nobody', 'nobody@example.com')])
  302. def test_html_mail_managers(self):
  303. """Test html_message argument to mail_managers"""
  304. mail_managers('Subject', 'Content', html_message='HTML Content')
  305. message = self.get_the_message()
  306. self.assertEqual(message.get('subject'), '[Django] Subject')
  307. self.assertEqual(message.get_all('to'), ['nobody@example.com'])
  308. self.assertTrue(message.is_multipart())
  309. self.assertEqual(len(message.get_payload()), 2)
  310. self.assertEqual(message.get_payload(0).get_payload(), 'Content')
  311. self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
  312. self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
  313. self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
  314. @with_django_settings(ADMINS=[('nobody', 'nobody@example.com')])
  315. def test_html_mail_admins(self):
  316. """Test html_message argument to mail_admins """
  317. mail_admins('Subject', 'Content', html_message='HTML Content')
  318. message = self.get_the_message()
  319. self.assertEqual(message.get('subject'), '[Django] Subject')
  320. self.assertEqual(message.get_all('to'), ['nobody@example.com'])
  321. self.assertTrue(message.is_multipart())
  322. self.assertEqual(len(message.get_payload()), 2)
  323. self.assertEqual(message.get_payload(0).get_payload(), 'Content')
  324. self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
  325. self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
  326. self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
  327. @with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')],
  328. MANAGERS=[('nobody', 'nobody+manager@example.com')])
  329. def test_manager_and_admin_mail_prefix(self):
  330. """
  331. String prefix + lazy translated subject = bad output
  332. Regression for #13494
  333. """
  334. mail_managers(ugettext_lazy('Subject'), 'Content')
  335. message = self.get_the_message()
  336. self.assertEqual(message.get('subject'), '[Django] Subject')
  337. self.flush_mailbox()
  338. mail_admins(ugettext_lazy('Subject'), 'Content')
  339. message = self.get_the_message()
  340. self.assertEqual(message.get('subject'), '[Django] Subject')
  341. @with_django_settings(ADMINS=(), MANAGERS=())
  342. def test_empty_admins(self):
  343. """
  344. Test that mail_admins/mail_managers doesn't connect to the mail server
  345. if there are no recipients (#9383)
  346. """
  347. mail_admins('hi', 'there')
  348. self.assertEqual(self.get_mailbox_content(), [])
  349. mail_managers('hi', 'there')
  350. self.assertEqual(self.get_mailbox_content(), [])
  351. def test_message_cc_header(self):
  352. """
  353. Regression test for #7722
  354. """
  355. email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com'])
  356. mail.get_connection().send_messages([email])
  357. message = self.get_the_message()
  358. self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ')
  359. def test_idn_send(self):
  360. """
  361. Regression test for #14301
  362. """
  363. self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com']))
  364. message = self.get_the_message()
  365. self.assertEqual(message.get('subject'), 'Subject')
  366. self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
  367. self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
  368. self.flush_mailbox()
  369. m = EmailMessage('Subject', 'Content', 'from@öäü.com',
  370. [u'to@öäü.com'], cc=[u'cc@öäü.com'])
  371. m.send()
  372. message = self.get_the_message()
  373. self.assertEqual(message.get('subject'), 'Subject')
  374. self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
  375. self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
  376. self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com')
  377. def test_recipient_without_domain(self):
  378. """
  379. Regression test for #15042
  380. """
  381. self.assertTrue(send_mail("Subject", "Content", "tester", ["django"]))
  382. message = self.get_the_message()
  383. self.assertEqual(message.get('subject'), 'Subject')
  384. self.assertEqual(message.get('from'), "tester")
  385. self.assertEqual(message.get('to'), "django")
  386. class LocmemBackendTests(BaseEmailBackendTests, TestCase):
  387. email_backend = 'django.core.mail.backends.locmem.EmailBackend'
  388. def get_mailbox_content(self):
  389. return [m.message() for m in mail.outbox]
  390. def flush_mailbox(self):
  391. mail.outbox = []
  392. def tearDown(self):
  393. super(LocmemBackendTests, self).tearDown()
  394. mail.outbox = []
  395. def test_locmem_shared_messages(self):
  396. """
  397. Make sure that the locmen backend populates the outbox.
  398. """
  399. connection = locmem.EmailBackend()
  400. connection2 = locmem.EmailBackend()
  401. email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  402. connection.send_messages([email])
  403. connection2.send_messages([email])
  404. self.assertEqual(len(mail.outbox), 2)
  405. class FileBackendTests(BaseEmailBackendTests, TestCase):
  406. email_backend = 'django.core.mail.backends.filebased.EmailBackend'
  407. def setUp(self):
  408. super(FileBackendTests, self).setUp()
  409. self.tmp_dir = tempfile.mkdtemp()
  410. self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir)
  411. def tearDown(self):
  412. restore_django_settings(self.__settings_state)
  413. shutil.rmtree(self.tmp_dir)
  414. super(FileBackendTests, self).tearDown()
  415. def flush_mailbox(self):
  416. for filename in os.listdir(self.tmp_dir):
  417. os.unlink(os.path.join(self.tmp_dir, filename))
  418. def get_mailbox_content(self):
  419. messages = []
  420. for filename in os.listdir(self.tmp_dir):
  421. session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n')
  422. messages.extend(email.message_from_string(m) for m in session if m)
  423. return messages
  424. def test_file_sessions(self):
  425. """Make sure opening a connection creates a new file"""
  426. msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  427. connection = mail.get_connection()
  428. connection.send_messages([msg])
  429. self.assertEqual(len(os.listdir(self.tmp_dir)), 1)
  430. message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0])))
  431. self.assertEqual(message.get_content_type(), 'text/plain')
  432. self.assertEqual(message.get('subject'), 'Subject')
  433. self.assertEqual(message.get('from'), 'from@example.com')
  434. self.assertEqual(message.get('to'), 'to@example.com')
  435. connection2 = mail.get_connection()
  436. connection2.send_messages([msg])
  437. self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
  438. connection.send_messages([msg])
  439. self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
  440. msg.connection = mail.get_connection()
  441. self.assertTrue(connection.open())
  442. msg.send()
  443. self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
  444. msg.send()
  445. self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
  446. class ConsoleBackendTests(BaseEmailBackendTests, TestCase):
  447. email_backend = 'django.core.mail.backends.console.EmailBackend'
  448. def setUp(self):
  449. super(ConsoleBackendTests, self).setUp()
  450. self.__stdout = sys.stdout
  451. self.stream = sys.stdout = StringIO()
  452. def tearDown(self):
  453. del self.stream
  454. sys.stdout = self.__stdout
  455. del self.__stdout
  456. super(ConsoleBackendTests, self).tearDown()
  457. def flush_mailbox(self):
  458. self.stream = sys.stdout = StringIO()
  459. def get_mailbox_content(self):
  460. messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n')
  461. return [email.message_from_string(m) for m in messages if m]
  462. def test_console_stream_kwarg(self):
  463. """
  464. Test that the console backend can be pointed at an arbitrary stream.
  465. """
  466. s = StringIO()
  467. connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
  468. send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
  469. self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
  470. class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
  471. """
  472. Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from:
  473. http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup
  474. """
  475. def __init__(self, *args, **kwargs):
  476. threading.Thread.__init__(self)
  477. smtpd.SMTPServer.__init__(self, *args, **kwargs)
  478. self._sink = []
  479. self.active = False
  480. self.active_lock = threading.Lock()
  481. self.sink_lock = threading.Lock()
  482. def process_message(self, peer, mailfrom, rcpttos, data):
  483. m = email.message_from_string(data)
  484. maddr = email.Utils.parseaddr(m.get('from'))[1]
  485. if mailfrom != maddr:
  486. return "553 '%s' != '%s'" % (mailfrom, maddr)
  487. self.sink_lock.acquire()
  488. self._sink.append(m)
  489. self.sink_lock.release()
  490. def get_sink(self):
  491. self.sink_lock.acquire()
  492. try:
  493. return self._sink[:]
  494. finally:
  495. self.sink_lock.release()
  496. def flush_sink(self):
  497. self.sink_lock.acquire()
  498. self._sink[:] = []
  499. self.sink_lock.release()
  500. def start(self):
  501. assert not self.active
  502. self.__flag = threading.Event()
  503. threading.Thread.start(self)
  504. self.__flag.wait()
  505. def run(self):
  506. self.active = True
  507. self.__flag.set()
  508. while self.active and asyncore.socket_map:
  509. self.active_lock.acquire()
  510. asyncore.loop(timeout=0.1, count=1)
  511. self.active_lock.release()
  512. asyncore.close_all()
  513. def stop(self):
  514. assert self.active
  515. self.active = False
  516. self.join()
  517. class SMTPBackendTests(BaseEmailBackendTests, TestCase):
  518. email_backend = 'django.core.mail.backends.smtp.EmailBackend'
  519. @classmethod
  520. def setUpClass(cls):
  521. cls.server = FakeSMTPServer(('127.0.0.1', 0), None)
  522. cls.settings = alter_django_settings(
  523. EMAIL_HOST="127.0.0.1",
  524. EMAIL_PORT=cls.server.socket.getsockname()[1])
  525. cls.server.start()
  526. @classmethod
  527. def tearDownClass(cls):
  528. cls.server.stop()
  529. def setUp(self):
  530. super(SMTPBackendTests, self).setUp()
  531. self.server.flush_sink()
  532. def tearDown(self):
  533. self.server.flush_sink()
  534. super(SMTPBackendTests, self).tearDown()
  535. def flush_mailbox(self):
  536. self.server.flush_sink()
  537. def get_mailbox_content(self):
  538. return self.server.get_sink()