tests.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. from __future__ import unicode_literals
  2. from operator import attrgetter
  3. from django.db import connection
  4. from django.db.models import Value
  5. from django.db.models.functions import Lower
  6. from django.test import (
  7. TestCase, override_settings, skipIfDBFeature, skipUnlessDBFeature,
  8. )
  9. from .models import (
  10. Country, NoFields, Pizzeria, ProxyCountry, ProxyMultiCountry,
  11. ProxyMultiProxyCountry, ProxyProxyCountry, Restaurant, State, TwoFields,
  12. )
  13. class BulkCreateTests(TestCase):
  14. def setUp(self):
  15. self.data = [
  16. Country(name="United States of America", iso_two_letter="US"),
  17. Country(name="The Netherlands", iso_two_letter="NL"),
  18. Country(name="Germany", iso_two_letter="DE"),
  19. Country(name="Czech Republic", iso_two_letter="CZ")
  20. ]
  21. def test_simple(self):
  22. created = Country.objects.bulk_create(self.data)
  23. self.assertEqual(len(created), 4)
  24. self.assertQuerysetEqual(Country.objects.order_by("-name"), [
  25. "United States of America", "The Netherlands", "Germany", "Czech Republic"
  26. ], attrgetter("name"))
  27. created = Country.objects.bulk_create([])
  28. self.assertEqual(created, [])
  29. self.assertEqual(Country.objects.count(), 4)
  30. @skipUnlessDBFeature('has_bulk_insert')
  31. def test_efficiency(self):
  32. with self.assertNumQueries(1):
  33. Country.objects.bulk_create(self.data)
  34. def test_multi_table_inheritance_unsupported(self):
  35. expected_message = "Can't bulk create a multi-table inherited model"
  36. with self.assertRaisesMessage(ValueError, expected_message):
  37. Pizzeria.objects.bulk_create([
  38. Pizzeria(name="The Art of Pizza"),
  39. ])
  40. with self.assertRaisesMessage(ValueError, expected_message):
  41. ProxyMultiCountry.objects.bulk_create([
  42. ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
  43. ])
  44. with self.assertRaisesMessage(ValueError, expected_message):
  45. ProxyMultiProxyCountry.objects.bulk_create([
  46. ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
  47. ])
  48. def test_proxy_inheritance_supported(self):
  49. ProxyCountry.objects.bulk_create([
  50. ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
  51. Country(name="Tortall", iso_two_letter="TA"),
  52. ])
  53. self.assertQuerysetEqual(ProxyCountry.objects.all(), {
  54. "Qwghlm", "Tortall"
  55. }, attrgetter("name"), ordered=False)
  56. ProxyProxyCountry.objects.bulk_create([
  57. ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
  58. ])
  59. self.assertQuerysetEqual(ProxyProxyCountry.objects.all(), {
  60. "Qwghlm", "Tortall", "Netherlands",
  61. }, attrgetter("name"), ordered=False)
  62. def test_non_auto_increment_pk(self):
  63. State.objects.bulk_create([
  64. State(two_letter_code=s)
  65. for s in ["IL", "NY", "CA", "ME"]
  66. ])
  67. self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [
  68. "CA", "IL", "ME", "NY",
  69. ], attrgetter("two_letter_code"))
  70. @skipUnlessDBFeature('has_bulk_insert')
  71. def test_non_auto_increment_pk_efficiency(self):
  72. with self.assertNumQueries(1):
  73. State.objects.bulk_create([
  74. State(two_letter_code=s)
  75. for s in ["IL", "NY", "CA", "ME"]
  76. ])
  77. self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [
  78. "CA", "IL", "ME", "NY",
  79. ], attrgetter("two_letter_code"))
  80. @skipIfDBFeature('allows_auto_pk_0')
  81. def test_zero_as_autoval(self):
  82. """
  83. Zero as id for AutoField should raise exception in MySQL, because MySQL
  84. does not allow zero for automatic primary key.
  85. """
  86. valid_country = Country(name='Germany', iso_two_letter='DE')
  87. invalid_country = Country(id=0, name='Poland', iso_two_letter='PL')
  88. with self.assertRaises(ValueError):
  89. Country.objects.bulk_create([valid_country, invalid_country])
  90. def test_batch_same_vals(self):
  91. # Sqlite had a problem where all the same-valued models were
  92. # collapsed to one insert.
  93. Restaurant.objects.bulk_create([
  94. Restaurant(name='foo') for i in range(0, 2)
  95. ])
  96. self.assertEqual(Restaurant.objects.count(), 2)
  97. def test_large_batch(self):
  98. with override_settings(DEBUG=True):
  99. connection.queries_log.clear()
  100. TwoFields.objects.bulk_create([
  101. TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)
  102. ])
  103. self.assertEqual(TwoFields.objects.count(), 1001)
  104. self.assertEqual(
  105. TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(),
  106. 101)
  107. self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)
  108. @skipUnlessDBFeature('has_bulk_insert')
  109. def test_large_single_field_batch(self):
  110. # SQLite had a problem with more than 500 UNIONed selects in single
  111. # query.
  112. Restaurant.objects.bulk_create([
  113. Restaurant() for i in range(0, 501)
  114. ])
  115. @skipUnlessDBFeature('has_bulk_insert')
  116. def test_large_batch_efficiency(self):
  117. with override_settings(DEBUG=True):
  118. connection.queries_log.clear()
  119. TwoFields.objects.bulk_create([
  120. TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)
  121. ])
  122. self.assertLess(len(connection.queries), 10)
  123. def test_large_batch_mixed(self):
  124. """
  125. Test inserting a large batch with objects having primary key set
  126. mixed together with objects without PK set.
  127. """
  128. with override_settings(DEBUG=True):
  129. connection.queries_log.clear()
  130. TwoFields.objects.bulk_create([
  131. TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
  132. for i in range(100000, 101000)])
  133. self.assertEqual(TwoFields.objects.count(), 1000)
  134. # We can't assume much about the ID's created, except that the above
  135. # created IDs must exist.
  136. id_range = range(100000, 101000, 2)
  137. self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
  138. self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)
  139. @skipUnlessDBFeature('has_bulk_insert')
  140. def test_large_batch_mixed_efficiency(self):
  141. """
  142. Test inserting a large batch with objects having primary key set
  143. mixed together with objects without PK set.
  144. """
  145. with override_settings(DEBUG=True):
  146. connection.queries_log.clear()
  147. TwoFields.objects.bulk_create([
  148. TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
  149. for i in range(100000, 101000)])
  150. self.assertLess(len(connection.queries), 10)
  151. def test_explicit_batch_size(self):
  152. objs = [TwoFields(f1=i, f2=i) for i in range(0, 4)]
  153. TwoFields.objects.bulk_create(objs, 2)
  154. self.assertEqual(TwoFields.objects.count(), len(objs))
  155. TwoFields.objects.all().delete()
  156. TwoFields.objects.bulk_create(objs, len(objs))
  157. self.assertEqual(TwoFields.objects.count(), len(objs))
  158. def test_empty_model(self):
  159. NoFields.objects.bulk_create([NoFields() for i in range(2)])
  160. self.assertEqual(NoFields.objects.count(), 2)
  161. @skipUnlessDBFeature('has_bulk_insert')
  162. def test_explicit_batch_size_efficiency(self):
  163. objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)]
  164. with self.assertNumQueries(2):
  165. TwoFields.objects.bulk_create(objs, 50)
  166. TwoFields.objects.all().delete()
  167. with self.assertNumQueries(1):
  168. TwoFields.objects.bulk_create(objs, len(objs))
  169. @skipUnlessDBFeature('has_bulk_insert')
  170. def test_bulk_insert_expressions(self):
  171. Restaurant.objects.bulk_create([
  172. Restaurant(name="Sam's Shake Shack"),
  173. Restaurant(name=Lower(Value("Betty's Beetroot Bar")))
  174. ])
  175. bbb = Restaurant.objects.filter(name="betty's beetroot bar")
  176. self.assertEqual(bbb.count(), 1)