# test_utils.py 5.4 KB
  1. """Tests for django.db.backends.utils"""
  2. from decimal import Decimal, Rounded
  3. from django.db import NotSupportedError, connection
  4. from django.db.backends.utils import (
  5. format_number,
  6. split_identifier,
  7. split_tzname_delta,
  8. truncate_name,
  9. )
  10. from django.test import (
  11. SimpleTestCase,
  12. TransactionTestCase,
  13. skipIfDBFeature,
  14. skipUnlessDBFeature,
  15. )
  16. class TestUtils(SimpleTestCase):
  17. def test_truncate_name(self):
  18. self.assertEqual(truncate_name("some_table", 10), "some_table")
  19. self.assertEqual(truncate_name("some_long_table", 10), "some_la38a")
  20. self.assertEqual(truncate_name("some_long_table", 10, 3), "some_loa38")
  21. self.assertEqual(truncate_name("some_long_table"), "some_long_table")
  22. # "user"."table" syntax
  23. self.assertEqual(
  24. truncate_name('username"."some_table', 10), 'username"."some_table'
  25. )
  26. self.assertEqual(
  27. truncate_name('username"."some_long_table', 10), 'username"."some_la38a'
  28. )
  29. self.assertEqual(
  30. truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38'
  31. )
  32. def test_split_identifier(self):
  33. self.assertEqual(split_identifier("some_table"), ("", "some_table"))
  34. self.assertEqual(split_identifier('"some_table"'), ("", "some_table"))
  35. self.assertEqual(
  36. split_identifier('namespace"."some_table'), ("namespace", "some_table")
  37. )
  38. self.assertEqual(
  39. split_identifier('"namespace"."some_table"'), ("namespace", "some_table")
  40. )
  41. def test_format_number(self):
  42. def equal(value, max_d, places, result):
  43. self.assertEqual(format_number(Decimal(value), max_d, places), result)
  44. equal("0", 12, 3, "0.000")
  45. equal("0", 12, 8, "0.00000000")
  46. equal("1", 12, 9, "1.000000000")
  47. equal("0.00000000", 12, 8, "0.00000000")
  48. equal("0.000000004", 12, 8, "0.00000000")
  49. equal("0.000000008", 12, 8, "0.00000001")
  50. equal("0.000000000000000000999", 10, 8, "0.00000000")
  51. equal("0.1234567890", 12, 10, "0.1234567890")
  52. equal("0.1234567890", 12, 9, "0.123456789")
  53. equal("0.1234567890", 12, 8, "0.12345679")
  54. equal("0.1234567890", 12, 5, "0.12346")
  55. equal("0.1234567890", 12, 3, "0.123")
  56. equal("0.1234567890", 12, 1, "0.1")
  57. equal("0.1234567890", 12, 0, "0")
  58. equal("0.1234567890", None, 0, "0")
  59. equal("1234567890.1234567890", None, 0, "1234567890")
  60. equal("1234567890.1234567890", None, 2, "1234567890.12")
  61. equal("0.1234", 5, None, "0.1234")
  62. equal("123.12", 5, None, "123.12")
  63. with self.assertRaises(Rounded):
  64. equal("0.1234567890", 5, None, "0.12346")
  65. with self.assertRaises(Rounded):
  66. equal("1234567890.1234", 5, None, "1234600000")
  67. def test_split_tzname_delta(self):
  68. tests = [
  69. ("Asia/Ust+Nera", ("Asia/Ust+Nera", None, None)),
  70. ("Asia/Ust-Nera", ("Asia/Ust-Nera", None, None)),
  71. ("Asia/Ust+Nera-02:00", ("Asia/Ust+Nera", "-", "02:00")),
  72. ("Asia/Ust-Nera+05:00", ("Asia/Ust-Nera", "+", "05:00")),
  73. ("America/Coral_Harbour-01:00", ("America/Coral_Harbour", "-", "01:00")),
  74. ("America/Coral_Harbour+02:30", ("America/Coral_Harbour", "+", "02:30")),
  75. ("UTC+15:00", ("UTC", "+", "15:00")),
  76. ("UTC-04:43", ("UTC", "-", "04:43")),
  77. ("UTC", ("UTC", None, None)),
  78. ("UTC+1", ("UTC+1", None, None)),
  79. ]
  80. for tzname, expected in tests:
  81. with self.subTest(tzname=tzname):
  82. self.assertEqual(split_tzname_delta(tzname), expected)
  83. class CursorWrapperTests(TransactionTestCase):
  84. available_apps = []
  85. def _test_procedure(self, procedure_sql, params, param_types, kparams=None):
  86. with connection.cursor() as cursor:
  87. cursor.execute(procedure_sql)
  88. # Use a new cursor because in MySQL a procedure can't be used in the
  89. # same cursor in which it was created.
  90. with connection.cursor() as cursor:
  91. cursor.callproc("test_procedure", params, kparams)
  92. with connection.schema_editor() as editor:
  93. editor.remove_procedure("test_procedure", param_types)
  94. @skipUnlessDBFeature("create_test_procedure_without_params_sql")
  95. def test_callproc_without_params(self):
  96. self._test_procedure(
  97. connection.features.create_test_procedure_without_params_sql, [], []
  98. )
  99. @skipUnlessDBFeature("create_test_procedure_with_int_param_sql")
  100. def test_callproc_with_int_params(self):
  101. self._test_procedure(
  102. connection.features.create_test_procedure_with_int_param_sql,
  103. [1],
  104. ["INTEGER"],
  105. )
  106. @skipUnlessDBFeature(
  107. "create_test_procedure_with_int_param_sql", "supports_callproc_kwargs"
  108. )
  109. def test_callproc_kparams(self):
  110. self._test_procedure(
  111. connection.features.create_test_procedure_with_int_param_sql,
  112. [],
  113. ["INTEGER"],
  114. {"P_I": 1},
  115. )
  116. @skipIfDBFeature("supports_callproc_kwargs")
  117. def test_unsupported_callproc_kparams_raises_error(self):
  118. msg = (
  119. "Keyword parameters for callproc are not supported on this database "
  120. "backend."
  121. )
  122. with self.assertRaisesMessage(NotSupportedError, msg):
  123. with connection.cursor() as cursor:
  124. cursor.callproc("test_procedure", [], {"P_I": 1})