tests.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. # Unit and doctests for specific database backends.
  3. import datetime
  4. from django.conf import settings
  5. from django.core.management.color import no_style
  6. from django.db import backend, connection, connections, DEFAULT_DB_ALIAS, IntegrityError
  7. from django.db.backends.signals import connection_created
  8. from django.db.backends.postgresql_psycopg2 import version as pg_version
  9. from django.test import TestCase, skipUnlessDBFeature, TransactionTestCase
  10. from django.utils import unittest
  11. from regressiontests.backends import models
  12. class OracleChecks(unittest.TestCase):
  13. @unittest.skipUnless(connection.vendor == 'oracle',
  14. "No need to check Oracle cursor semantics")
  15. def test_dbms_session(self):
  16. # If the backend is Oracle, test that we can call a standard
  17. # stored procedure through our cursor wrapper.
  18. convert_unicode = backend.convert_unicode
  19. cursor = connection.cursor()
  20. cursor.callproc(convert_unicode('DBMS_SESSION.SET_IDENTIFIER'),
  21. [convert_unicode('_django_testing!'),])
  22. @unittest.skipUnless(connection.vendor == 'oracle',
  23. "No need to check Oracle cursor semantics")
  24. def test_cursor_var(self):
  25. # If the backend is Oracle, test that we can pass cursor variables
  26. # as query parameters.
  27. cursor = connection.cursor()
  28. var = cursor.var(backend.Database.STRING)
  29. cursor.execute("BEGIN %s := 'X'; END; ", [var])
  30. self.assertEqual(var.getvalue(), 'X')
  31. @unittest.skipUnless(connection.vendor == 'oracle',
  32. "No need to check Oracle cursor semantics")
  33. def test_long_string(self):
  34. # If the backend is Oracle, test that we can save a text longer
  35. # than 4000 chars and read it properly
  36. c = connection.cursor()
  37. c.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
  38. long_str = ''.join([unicode(x) for x in xrange(4000)])
  39. c.execute('INSERT INTO ltext VALUES (%s)',[long_str])
  40. c.execute('SELECT text FROM ltext')
  41. row = c.fetchone()
  42. self.assertEqual(long_str, row[0].read())
  43. c.execute('DROP TABLE ltext')
  44. @unittest.skipUnless(connection.vendor == 'oracle',
  45. "No need to check Oracle connection semantics")
  46. def test_client_encoding(self):
  47. # If the backend is Oracle, test that the client encoding is set
  48. # correctly. This was broken under Cygwin prior to r14781.
  49. c = connection.cursor() # Ensure the connection is initialized.
  50. self.assertEqual(connection.connection.encoding, "UTF-8")
  51. self.assertEqual(connection.connection.nencoding, "UTF-8")
  52. class DateQuotingTest(TestCase):
  53. def test_django_date_trunc(self):
  54. """
  55. Test the custom ``django_date_trunc method``, in particular against
  56. fields which clash with strings passed to it (e.g. 'year') - see
  57. #12818__.
  58. __: http://code.djangoproject.com/ticket/12818
  59. """
  60. updated = datetime.datetime(2010, 2, 20)
  61. models.SchoolClass.objects.create(year=2009, last_updated=updated)
  62. years = models.SchoolClass.objects.dates('last_updated', 'year')
  63. self.assertEqual(list(years), [datetime.datetime(2010, 1, 1, 0, 0)])
  64. def test_django_extract(self):
  65. """
  66. Test the custom ``django_extract method``, in particular against fields
  67. which clash with strings passed to it (e.g. 'day') - see #12818__.
  68. __: http://code.djangoproject.com/ticket/12818
  69. """
  70. updated = datetime.datetime(2010, 2, 20)
  71. models.SchoolClass.objects.create(year=2009, last_updated=updated)
  72. classes = models.SchoolClass.objects.filter(last_updated__day=20)
  73. self.assertEqual(len(classes), 1)
  74. class LastExecutedQueryTest(TestCase):
  75. def setUp(self):
  76. # connection.queries will not be filled in without this
  77. settings.DEBUG = True
  78. def tearDown(self):
  79. settings.DEBUG = False
  80. # There are no tests for the sqlite backend because it does not
  81. # implement paramater escaping. See #14091.
  82. @unittest.skipUnless(connection.vendor in ('oracle', 'postgresql'),
  83. "These backends use the standard parameter escaping rules")
  84. def test_parameter_escaping(self):
  85. # check that both numbers and string are properly quoted
  86. list(models.Tag.objects.filter(name="special:\\\"':", object_id=12))
  87. sql = connection.queries[-1]['sql']
  88. self.assertTrue("= 'special:\\\"'':' " in sql)
  89. self.assertTrue("= 12 " in sql)
  90. @unittest.skipUnless(connection.vendor == 'mysql',
  91. "MySQL uses backslashes to escape parameters.")
  92. def test_parameter_escaping(self):
  93. list(models.Tag.objects.filter(name="special:\\\"':", object_id=12))
  94. sql = connection.queries[-1]['sql']
  95. # only this line is different from the test above
  96. self.assertTrue("= 'special:\\\\\\\"\\':' " in sql)
  97. self.assertTrue("= 12 " in sql)
  98. class ParameterHandlingTest(TestCase):
  99. def test_bad_parameter_count(self):
  100. "An executemany call with too many/not enough parameters will raise an exception (Refs #12612)"
  101. cursor = connection.cursor()
  102. query = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (
  103. connection.introspection.table_name_converter('backends_square'),
  104. connection.ops.quote_name('root'),
  105. connection.ops.quote_name('square')
  106. ))
  107. self.assertRaises(Exception, cursor.executemany, query, [(1,2,3),])
  108. self.assertRaises(Exception, cursor.executemany, query, [(1,),])
  109. # Unfortunately, the following tests would be a good test to run on all
  110. # backends, but it breaks MySQL hard. Until #13711 is fixed, it can't be run
  111. # everywhere (although it would be an effective test of #13711).
  112. class LongNameTest(TestCase):
  113. """Long primary keys and model names can result in a sequence name
  114. that exceeds the database limits, which will result in truncation
  115. on certain databases (e.g., Postgres). The backend needs to use
  116. the correct sequence name in last_insert_id and other places, so
  117. check it is. Refs #8901.
  118. """
  119. @skipUnlessDBFeature('supports_long_model_names')
  120. def test_sequence_name_length_limits_create(self):
  121. """Test creation of model with long name and long pk name doesn't error. Ref #8901"""
  122. models.VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ.objects.create()
  123. @skipUnlessDBFeature('supports_long_model_names')
  124. def test_sequence_name_length_limits_m2m(self):
  125. """Test an m2m save of a model with a long name and a long m2m field name doesn't error as on Django >=1.2 this now uses object saves. Ref #8901"""
  126. obj = models.VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ.objects.create()
  127. rel_obj = models.Person.objects.create(first_name='Django', last_name='Reinhardt')
  128. obj.m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz.add(rel_obj)
  129. @skipUnlessDBFeature('supports_long_model_names')
  130. def test_sequence_name_length_limits_flush(self):
  131. """Test that sequence resetting as part of a flush with model with long name and long pk name doesn't error. Ref #8901"""
  132. # A full flush is expensive to the full test, so we dig into the
  133. # internals to generate the likely offending SQL and run it manually
  134. # Some convenience aliases
  135. VLM = models.VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
  136. VLM_m2m = VLM.m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz.through
  137. tables = [
  138. VLM._meta.db_table,
  139. VLM_m2m._meta.db_table,
  140. ]
  141. sequences = [
  142. {
  143. 'column': VLM._meta.pk.column,
  144. 'table': VLM._meta.db_table
  145. },
  146. ]
  147. cursor = connection.cursor()
  148. for statement in connection.ops.sql_flush(no_style(), tables, sequences):
  149. cursor.execute(statement)
  150. class SequenceResetTest(TestCase):
  151. def test_generic_relation(self):
  152. "Sequence names are correct when resetting generic relations (Ref #13941)"
  153. # Create an object with a manually specified PK
  154. models.Post.objects.create(id=10, name='1st post', text='hello world')
  155. # Reset the sequences for the database
  156. cursor = connection.cursor()
  157. commands = connections[DEFAULT_DB_ALIAS].ops.sequence_reset_sql(no_style(), [models.Post])
  158. for sql in commands:
  159. cursor.execute(sql)
  160. # If we create a new object now, it should have a PK greater
  161. # than the PK we specified manually.
  162. obj = models.Post.objects.create(name='New post', text='goodbye world')
  163. self.assertTrue(obj.pk > 10)
  164. class PostgresVersionTest(TestCase):
  165. def assert_parses(self, version_string, version):
  166. self.assertEqual(pg_version._parse_version(version_string), version)
  167. def test_parsing(self):
  168. self.assert_parses("PostgreSQL 8.3 beta4", (8, 3, None))
  169. self.assert_parses("PostgreSQL 8.3", (8, 3, None))
  170. self.assert_parses("EnterpriseDB 8.3", (8, 3, None))
  171. self.assert_parses("PostgreSQL 8.3.6", (8, 3, 6))
  172. self.assert_parses("PostgreSQL 8.4beta1", (8, 4, None))
  173. self.assert_parses("PostgreSQL 8.3.1 on i386-apple-darwin9.2.2, compiled by GCC i686-apple-darwin9-gcc-4.0.1 (GCC) 4.0.1 (Apple Inc. build 5478)", (8, 3, 1))
  174. # Unfortunately with sqlite3 the in-memory test database cannot be
  175. # closed, and so it cannot be re-opened during testing, and so we
  176. # sadly disable this test for now.
  177. class ConnectionCreatedSignalTest(TestCase):
  178. @skipUnlessDBFeature('test_db_allows_multiple_connections')
  179. def test_signal(self):
  180. data = {}
  181. def receiver(sender, connection, **kwargs):
  182. data["connection"] = connection
  183. connection_created.connect(receiver)
  184. connection.close()
  185. cursor = connection.cursor()
  186. self.assertTrue(data["connection"] is connection)
  187. connection_created.disconnect(receiver)
  188. data.clear()
  189. cursor = connection.cursor()
  190. self.assertTrue(data == {})
  191. class BackendTestCase(TestCase):
  192. def test_cursor_executemany(self):
  193. #4896: Test cursor.executemany
  194. cursor = connection.cursor()
  195. qn = connection.ops.quote_name
  196. opts = models.Square._meta
  197. f1, f2 = opts.get_field('root'), opts.get_field('square')
  198. query = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)'
  199. % (connection.introspection.table_name_converter(opts.db_table), qn(f1.column), qn(f2.column)))
  200. cursor.executemany(query, [(i, i**2) for i in range(-5, 6)])
  201. self.assertEqual(models.Square.objects.count(), 11)
  202. for i in range(-5, 6):
  203. square = models.Square.objects.get(root=i)
  204. self.assertEqual(square.square, i**2)
  205. #4765: executemany with params=[] does nothing
  206. cursor.executemany(query, [])
  207. self.assertEqual(models.Square.objects.count(), 11)
  208. def test_unicode_fetches(self):
  209. #6254: fetchone, fetchmany, fetchall return strings as unicode objects
  210. qn = connection.ops.quote_name
  211. models.Person(first_name="John", last_name="Doe").save()
  212. models.Person(first_name="Jane", last_name="Doe").save()
  213. models.Person(first_name="Mary", last_name="Agnelline").save()
  214. models.Person(first_name="Peter", last_name="Parker").save()
  215. models.Person(first_name="Clark", last_name="Kent").save()
  216. opts2 = models.Person._meta
  217. f3, f4 = opts2.get_field('first_name'), opts2.get_field('last_name')
  218. query2 = ('SELECT %s, %s FROM %s ORDER BY %s'
  219. % (qn(f3.column), qn(f4.column), connection.introspection.table_name_converter(opts2.db_table),
  220. qn(f3.column)))
  221. cursor = connection.cursor()
  222. cursor.execute(query2)
  223. self.assertEqual(cursor.fetchone(), (u'Clark', u'Kent'))
  224. self.assertEqual(list(cursor.fetchmany(2)), [(u'Jane', u'Doe'), (u'John', u'Doe')])
  225. self.assertEqual(list(cursor.fetchall()), [(u'Mary', u'Agnelline'), (u'Peter', u'Parker')])
  226. def test_database_operations_helper_class(self):
  227. # Ticket #13630
  228. self.assertTrue(hasattr(connection, 'ops'))
  229. self.assertTrue(hasattr(connection.ops, 'connection'))
  230. self.assertEqual(connection, connection.ops.connection)
  231. # We don't make these tests conditional because that means we would need to
  232. # check and differentiate between:
  233. # * MySQL+InnoDB, MySQL+MYISAM (something we currently can't do).
  234. # * if sqlite3 (if/once we get #14204 fixed) has referential integrity turned
  235. # on or not, something that would be controlled by runtime support and user
  236. # preference.
  237. # verify if its type is django.database.db.IntegrityError.
  238. class FkConstraintsTests(TransactionTestCase):
  239. def setUp(self):
  240. # Create a Reporter.
  241. self.r = models.Reporter.objects.create(first_name='John', last_name='Smith')
  242. def test_integrity_checks_on_creation(self):
  243. """
  244. Try to create a model instance that violates a FK constraint. If it
  245. fails it should fail with IntegrityError.
  246. """
  247. a = models.Article(headline="This is a test", pub_date=datetime.datetime(2005, 7, 27), reporter_id=30)
  248. try:
  249. a.save()
  250. except IntegrityError:
  251. pass
  252. def test_integrity_checks_on_update(self):
  253. """
  254. Try to update a model instance introducing a FK constraint violation.
  255. If it fails it should fail with IntegrityError.
  256. """
  257. # Create an Article.
  258. models.Article.objects.create(headline="Test article", pub_date=datetime.datetime(2010, 9, 4), reporter=self.r)
  259. # Retrive it from the DB
  260. a = models.Article.objects.get(headline="Test article")
  261. a.reporter_id = 30
  262. try:
  263. a.save()
  264. except IntegrityError:
  265. pass