tests.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from django.core.handlers.wsgi import WSGIHandler
  2. from django.core.signals import request_started, request_finished
  3. from django.db import close_old_connections, connection
  4. from django.test import RequestFactory, TestCase, TransactionTestCase
  5. from django.test.utils import override_settings
  6. class HandlerTests(TestCase):
  7. def setUp(self):
  8. request_started.disconnect(close_old_connections)
  9. def tearDown(self):
  10. request_started.connect(close_old_connections)
  11. # Mangle settings so the handler will fail
  12. @override_settings(MIDDLEWARE_CLASSES=42)
  13. def test_lock_safety(self):
  14. """
  15. Tests for bug #11193 (errors inside middleware shouldn't leave
  16. the initLock locked).
  17. """
  18. # Try running the handler, it will fail in load_middleware
  19. handler = WSGIHandler()
  20. self.assertEqual(handler.initLock.locked(), False)
  21. with self.assertRaises(Exception):
  22. handler(None, None)
  23. self.assertEqual(handler.initLock.locked(), False)
  24. def test_bad_path_info(self):
  25. """Tests for bug #15672 ('request' referenced before assignment)"""
  26. environ = RequestFactory().get('/').environ
  27. environ['PATH_INFO'] = '\xed'
  28. handler = WSGIHandler()
  29. response = handler(environ, lambda *a, **k: None)
  30. self.assertEqual(response.status_code, 400)
  31. class TransactionsPerRequestTests(TransactionTestCase):
  32. urls = 'handlers.urls'
  33. def test_no_transaction(self):
  34. response = self.client.get('/in_transaction/')
  35. self.assertContains(response, 'False')
  36. def test_auto_transaction(self):
  37. old_atomic_requests = connection.settings_dict['ATOMIC_REQUESTS']
  38. try:
  39. connection.settings_dict['ATOMIC_REQUESTS'] = True
  40. response = self.client.get('/in_transaction/')
  41. finally:
  42. connection.settings_dict['ATOMIC_REQUESTS'] = old_atomic_requests
  43. self.assertContains(response, 'True')
  44. def test_no_auto_transaction(self):
  45. old_atomic_requests = connection.settings_dict['ATOMIC_REQUESTS']
  46. try:
  47. connection.settings_dict['ATOMIC_REQUESTS'] = True
  48. response = self.client.get('/not_in_transaction/')
  49. finally:
  50. connection.settings_dict['ATOMIC_REQUESTS'] = old_atomic_requests
  51. self.assertContains(response, 'False')
  52. class SignalsTests(TestCase):
  53. urls = 'handlers.urls'
  54. def setUp(self):
  55. self.signals = []
  56. request_started.connect(self.register_started)
  57. request_finished.connect(self.register_finished)
  58. def tearDown(self):
  59. request_started.disconnect(self.register_started)
  60. request_finished.disconnect(self.register_finished)
  61. def register_started(self, **kwargs):
  62. self.signals.append('started')
  63. def register_finished(self, **kwargs):
  64. self.signals.append('finished')
  65. def test_request_signals(self):
  66. response = self.client.get('/regular/')
  67. self.assertEqual(self.signals, ['started', 'finished'])
  68. self.assertEqual(response.content, b"regular content")
  69. def test_request_signals_streaming_response(self):
  70. response = self.client.get('/streaming/')
  71. self.assertEqual(self.signals, ['started'])
  72. self.assertEqual(b''.join(response.streaming_content), b"streaming content")
  73. self.assertEqual(self.signals, ['started', 'finished'])