tests.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. from math import ceil
  2. from operator import attrgetter
  3. from django.core.exceptions import FieldDoesNotExist
  4. from django.db import (
  5. IntegrityError,
  6. NotSupportedError,
  7. OperationalError,
  8. ProgrammingError,
  9. connection,
  10. )
  11. from django.db.models import FileField, Value
  12. from django.db.models.functions import Lower, Now
  13. from django.test import (
  14. TestCase,
  15. override_settings,
  16. skipIfDBFeature,
  17. skipUnlessDBFeature,
  18. )
  19. from .models import (
  20. BigAutoFieldModel,
  21. Country,
  22. FieldsWithDbColumns,
  23. NoFields,
  24. NullableFields,
  25. Pizzeria,
  26. ProxyCountry,
  27. ProxyMultiCountry,
  28. ProxyMultiProxyCountry,
  29. ProxyProxyCountry,
  30. RelatedModel,
  31. Restaurant,
  32. SmallAutoFieldModel,
  33. State,
  34. TwoFields,
  35. UpsertConflict,
  36. )
  37. class BulkCreateTests(TestCase):
  def setUp(self):
      """Build the unsaved Country fixtures shared by the tests."""
      country_specs = (
          ("United States of America", "US"),
          ("The Netherlands", "NL"),
          ("Germany", "DE"),
          ("Czech Republic", "CZ"),
      )
      self.data = [
          Country(name=name, iso_two_letter=code) for name, code in country_specs
      ]
  def test_simple(self):
      """bulk_create() returns the given objects and stores all of them."""
      inserted = Country.objects.bulk_create(self.data)
      self.assertEqual(inserted, self.data)
      names_descending = [
          "United States of America",
          "The Netherlands",
          "Germany",
          "Czech Republic",
      ]
      self.assertQuerySetEqual(
          Country.objects.order_by("-name"),
          names_descending,
          attrgetter("name"),
      )
      # An empty input yields an empty result and adds no rows.
      nothing_inserted = Country.objects.bulk_create([])
      self.assertEqual(nothing_inserted, [])
      self.assertEqual(Country.objects.count(), 4)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_efficiency(self):
      """The fixture countries are expected to be inserted by one query."""
      single_query = self.assertNumQueries(1)
      with single_query:
          Country.objects.bulk_create(self.data)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_long_non_ascii_text(self):
      """
      Non-ASCII values 2001 to 4000 characters long (i.e. 4002 to 8000
      bytes) must be set as a CLOB on Oracle (#22144).
      """
      cyrillic_text = "Ж" * 3000
      Country.objects.bulk_create([Country(description=cyrillic_text)])
      self.assertEqual(Country.objects.count(), 1)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_long_and_short_text(self):
      """Long and short ASCII/non-ASCII descriptions share one batch."""
      # Keyed by the iso_two_letter value; insertion order is preserved.
      descriptions_by_code = {
          "A": "a" * 4001,
          "B": "a",
          "C": "Ж" * 2001,
          "D": "Ж",
      }
      Country.objects.bulk_create(
          [
              Country(description=text, iso_two_letter=code)
              for code, text in descriptions_by_code.items()
          ]
      )
      self.assertEqual(Country.objects.count(), 4)
  def test_multi_table_inheritance_unsupported(self):
      """
      bulk_create() raises ValueError for Pizzeria, ProxyMultiCountry and
      ProxyMultiProxyCountry.
      """
      expected_message = "Can't bulk create a multi-table inherited model"
      rejected_cases = (
          (Pizzeria, {"name": "The Art of Pizza"}),
          (ProxyMultiCountry, {"name": "Fillory", "iso_two_letter": "FL"}),
          (ProxyMultiProxyCountry, {"name": "Fillory", "iso_two_letter": "FL"}),
      )
      for model, field_values in rejected_cases:
          # The instance is built inside the context, as each case requires.
          with self.assertRaisesMessage(ValueError, expected_message):
              model.objects.bulk_create([model(**field_values)])
  def test_proxy_inheritance_supported(self):
      """Proxy model managers accept proxy and Country instances."""
      mixed_batch = [
          ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
          Country(name="Tortall", iso_two_letter="TA"),
      ]
      ProxyCountry.objects.bulk_create(mixed_batch)
      self.assertQuerySetEqual(
          ProxyCountry.objects.all(),
          {"Qwghlm", "Tortall"},
          attrgetter("name"),
          ordered=False,
      )
      nested_proxy = ProxyProxyCountry(name="Netherlands", iso_two_letter="NT")
      ProxyProxyCountry.objects.bulk_create([nested_proxy])
      # Rows created through ProxyCountry are visible here as well.
      self.assertQuerySetEqual(
          ProxyProxyCountry.objects.all(),
          {"Qwghlm", "Tortall", "Netherlands"},
          attrgetter("name"),
          ordered=False,
      )
  def test_non_auto_increment_pk(self):
      """State rows identified by two_letter_code can be bulk created."""
      codes = ["IL", "NY", "CA", "ME"]
      State.objects.bulk_create([State(two_letter_code=code) for code in codes])
      self.assertQuerySetEqual(
          State.objects.order_by("two_letter_code"),
          sorted(codes),
          attrgetter("two_letter_code"),
      )
  @skipUnlessDBFeature("has_bulk_insert")
  def test_non_auto_increment_pk_efficiency(self):
      """Bulk creating the State rows is expected to take one query."""
      codes = ["IL", "NY", "CA", "ME"]
      with self.assertNumQueries(1):
          State.objects.bulk_create(
              [State(two_letter_code=code) for code in codes]
          )
      # The verification query runs outside of the counted block.
      self.assertQuerySetEqual(
          State.objects.order_by("two_letter_code"),
          sorted(codes),
          attrgetter("two_letter_code"),
      )
  @skipIfDBFeature("allows_auto_pk_0")
  def test_zero_as_autoval(self):
      """
      MySQL does not allow zero for an automatic primary key unless the
      NO_AUTO_VALUE_ON_ZERO SQL mode is enabled, so zero as the id of an
      AutoField should raise an exception.
      """
      msg = "The database backend does not accept 0 as a value for AutoField."
      countries = [
          Country(name="Germany", iso_two_letter="DE"),
          # The explicit id=0 is the value being rejected.
          Country(id=0, name="Poland", iso_two_letter="PL"),
      ]
      with self.assertRaisesMessage(ValueError, msg):
          Country.objects.bulk_create(countries)
  def test_batch_same_vals(self):
      """Two identically valued objects must produce two rows."""
      # SQLite had a problem where all the same-valued models were
      # collapsed to one insert.
      duplicates = [Restaurant(name="foo") for _ in range(2)]
      Restaurant.objects.bulk_create(duplicates)
      self.assertEqual(Restaurant.objects.count(), 2)
  def test_large_batch(self):
      """A batch of 1001 objects is stored completely."""
      rows = [TwoFields(f1=number, f2=number + 1) for number in range(1001)]
      TwoFields.objects.bulk_create(rows)
      self.assertEqual(TwoFields.objects.count(), 1001)
      mid_range = TwoFields.objects.filter(f1__gte=450, f1__lte=550)
      self.assertEqual(mid_range.count(), 101)
      upper_range = TwoFields.objects.filter(f2__gte=901)
      self.assertEqual(upper_range.count(), 101)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_large_single_field_batch(self):
      """Bulk creating 501 default Restaurant objects must not fail."""
      # SQLite had a problem with more than 500 UNIONed selects in single
      # query.
      restaurants = [Restaurant() for _ in range(501)]
      Restaurant.objects.bulk_create(restaurants)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_large_batch_efficiency(self):
      """Inserting 1001 objects should log fewer than 10 queries."""
      rows = [TwoFields(f1=number, f2=number + 1) for number in range(1001)]
      with override_settings(DEBUG=True):
          # Start from an empty log so only the bulk insert is counted.
          connection.queries_log.clear()
          TwoFields.objects.bulk_create(rows)
          self.assertLess(len(connection.queries), 10)
  def test_large_batch_mixed(self):
      """
      Insert a large batch in which objects with an explicit primary key
      alternate with objects that leave it unset.
      """
      rows = [
          TwoFields(
              id=number if number % 2 == 0 else None, f1=number, f2=number + 1
          )
          for number in range(100000, 101000)
      ]
      TwoFields.objects.bulk_create(rows)
      self.assertEqual(TwoFields.objects.count(), 1000)
      # We can't assume much about the ID's created, except that the above
      # created IDs must exist.
      explicit_ids = range(100000, 101000, 2)
      with_explicit_id = TwoFields.objects.filter(id__in=explicit_ids)
      self.assertEqual(with_explicit_id.count(), 500)
      without_explicit_id = TwoFields.objects.exclude(id__in=explicit_ids)
      self.assertEqual(without_explicit_id.count(), 500)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_large_batch_mixed_efficiency(self):
      """
      Insert a large batch in which objects with an explicit primary key
      alternate with objects that leave it unset, using fewer than 10
      logged queries.
      """
      rows = [
          TwoFields(
              id=number if number % 2 == 0 else None, f1=number, f2=number + 1
          )
          for number in range(100000, 101000)
      ]
      with override_settings(DEBUG=True):
          # Start from an empty log so only the bulk insert is counted.
          connection.queries_log.clear()
          TwoFields.objects.bulk_create(rows)
          self.assertLess(len(connection.queries), 10)
  def test_explicit_batch_size(self):
      """Every explicit batch size stores the complete set of objects."""
      records = [TwoFields(f1=number, f2=number) for number in range(4)]
      expected_count = len(records)
      batch_sizes = (1, 2, 3, expected_count)
      for position, size in enumerate(batch_sizes):
          # Clear the rows of the previous round; nothing to clear at first.
          if position:
              TwoFields.objects.all().delete()
          TwoFields.objects.bulk_create(records, batch_size=size)
          self.assertEqual(TwoFields.objects.count(), expected_count)
  def test_empty_model(self):
      """Two NoFields objects can be bulk created."""
      empty_objects = [NoFields() for _ in range(2)]
      NoFields.objects.bulk_create(empty_objects)
      self.assertEqual(NoFields.objects.count(), 2)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_explicit_batch_size_efficiency(self):
      """100 objects take two queries in batches of 50, one in a batch of 100."""
      records = [TwoFields(f1=number, f2=number) for number in range(100)]
      half_batches = self.assertNumQueries(2)
      with half_batches:
          TwoFields.objects.bulk_create(records, 50)
      # The cleanup runs between the two counted blocks.
      TwoFields.objects.all().delete()
      whole_batch = self.assertNumQueries(1)
      with whole_batch:
          TwoFields.objects.bulk_create(records, len(records))
  @skipUnlessDBFeature("has_bulk_insert")
  def test_explicit_batch_size_respects_max_batch_size(self):
      """
      A batch_size one above the backend's maximum must still result in
      ceil(len(objs) / max_batch_size) queries.
      """
      objs = [Country(name=f"Country {i}") for i in range(1000)]
      field_names = ["name", "iso_two_letter", "description"]
      backend_limit = connection.ops.bulk_batch_size(field_names, objs)
      # Never let the divisor drop below one.
      max_batch_size = max(backend_limit, 1)
      expected_queries = ceil(len(objs) / max_batch_size)
      with self.assertNumQueries(expected_queries):
          Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_bulk_insert_expressions(self):
      """A Lower(Value(...)) expression can be used as a field value."""
      plain_name = Restaurant(name="Sam's Shake Shack")
      lowered_name = Restaurant(name=Lower(Value("Betty's Beetroot Bar")))
      Restaurant.objects.bulk_create([plain_name, lowered_name])
      # The stored name is the lower-cased form of the expression's value.
      matches = Restaurant.objects.filter(name="betty's beetroot bar")
      self.assertEqual(matches.count(), 1)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_bulk_insert_now(self):
      """Now() can be used as the value of datetime_field."""
      timestamped = [NullableFields(datetime_field=Now()) for _ in range(2)]
      NullableFields.objects.bulk_create(timestamped)
      with_datetime = NullableFields.objects.filter(datetime_field__isnull=False)
      self.assertEqual(with_datetime.count(), 2)
  @skipUnlessDBFeature("has_bulk_insert")
  def test_bulk_insert_nullable_fields(self):
      """Each non-id field of NullableFields can be None in a mixed batch."""
      fk_to_auto_fields = {
          "auto_field": NoFields.objects.create(),
          "small_auto_field": SmallAutoFieldModel.objects.create(),
          "big_auto_field": BigAutoFieldModel.objects.create(),
      }
      # NULL can be mixed with other values in nullable fields
      nullable_fields = [
          candidate
          for candidate in NullableFields._meta.get_fields()
          if candidate.name != "id"
      ]
      rows = []
      for field in nullable_fields:
          # One row per field: that field is None, the related objects stay
          # set unless the field is one of them.
          field_values = dict(fk_to_auto_fields)
          field_values[field.name] = None
          rows.append(NullableFields(**field_values))
      NullableFields.objects.bulk_create(rows)
      self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
      for field in nullable_fields:
          with self.subTest(field=field):
              # File fields are looked up by the empty string instead of None.
              if isinstance(field, FileField):
                  field_value = ""
              else:
                  field_value = None
              matching = NullableFields.objects.filter(**{field.name: field_value})
              self.assertEqual(matching.count(), 1)
  @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
  def test_set_pk_and_insert_single_item(self):
      """A single inserted object can be fetched again by its pk."""
      first_country = self.data[0]
      with self.assertNumQueries(1):
          countries = Country.objects.bulk_create([first_country])
      self.assertEqual(len(countries), 1)
      returned = countries[0]
      self.assertEqual(Country.objects.get(pk=returned.pk), returned)
  @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
  def test_set_pk_and_query_efficiency(self):
      """One query inserts the fixtures, each then fetchable by its pk."""
      with self.assertNumQueries(1):
          countries = Country.objects.bulk_create(self.data)
      self.assertEqual(len(countries), 4)
      # The length check above guarantees exactly four lookups, in order.
      for country in countries:
          self.assertEqual(Country.objects.get(pk=country.pk), country)
  @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
  def test_set_state(self):
      """bulk_create() and save() leave _state.adding and _state.db equal."""
      country_nl = Country(name="Netherlands", iso_two_letter="NL")
      country_be = Country(name="Belgium", iso_two_letter="BE")
      Country.objects.bulk_create([country_nl])
      country_be.save()
      # Objects save via bulk_create() and save() should have equal state.
      for state_attr in ("adding", "db"):
          self.assertEqual(
              getattr(country_nl._state, state_attr),
              getattr(country_be._state, state_attr),
          )
  def test_set_state_with_pk_specified(self):
      """As test_set_state, for State objects created with two_letter_code."""
      state_ca = State(two_letter_code="CA")
      state_ny = State(two_letter_code="NY")
      State.objects.bulk_create([state_ca])
      state_ny.save()
      # Objects save via bulk_create() and save() should have equal state.
      for state_attr in ("adding", "db"):
          self.assertEqual(
              getattr(state_ca._state, state_attr),
              getattr(state_ny._state, state_attr),
          )
  @skipIfDBFeature("supports_ignore_conflicts")
  def test_ignore_conflicts_value_error(self):
      """ignore_conflicts=True raises NotSupportedError without backend support."""
      message = "This database backend does not support ignoring conflicts."
      expect_unsupported = self.assertRaisesMessage(NotSupportedError, message)
      with expect_unsupported:
          TwoFields.objects.bulk_create(self.data, ignore_conflicts=True)
  @skipUnlessDBFeature("supports_ignore_conflicts")
  def test_ignore_conflicts_ignore(self):
      """Conflicting rows are skipped only when ignore_conflicts=True."""
      initial_rows = [TwoFields(f1=value, f2=value) for value in (1, 2, 3)]
      TwoFields.objects.bulk_create(initial_rows)
      self.assertEqual(TwoFields.objects.count(), 3)
      # With ignore_conflicts=True, conflicts are ignored.
      conflicting_objects = [TwoFields(f1=value, f2=value) for value in (2, 3)]
      TwoFields.objects.bulk_create(
          conflicting_objects[:1], ignore_conflicts=True
      )
      TwoFields.objects.bulk_create(conflicting_objects, ignore_conflicts=True)
      self.assertEqual(TwoFields.objects.count(), 3)
      for skipped in conflicting_objects:
          self.assertIsNone(skipped.pk)
      # New objects are created and conflicts are ignored.
      new_object = TwoFields(f1=4, f2=4)
      TwoFields.objects.bulk_create(
          [*conflicting_objects, new_object], ignore_conflicts=True
      )
      self.assertEqual(TwoFields.objects.count(), 4)
      self.assertIsNone(new_object.pk)
      # Without ignore_conflicts=True, there's a problem.
      with self.assertRaises(IntegrityError):
          TwoFields.objects.bulk_create(conflicting_objects)
  def test_nullable_fk_after_parent(self):
      """A parent saved after being assigned is still linked to the child."""
      parent = NoFields()
      unsaved_child = NullableFields(auto_field=parent, integer_field=88)
      # The parent is saved only after it was assigned to the child.
      parent.save()
      NullableFields.objects.bulk_create([unsaved_child])
      stored_child = NullableFields.objects.get(integer_field=88)
      self.assertEqual(stored_child.auto_field, parent)
  @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
  def test_nullable_fk_after_parent_bulk_create(self):
      """As test_nullable_fk_after_parent, with a bulk-created parent."""
      parent = NoFields()
      unsaved_child = NullableFields(auto_field=parent, integer_field=88)
      # The parent is bulk created only after it was assigned to the child.
      NoFields.objects.bulk_create([parent])
      NullableFields.objects.bulk_create([unsaved_child])
      stored_child = NullableFields.objects.get(integer_field=88)
      self.assertEqual(stored_child.auto_field, parent)
  def test_unsaved_parent(self):
      """A child referring to an unsaved parent raises ValueError."""
      msg = (
          "bulk_create() prohibited to prevent data loss due to unsaved "
          "related object 'auto_field'."
      )
      unsaved_parent = NoFields()
      with self.assertRaisesMessage(ValueError, msg):
          orphan = NullableFields(auto_field=unsaved_parent)
          NullableFields.objects.bulk_create([orphan])
  def test_invalid_batch_size_exception(self):
      """batch_size=-1 raises ValueError, even for an empty object list."""
      msg = "Batch size must be a positive integer."
      expect_invalid = self.assertRaisesMessage(ValueError, msg)
      with expect_invalid:
          Country.objects.bulk_create([], batch_size=-1)
  @skipIfDBFeature("supports_update_conflicts")
  def test_update_conflicts_unsupported(self):
      """update_conflicts=True raises NotSupportedError without backend support."""
      msg = "This database backend does not support updating conflicts."
      expect_unsupported = self.assertRaisesMessage(NotSupportedError, msg)
      with expect_unsupported:
          Country.objects.bulk_create(self.data, update_conflicts=True)
  @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
  def test_ignore_update_conflicts_exclusive(self):
      """Passing both conflict options at once raises ValueError."""
      msg = "ignore_conflicts and update_conflicts are mutually exclusive"
      both_options = {"ignore_conflicts": True, "update_conflicts": True}
      with self.assertRaisesMessage(ValueError, msg):
          Country.objects.bulk_create(self.data, **both_options)
  @skipUnlessDBFeature("supports_update_conflicts")
  def test_update_conflicts_no_update_fields(self):
      """update_conflicts=True without update_fields raises ValueError."""
      msg = (
          "Fields that will be updated when a row insertion fails on "
          "conflicts must be provided."
      )
      expect_missing_fields = self.assertRaisesMessage(ValueError, msg)
      with expect_missing_fields:
          Country.objects.bulk_create(self.data, update_conflicts=True)
  @skipUnlessDBFeature("supports_update_conflicts")
  @skipIfDBFeature("supports_update_conflicts_with_target")
  def test_update_conflicts_unique_field_unsupported(self):
      """
      unique_fields raises NotSupportedError when the backend cannot update
      conflicts with a target.
      """
      msg = (
          "This database backend does not support updating conflicts with "
          "specifying unique fields that can trigger the upsert."
      )
      upsert_options = {
          "update_conflicts": True,
          "update_fields": ["f2"],
          "unique_fields": ["f1"],
      }
      with self.assertRaisesMessage(NotSupportedError, msg):
          TwoFields.objects.bulk_create(
              [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
              **upsert_options,
          )
  @skipUnlessDBFeature("supports_update_conflicts")
  def test_update_conflicts_nonexistent_update_fields(self):
      """An unknown name in update_fields raises FieldDoesNotExist."""
      # unique_fields is only passed when the backend supports a target.
      target_supported = connection.features.supports_update_conflicts_with_target
      unique_fields = ["f1"] if target_supported else None
      msg = "TwoFields has no field named 'nonexistent'"
      with self.assertRaisesMessage(FieldDoesNotExist, msg):
          TwoFields.objects.bulk_create(
              [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
              update_conflicts=True,
              update_fields=["nonexistent"],
              unique_fields=unique_fields,
          )
  @skipUnlessDBFeature(
      "supports_update_conflicts",
      "supports_update_conflicts_with_target",
  )
  def test_update_conflicts_unique_fields_required(self):
      """Omitting unique_fields raises ValueError on backends with a target."""
      msg = "Unique fields that can trigger the upsert must be provided."
      upsert_options = {"update_conflicts": True, "update_fields": ["f1"]}
      with self.assertRaisesMessage(ValueError, msg):
          TwoFields.objects.bulk_create(
              [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
              **upsert_options,
          )
  @skipUnlessDBFeature(
      "supports_update_conflicts",
      "supports_update_conflicts_with_target",
  )
  def test_update_conflicts_invalid_update_fields(self):
      """Relationship names that are not concrete fields are refused in update_fields."""
      msg = "bulk_create() can only be used with concrete fields in update_fields."
      # Reverse one-to-one relationship.
      reverse_one_to_one = {
          "update_fields": ["relatedmodel"],
          "unique_fields": ["pk"],
      }
      with self.assertRaisesMessage(ValueError, msg):
          Country.objects.bulk_create(
              self.data, update_conflicts=True, **reverse_one_to_one
          )
      # Many-to-many relationship.
      many_to_many = {
          "update_fields": ["big_auto_fields"],
          "unique_fields": ["country"],
      }
      with self.assertRaisesMessage(ValueError, msg):
          RelatedModel.objects.bulk_create(
              [RelatedModel(country=self.data[0])],
              update_conflicts=True,
              **many_to_many,
          )
  @skipUnlessDBFeature(
      "supports_update_conflicts",
      "supports_update_conflicts_with_target",
  )
  def test_update_conflicts_pk_in_update_fields(self):
      """Listing the primary key in update_fields raises ValueError."""
      msg = "bulk_create() cannot be used with primary keys in update_fields."
      upsert_options = {
          "update_conflicts": True,
          "update_fields": ["id"],
          "unique_fields": ["id"],
      }
      with self.assertRaisesMessage(ValueError, msg):
          BigAutoFieldModel.objects.bulk_create(
              [BigAutoFieldModel()], **upsert_options
          )
  @skipUnlessDBFeature(
      "supports_update_conflicts",
      "supports_update_conflicts_with_target",
  )
  def test_update_conflicts_invalid_unique_fields(self):
      """Relationship names that are not concrete fields are refused in unique_fields."""
      msg = "bulk_create() can only be used with concrete fields in unique_fields."
      # Reverse one-to-one relationship.
      reverse_one_to_one = {
          "update_fields": ["name"],
          "unique_fields": ["relatedmodel"],
      }
      with self.assertRaisesMessage(ValueError, msg):
          Country.objects.bulk_create(
              self.data, update_conflicts=True, **reverse_one_to_one
          )
      # Many-to-many relationship.
      many_to_many = {
          "update_fields": ["name"],
          "unique_fields": ["big_auto_fields"],
      }
      with self.assertRaisesMessage(ValueError, msg):
          RelatedModel.objects.bulk_create(
              [RelatedModel(country=self.data[0])],
              update_conflicts=True,
              **many_to_many,
          )
  def _test_update_conflicts_two_fields(self, unique_fields):
      """
      Upsert two TwoFields rows that repeat existing f1/f2 values and check
      that only their names change. unique_fields is passed through to
      bulk_create().
      """
      existing_rows = [
          TwoFields(f1=1, f2=1, name="a"),
          TwoFields(f1=2, f2=2, name="b"),
      ]
      TwoFields.objects.bulk_create(existing_rows)
      self.assertEqual(TwoFields.objects.count(), 2)
      conflicting_objects = [
          TwoFields(f1=1, f2=1, name="c"),
          TwoFields(f1=2, f2=2, name="d"),
      ]
      results = TwoFields.objects.bulk_create(
          conflicting_objects,
          update_conflicts=True,
          unique_fields=unique_fields,
          update_fields=["name"],
      )
      self.assertEqual(len(results), len(conflicting_objects))
      # Primary keys are only checked when the backend can return rows.
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(TwoFields.objects.count(), 2)
      expected_rows = [
          {"f1": 1, "f2": 1, "name": "c"},
          {"f1": 2, "f2": 2, "name": "d"},
      ]
      self.assertCountEqual(
          TwoFields.objects.values("f1", "f2", "name"), expected_rows
      )
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_two_fields_unique_fields_first(self):
      """Run the two-field upsert scenario with f1 as the unique field."""
      target_fields = ["f1"]
      self._test_update_conflicts_two_fields(target_fields)
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_two_fields_unique_fields_second(self):
      """Run the two-field upsert scenario with f2 as the unique field."""
      target_fields = ["f2"]
      self._test_update_conflicts_two_fields(target_fields)
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_unique_fields_pk(self):
      """With unique_fields=["pk"], only the listed name field is updated."""
      existing_rows = [
          TwoFields(f1=1, f2=1, name="a"),
          TwoFields(f1=2, f2=2, name="b"),
      ]
      TwoFields.objects.bulk_create(existing_rows)
      obj1 = TwoFields.objects.get(f1=1)
      obj2 = TwoFields.objects.get(f1=2)
      # These reuse the stored primary keys but carry new f1/f2 values.
      conflicting_objects = [
          TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
          TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
      ]
      results = TwoFields.objects.bulk_create(
          conflicting_objects,
          update_conflicts=True,
          unique_fields=["pk"],
          update_fields=["name"],
      )
      self.assertEqual(len(results), len(conflicting_objects))
      # Primary keys are only checked when the backend can return rows.
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(TwoFields.objects.count(), 2)
      # f1 and f2 keep their stored values; only name was updated.
      expected_rows = [
          {"f1": 1, "f2": 1, "name": "c"},
          {"f1": 2, "f2": 2, "name": "d"},
      ]
      self.assertCountEqual(
          TwoFields.objects.values("f1", "f2", "name"), expected_rows
      )
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_two_fields_unique_fields_both(self):
      """Targeting f1 and f2 together is expected to raise a database error."""
      expected_errors = (OperationalError, ProgrammingError)
      with self.assertRaises(expected_errors):
          self._test_update_conflicts_two_fields(["f1", "f2"])
  @skipUnlessDBFeature("supports_update_conflicts")
  @skipIfDBFeature("supports_update_conflicts_with_target")
  def test_update_conflicts_two_fields_no_unique_fields(self):
      """Run the two-field upsert scenario with an empty unique_fields."""
      no_target_fields = []
      self._test_update_conflicts_two_fields(no_target_fields)
  def _test_update_conflicts_unique_two_fields(self, unique_fields):
      """
      Upsert a mix of conflicting and new countries on top of the fixture
      data and check the resulting descriptions. unique_fields is passed
      through to bulk_create().
      """
      germany_text = "Germany is a country in Central Europe."
      czech_text = "The Czech Republic is a landlocked country in Central Europe."
      japan_text = "Japan is an island country in East Asia."
      Country.objects.bulk_create(self.data)
      self.assertEqual(Country.objects.count(), 4)
      new_data = [
          # Conflicting countries.
          Country(name="Germany", iso_two_letter="DE", description=germany_text),
          Country(
              name="Czech Republic",
              iso_two_letter="CZ",
              description=czech_text,
          ),
          # New countries.
          Country(name="Australia", iso_two_letter="AU"),
          Country(name="Japan", iso_two_letter="JP", description=japan_text),
      ]
      results = Country.objects.bulk_create(
          new_data,
          update_conflicts=True,
          update_fields=["description"],
          unique_fields=unique_fields,
      )
      self.assertEqual(len(results), len(new_data))
      # Primary keys are only checked when the backend can return rows.
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(Country.objects.count(), 6)
      expected_rows = [
          {"iso_two_letter": "US", "description": ""},
          {"iso_two_letter": "NL", "description": ""},
          {"iso_two_letter": "DE", "description": germany_text},
          {"iso_two_letter": "CZ", "description": czech_text},
          {"iso_two_letter": "AU", "description": ""},
          {"iso_two_letter": "JP", "description": japan_text},
      ]
      self.assertCountEqual(
          Country.objects.values("iso_two_letter", "description"),
          expected_rows,
      )
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_unique_two_fields_unique_fields_both(self):
      """Run the country upsert scenario targeting iso_two_letter and name."""
      target_fields = ["iso_two_letter", "name"]
      self._test_update_conflicts_unique_two_fields(target_fields)
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_unique_two_fields_unique_fields_one(self):
      """Targeting iso_two_letter alone is expected to raise a database error."""
      expected_errors = (OperationalError, ProgrammingError)
      with self.assertRaises(expected_errors):
          self._test_update_conflicts_unique_two_fields(["iso_two_letter"])
  @skipUnlessDBFeature("supports_update_conflicts")
  @skipIfDBFeature("supports_update_conflicts_with_target")
  def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
      """Run the country upsert scenario with an empty unique_fields."""
      no_target_fields = []
      self._test_update_conflicts_unique_two_fields(no_target_fields)
  def _test_update_conflicts(self, unique_fields):
      """
      Upsert UpsertConflict rows twice: first only rows that repeat existing
      numbers, then the same rows plus one new row. unique_fields is passed
      through to bulk_create().
      """
      upsert_options = {
          "update_conflicts": True,
          "update_fields": ["name", "rank"],
          "unique_fields": unique_fields,
      }
      UpsertConflict.objects.bulk_create(
          [
              UpsertConflict(number=1, rank=1, name="John"),
              UpsertConflict(number=2, rank=2, name="Mary"),
              UpsertConflict(number=3, rank=3, name="Hannah"),
          ]
      )
      self.assertEqual(UpsertConflict.objects.count(), 3)
      conflicting_objects = [
          UpsertConflict(number=1, rank=4, name="Steve"),
          UpsertConflict(number=2, rank=2, name="Olivia"),
          UpsertConflict(number=3, rank=1, name="Hannah"),
      ]
      results = UpsertConflict.objects.bulk_create(
          conflicting_objects, **upsert_options
      )
      self.assertEqual(len(results), len(conflicting_objects))
      # Primary keys are only checked when the backend can return rows.
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(UpsertConflict.objects.count(), 3)
      updated_rows = [
          {"number": 1, "rank": 4, "name": "Steve"},
          {"number": 2, "rank": 2, "name": "Olivia"},
          {"number": 3, "rank": 1, "name": "Hannah"},
      ]
      self.assertCountEqual(
          UpsertConflict.objects.values("number", "rank", "name"),
          updated_rows,
      )
      # Second round: the same objects again, plus one new row.
      results = UpsertConflict.objects.bulk_create(
          conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
          **upsert_options,
      )
      self.assertEqual(len(results), 4)
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(UpsertConflict.objects.count(), 4)
      self.assertCountEqual(
          UpsertConflict.objects.values("number", "rank", "name"),
          updated_rows + [{"number": 4, "rank": 4, "name": "Mark"}],
      )
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_unique_fields(self):
      """Run the UpsertConflict scenario with number as the unique field."""
      target_fields = ["number"]
      self._test_update_conflicts(unique_fields=target_fields)
  @skipUnlessDBFeature("supports_update_conflicts")
  @skipIfDBFeature("supports_update_conflicts_with_target")
  def test_update_conflicts_no_unique_fields(self):
      """Run the UpsertConflict scenario with an empty unique_fields."""
      no_target_fields = []
      self._test_update_conflicts(no_target_fields)
  @skipUnlessDBFeature(
      "supports_update_conflicts", "supports_update_conflicts_with_target"
  )
  def test_update_conflicts_unique_fields_update_fields_db_column(self):
      """Upserting FieldsWithDbColumns rows by rank updates their names."""
      existing_rows = [
          FieldsWithDbColumns(rank=1, name="a"),
          FieldsWithDbColumns(rank=2, name="b"),
      ]
      FieldsWithDbColumns.objects.bulk_create(existing_rows)
      self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
      conflicting_objects = [
          FieldsWithDbColumns(rank=1, name="c"),
          FieldsWithDbColumns(rank=2, name="d"),
      ]
      results = FieldsWithDbColumns.objects.bulk_create(
          conflicting_objects,
          update_conflicts=True,
          unique_fields=["rank"],
          update_fields=["name"],
      )
      self.assertEqual(len(results), len(conflicting_objects))
      # Primary keys are only checked when the backend can return rows.
      if connection.features.can_return_rows_from_bulk_insert:
          for upserted in results:
              self.assertIsNotNone(upserted.pk)
      self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
      expected_rows = [
          {"rank": 1, "name": "c"},
          {"rank": 2, "name": "d"},
      ]
      self.assertCountEqual(
          FieldsWithDbColumns.objects.values("rank", "name"), expected_rows
      )