test_utils.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. """Tests for django.db.backends.utils"""
  2. from decimal import Decimal, Rounded
  3. from django.db import NotSupportedError, connection
  4. from django.db.backends.utils import (
  5. format_number, split_identifier, split_tzname_delta, truncate_name,
  6. )
  7. from django.test import (
  8. SimpleTestCase, TransactionTestCase, skipIfDBFeature, skipUnlessDBFeature,
  9. )
  10. class TestUtils(SimpleTestCase):
  11. def test_truncate_name(self):
  12. self.assertEqual(truncate_name('some_table', 10), 'some_table')
  13. self.assertEqual(truncate_name('some_long_table', 10), 'some_la38a')
  14. self.assertEqual(truncate_name('some_long_table', 10, 3), 'some_loa38')
  15. self.assertEqual(truncate_name('some_long_table'), 'some_long_table')
  16. # "user"."table" syntax
  17. self.assertEqual(truncate_name('username"."some_table', 10), 'username"."some_table')
  18. self.assertEqual(truncate_name('username"."some_long_table', 10), 'username"."some_la38a')
  19. self.assertEqual(truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38')
  20. def test_split_identifier(self):
  21. self.assertEqual(split_identifier('some_table'), ('', 'some_table'))
  22. self.assertEqual(split_identifier('"some_table"'), ('', 'some_table'))
  23. self.assertEqual(split_identifier('namespace"."some_table'), ('namespace', 'some_table'))
  24. self.assertEqual(split_identifier('"namespace"."some_table"'), ('namespace', 'some_table'))
  25. def test_format_number(self):
  26. def equal(value, max_d, places, result):
  27. self.assertEqual(format_number(Decimal(value), max_d, places), result)
  28. equal('0', 12, 3, '0.000')
  29. equal('0', 12, 8, '0.00000000')
  30. equal('1', 12, 9, '1.000000000')
  31. equal('0.00000000', 12, 8, '0.00000000')
  32. equal('0.000000004', 12, 8, '0.00000000')
  33. equal('0.000000008', 12, 8, '0.00000001')
  34. equal('0.000000000000000000999', 10, 8, '0.00000000')
  35. equal('0.1234567890', 12, 10, '0.1234567890')
  36. equal('0.1234567890', 12, 9, '0.123456789')
  37. equal('0.1234567890', 12, 8, '0.12345679')
  38. equal('0.1234567890', 12, 5, '0.12346')
  39. equal('0.1234567890', 12, 3, '0.123')
  40. equal('0.1234567890', 12, 1, '0.1')
  41. equal('0.1234567890', 12, 0, '0')
  42. equal('0.1234567890', None, 0, '0')
  43. equal('1234567890.1234567890', None, 0, '1234567890')
  44. equal('1234567890.1234567890', None, 2, '1234567890.12')
  45. equal('0.1234', 5, None, '0.1234')
  46. equal('123.12', 5, None, '123.12')
  47. with self.assertRaises(Rounded):
  48. equal('0.1234567890', 5, None, '0.12346')
  49. with self.assertRaises(Rounded):
  50. equal('1234567890.1234', 5, None, '1234600000')
  51. def test_split_tzname_delta(self):
  52. tests = [
  53. ('Asia/Ust+Nera', ('Asia/Ust+Nera', None, None)),
  54. ('Asia/Ust-Nera', ('Asia/Ust-Nera', None, None)),
  55. ('Asia/Ust+Nera-02:00', ('Asia/Ust+Nera', '-', '02:00')),
  56. ('Asia/Ust-Nera+05:00', ('Asia/Ust-Nera', '+', '05:00')),
  57. ('America/Coral_Harbour-01:00', ('America/Coral_Harbour', '-', '01:00')),
  58. ('America/Coral_Harbour+02:30', ('America/Coral_Harbour', '+', '02:30')),
  59. ('UTC+15:00', ('UTC', '+', '15:00')),
  60. ('UTC-04:43', ('UTC', '-', '04:43')),
  61. ('UTC', ('UTC', None, None)),
  62. ('UTC+1', ('UTC+1', None, None)),
  63. ]
  64. for tzname, expected in tests:
  65. with self.subTest(tzname=tzname):
  66. self.assertEqual(split_tzname_delta(tzname), expected)
  67. class CursorWrapperTests(TransactionTestCase):
  68. available_apps = []
  69. def _test_procedure(self, procedure_sql, params, param_types, kparams=None):
  70. with connection.cursor() as cursor:
  71. cursor.execute(procedure_sql)
  72. # Use a new cursor because in MySQL a procedure can't be used in the
  73. # same cursor in which it was created.
  74. with connection.cursor() as cursor:
  75. cursor.callproc('test_procedure', params, kparams)
  76. with connection.schema_editor() as editor:
  77. editor.remove_procedure('test_procedure', param_types)
  78. @skipUnlessDBFeature('create_test_procedure_without_params_sql')
  79. def test_callproc_without_params(self):
  80. self._test_procedure(connection.features.create_test_procedure_without_params_sql, [], [])
  81. @skipUnlessDBFeature('create_test_procedure_with_int_param_sql')
  82. def test_callproc_with_int_params(self):
  83. self._test_procedure(connection.features.create_test_procedure_with_int_param_sql, [1], ['INTEGER'])
  84. @skipUnlessDBFeature('create_test_procedure_with_int_param_sql', 'supports_callproc_kwargs')
  85. def test_callproc_kparams(self):
  86. self._test_procedure(connection.features.create_test_procedure_with_int_param_sql, [], ['INTEGER'], {'P_I': 1})
  87. @skipIfDBFeature('supports_callproc_kwargs')
  88. def test_unsupported_callproc_kparams_raises_error(self):
  89. msg = 'Keyword parameters for callproc are not supported on this database backend.'
  90. with self.assertRaisesMessage(NotSupportedError, msg):
  91. with connection.cursor() as cursor:
  92. cursor.callproc('test_procedure', [], {'P_I': 1})