12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868 |
- from math import ceil
- from operator import attrgetter
- from django.core.exceptions import FieldDoesNotExist
- from django.db import (
- IntegrityError,
- NotSupportedError,
- OperationalError,
- ProgrammingError,
- connection,
- )
- from django.db.models import FileField, Value
- from django.db.models.functions import Lower, Now
- from django.test import (
- TestCase,
- override_settings,
- skipIfDBFeature,
- skipUnlessDBFeature,
- )
- from django.utils import timezone
- from .models import (
- BigAutoFieldModel,
- Country,
- DbDefaultModel,
- FieldsWithDbColumns,
- NoFields,
- NullableFields,
- Pizzeria,
- ProxyCountry,
- ProxyMultiCountry,
- ProxyMultiProxyCountry,
- ProxyProxyCountry,
- RelatedModel,
- Restaurant,
- SmallAutoFieldModel,
- State,
- TwoFields,
- UpsertConflict,
- )
class BulkCreateTests(TestCase):
    """Tests for ``bulk_create()`` called through the test models' managers."""

    def setUp(self):
        """Build four unsaved ``Country`` instances shared by the tests."""
        self.data = [
            Country(name="United States of America", iso_two_letter="US"),
            Country(name="The Netherlands", iso_two_letter="NL"),
            Country(name="Germany", iso_two_letter="DE"),
            Country(name="Czech Republic", iso_two_letter="CZ"),
        ]

    def _assert_pks_set_if_returned(self, results):
        """
        Assert every instance in ``results`` has a primary key, but only on
        backends that report ``can_return_rows_from_bulk_insert``.
        """
        if connection.features.can_return_rows_from_bulk_insert:
            for instance in results:
                self.assertIsNotNone(instance.pk)

    def test_simple(self):
        """bulk_create() returns the objects it was given and saves them all."""
        created = Country.objects.bulk_create(self.data)
        self.assertEqual(created, self.data)
        self.assertQuerySetEqual(
            Country.objects.order_by("-name"),
            [
                "United States of America",
                "The Netherlands",
                "Germany",
                "Czech Republic",
            ],
            attrgetter("name"),
        )

        # An empty list is accepted and leaves the table unchanged.
        created = Country.objects.bulk_create([])
        self.assertEqual(created, [])
        self.assertEqual(Country.objects.count(), 4)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_efficiency(self):
        """Creating the four countries takes a single query."""
        with self.assertNumQueries(1):
            Country.objects.bulk_create(self.data)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_long_non_ascii_text(self):
        """
        Inserting non-ASCII values with a length in the range 2001 to 4000
        characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle
        (#22144).
        """
        Country.objects.bulk_create([Country(description="Ж" * 3000)])
        self.assertEqual(Country.objects.count(), 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_long_and_short_text(self):
        """Long and short ASCII and non-ASCII descriptions mix in one call."""
        Country.objects.bulk_create(
            [
                Country(description="a" * 4001, iso_two_letter="A"),
                Country(description="a", iso_two_letter="B"),
                Country(description="Ж" * 2001, iso_two_letter="C"),
                Country(description="Ж", iso_two_letter="D"),
            ]
        )
        self.assertEqual(Country.objects.count(), 4)

    def test_multi_table_inheritance_unsupported(self):
        """Each of the three models below is rejected with a ValueError."""
        expected_message = "Can't bulk create a multi-table inherited model"
        cases = [
            (Pizzeria, {"name": "The Art of Pizza"}),
            (ProxyMultiCountry, {"name": "Fillory", "iso_two_letter": "FL"}),
            (ProxyMultiProxyCountry, {"name": "Fillory", "iso_two_letter": "FL"}),
        ]
        for model, kwargs in cases:
            with self.subTest(model=model.__name__):
                with self.assertRaisesMessage(ValueError, expected_message):
                    model.objects.bulk_create([model(**kwargs)])

    def test_proxy_inheritance_supported(self):
        """Proxy models (and proxies of proxies) can be bulk created."""
        ProxyCountry.objects.bulk_create(
            [
                ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
                Country(name="Tortall", iso_two_letter="TA"),
            ]
        )
        self.assertQuerySetEqual(
            ProxyCountry.objects.all(),
            {"Qwghlm", "Tortall"},
            attrgetter("name"),
            ordered=False,
        )

        ProxyProxyCountry.objects.bulk_create(
            [
                ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
            ]
        )
        self.assertQuerySetEqual(
            ProxyProxyCountry.objects.all(),
            {
                "Qwghlm",
                "Tortall",
                "Netherlands",
            },
            attrgetter("name"),
            ordered=False,
        )

    def test_non_auto_increment_pk(self):
        """States created with explicit ``two_letter_code`` values are saved."""
        State.objects.bulk_create(
            [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
        )
        self.assertQuerySetEqual(
            State.objects.order_by("two_letter_code"),
            [
                "CA",
                "IL",
                "ME",
                "NY",
            ],
            attrgetter("two_letter_code"),
        )

    @skipUnlessDBFeature("has_bulk_insert")
    def test_non_auto_increment_pk_efficiency(self):
        """Same as test_non_auto_increment_pk, in a single query."""
        with self.assertNumQueries(1):
            State.objects.bulk_create(
                [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
            )
        self.assertQuerySetEqual(
            State.objects.order_by("two_letter_code"),
            [
                "CA",
                "IL",
                "ME",
                "NY",
            ],
            attrgetter("two_letter_code"),
        )

    @skipIfDBFeature("allows_auto_pk_0")
    def test_zero_as_autoval(self):
        """
        Zero as id for AutoField should raise exception in MySQL, because MySQL
        does not allow zero for automatic primary key if the
        NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled.
        """
        valid_country = Country(name="Germany", iso_two_letter="DE")
        invalid_country = Country(id=0, name="Poland", iso_two_letter="PL")
        msg = "The database backend does not accept 0 as a value for AutoField."
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create([valid_country, invalid_country])

    def test_batch_same_vals(self):
        """Two identical-valued objects produce two rows."""
        # SQLite had a problem where all the same-valued models were
        # collapsed to one insert.
        Restaurant.objects.bulk_create([Restaurant(name="foo") for _ in range(2)])
        self.assertEqual(Restaurant.objects.count(), 2)

    def test_large_batch(self):
        """1001 objects are all saved with the expected field values."""
        TwoFields.objects.bulk_create(
            [TwoFields(f1=i, f2=i + 1) for i in range(1001)]
        )
        self.assertEqual(TwoFields.objects.count(), 1001)
        self.assertEqual(
            TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101
        )
        self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_single_field_batch(self):
        """501 field-less ``Restaurant`` objects can be created in one call."""
        # SQLite had a problem with more than 500 UNIONed selects in single
        # query.
        Restaurant.objects.bulk_create([Restaurant() for _ in range(501)])

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_batch_efficiency(self):
        """Creating 1001 objects takes fewer than 10 queries."""
        # DEBUG=True makes the connection record the queries it runs.
        with override_settings(DEBUG=True):
            connection.queries_log.clear()
            TwoFields.objects.bulk_create(
                [TwoFields(f1=i, f2=i + 1) for i in range(1001)]
            )
            self.assertLess(len(connection.queries), 10)

    def test_large_batch_mixed(self):
        """
        Test inserting a large batch with objects having primary key set
        mixed together with objects without PK set.
        """
        TwoFields.objects.bulk_create(
            [
                TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
                for i in range(100000, 101000)
            ]
        )
        self.assertEqual(TwoFields.objects.count(), 1000)
        # We can't assume much about the ID's created, except that the above
        # created IDs must exist.
        id_range = range(100000, 101000, 2)
        self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
        self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_large_batch_mixed_efficiency(self):
        """
        Inserting a large batch that mixes objects with and without a primary
        key set takes fewer than 10 queries.
        """
        # DEBUG=True makes the connection record the queries it runs.
        with override_settings(DEBUG=True):
            connection.queries_log.clear()
            TwoFields.objects.bulk_create(
                [
                    TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
                    for i in range(100000, 101000)
                ]
            )
            self.assertLess(len(connection.queries), 10)

    def test_explicit_batch_size(self):
        """All objects are saved whatever explicit batch size is used."""
        objs = [TwoFields(f1=i, f2=i) for i in range(4)]
        num_objs = len(objs)
        for batch_size in (1, 2, 3, num_objs):
            with self.subTest(batch_size=batch_size):
                # Start each round from an empty table.
                TwoFields.objects.all().delete()
                TwoFields.objects.bulk_create(objs, batch_size=batch_size)
                self.assertEqual(TwoFields.objects.count(), num_objs)

    def test_empty_model(self):
        """A model declared without fields can be bulk created."""
        NoFields.objects.bulk_create([NoFields() for _ in range(2)])
        self.assertEqual(NoFields.objects.count(), 2)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_explicit_batch_size_efficiency(self):
        """100 objects take two queries at batch size 50, one at 100."""
        objs = [TwoFields(f1=i, f2=i) for i in range(100)]
        with self.assertNumQueries(2):
            TwoFields.objects.bulk_create(objs, 50)
        TwoFields.objects.all().delete()
        with self.assertNumQueries(1):
            TwoFields.objects.bulk_create(objs, len(objs))

    @skipUnlessDBFeature("has_bulk_insert")
    def test_explicit_batch_size_respects_max_batch_size(self):
        """A batch_size above the backend maximum is capped to that maximum."""
        objs = [Country(name=f"Country {i}") for i in range(1000)]
        fields = ["name", "iso_two_letter", "description"]
        max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1)
        with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
            Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_expressions(self):
        """A field value may be a database expression such as ``Lower()``."""
        Restaurant.objects.bulk_create(
            [
                Restaurant(name="Sam's Shake Shack"),
                Restaurant(name=Lower(Value("Betty's Beetroot Bar"))),
            ]
        )
        bbb = Restaurant.objects.filter(name="betty's beetroot bar")
        self.assertEqual(bbb.count(), 1)

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_now(self):
        """``Now()`` is accepted as the value of a datetime field."""
        NullableFields.objects.bulk_create(
            [
                NullableFields(datetime_field=Now()),
                NullableFields(datetime_field=Now()),
            ]
        )
        self.assertEqual(
            NullableFields.objects.filter(datetime_field__isnull=False).count(),
            2,
        )

    @skipUnlessDBFeature("has_bulk_insert")
    def test_bulk_insert_nullable_fields(self):
        """One row is created per nullable field, with that field left empty."""
        fk_to_auto_fields = {
            "auto_field": NoFields.objects.create(),
            "small_auto_field": SmallAutoFieldModel.objects.create(),
            "big_auto_field": BigAutoFieldModel.objects.create(),
        }
        # NULL can be mixed with other values in nullable fields
        nullable_fields = [
            field for field in NullableFields._meta.get_fields() if field.name != "id"
        ]
        NullableFields.objects.bulk_create(
            [
                NullableFields(**{**fk_to_auto_fields, field.name: None})
                for field in nullable_fields
            ]
        )
        self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
        for field in nullable_fields:
            with self.subTest(field=field):
                # File fields are looked up by "" rather than None.
                field_value = "" if isinstance(field, FileField) else None
                self.assertEqual(
                    NullableFields.objects.filter(**{field.name: field_value}).count(),
                    1,
                )

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_pk_and_insert_single_item(self):
        """A single inserted object gets a usable pk from a single query."""
        with self.assertNumQueries(1):
            countries = Country.objects.bulk_create([self.data[0]])
        self.assertEqual(len(countries), 1)
        self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_pk_and_query_efficiency(self):
        """All four inserted objects get a usable pk from a single query."""
        with self.assertNumQueries(1):
            countries = Country.objects.bulk_create(self.data)
        self.assertEqual(len(countries), 4)
        for country in countries:
            self.assertEqual(Country.objects.get(pk=country.pk), country)

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_set_state(self):
        """bulk_create() leaves ``_state`` as ``save()`` does."""
        country_nl = Country(name="Netherlands", iso_two_letter="NL")
        country_be = Country(name="Belgium", iso_two_letter="BE")
        Country.objects.bulk_create([country_nl])
        country_be.save()
        # Objects save via bulk_create() and save() should have equal state.
        self.assertEqual(country_nl._state.adding, country_be._state.adding)
        self.assertEqual(country_nl._state.db, country_be._state.db)

    def test_set_state_with_pk_specified(self):
        """Same as test_set_state for a model whose pk is set explicitly."""
        state_ca = State(two_letter_code="CA")
        state_ny = State(two_letter_code="NY")
        State.objects.bulk_create([state_ca])
        state_ny.save()
        # Objects save via bulk_create() and save() should have equal state.
        self.assertEqual(state_ca._state.adding, state_ny._state.adding)
        self.assertEqual(state_ca._state.db, state_ny._state.db)

    @skipIfDBFeature("supports_ignore_conflicts")
    def test_ignore_conflicts_value_error(self):
        """ignore_conflicts=True raises NotSupportedError when unsupported."""
        message = "This database backend does not support ignoring conflicts."
        # Use instances of the manager's own model rather than the Country
        # objects in self.data.
        objs = [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)]
        with self.assertRaisesMessage(NotSupportedError, message):
            TwoFields.objects.bulk_create(objs, ignore_conflicts=True)

    @skipUnlessDBFeature("supports_ignore_conflicts")
    def test_ignore_conflicts_ignore(self):
        """Conflicting rows are skipped and non-conflicting rows are created."""
        data = [
            TwoFields(f1=1, f2=1),
            TwoFields(f1=2, f2=2),
            TwoFields(f1=3, f2=3),
        ]
        TwoFields.objects.bulk_create(data)
        self.assertEqual(TwoFields.objects.count(), 3)
        # With ignore_conflicts=True, conflicts are ignored.
        conflicting_objects = [
            TwoFields(f1=2, f2=2),
            TwoFields(f1=3, f2=3),
        ]
        TwoFields.objects.bulk_create([conflicting_objects[0]], ignore_conflicts=True)
        TwoFields.objects.bulk_create(conflicting_objects, ignore_conflicts=True)
        self.assertEqual(TwoFields.objects.count(), 3)
        self.assertIsNone(conflicting_objects[0].pk)
        self.assertIsNone(conflicting_objects[1].pk)
        # New objects are created and conflicts are ignored.
        new_object = TwoFields(f1=4, f2=4)
        TwoFields.objects.bulk_create(
            conflicting_objects + [new_object], ignore_conflicts=True
        )
        self.assertEqual(TwoFields.objects.count(), 4)
        self.assertIsNone(new_object.pk)
        # Without ignore_conflicts=True, there's a problem.
        with self.assertRaises(IntegrityError):
            TwoFields.objects.bulk_create(conflicting_objects)

    def test_nullable_fk_after_parent(self):
        """A child built before its parent was saved still gets the parent FK."""
        parent = NoFields()
        child = NullableFields(auto_field=parent, integer_field=88)
        parent.save()
        NullableFields.objects.bulk_create([child])
        child = NullableFields.objects.get(integer_field=88)
        self.assertEqual(child.auto_field, parent)

    @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    def test_nullable_fk_after_parent_bulk_create(self):
        """Same as test_nullable_fk_after_parent, parent saved by bulk_create()."""
        parent = NoFields()
        child = NullableFields(auto_field=parent, integer_field=88)
        NoFields.objects.bulk_create([parent])
        NullableFields.objects.bulk_create([child])
        child = NullableFields.objects.get(integer_field=88)
        self.assertEqual(child.auto_field, parent)

    def test_unsaved_parent(self):
        """Referencing an unsaved related object raises ValueError."""
        parent = NoFields()
        msg = (
            "bulk_create() prohibited to prevent data loss due to unsaved "
            "related object 'auto_field'."
        )
        with self.assertRaisesMessage(ValueError, msg):
            NullableFields.objects.bulk_create([NullableFields(auto_field=parent)])

    def test_invalid_batch_size_exception(self):
        """A negative batch_size raises ValueError."""
        msg = "Batch size must be a positive integer."
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create([], batch_size=-1)

    @skipIfDBFeature("supports_update_conflicts")
    def test_update_conflicts_unsupported(self):
        """update_conflicts=True raises NotSupportedError when unsupported."""
        msg = "This database backend does not support updating conflicts."
        with self.assertRaisesMessage(NotSupportedError, msg):
            Country.objects.bulk_create(self.data, update_conflicts=True)

    @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
    def test_ignore_update_conflicts_exclusive(self):
        """ignore_conflicts and update_conflicts cannot both be set."""
        msg = "ignore_conflicts and update_conflicts are mutually exclusive"
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                ignore_conflicts=True,
                update_conflicts=True,
            )

    @skipUnlessDBFeature("supports_update_conflicts")
    def test_update_conflicts_no_update_fields(self):
        """update_conflicts=True without update_fields raises ValueError."""
        msg = (
            "Fields that will be updated when a row insertion fails on "
            "conflicts must be provided."
        )
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(self.data, update_conflicts=True)

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_unique_field_unsupported(self):
        """unique_fields raises NotSupportedError without conflict targets."""
        msg = (
            "This database backend does not support updating conflicts with "
            "specifying unique fields that can trigger the upsert."
        )
        with self.assertRaisesMessage(NotSupportedError, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["f2"],
                unique_fields=["f1"],
            )

    @skipUnlessDBFeature("supports_update_conflicts")
    def test_update_conflicts_nonexistent_update_fields(self):
        """An unknown name in update_fields raises FieldDoesNotExist."""
        # unique_fields is only passed on backends that support a target.
        unique_fields = None
        if connection.features.supports_update_conflicts_with_target:
            unique_fields = ["f1"]
        msg = "TwoFields has no field named 'nonexistent'"
        with self.assertRaisesMessage(FieldDoesNotExist, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["nonexistent"],
                unique_fields=unique_fields,
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_unique_fields_required(self):
        """Omitting unique_fields raises ValueError when targets are supported."""
        msg = "Unique fields that can trigger the upsert must be provided."
        with self.assertRaisesMessage(ValueError, msg):
            TwoFields.objects.bulk_create(
                [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
                update_conflicts=True,
                update_fields=["f1"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_invalid_update_fields(self):
        """Non-concrete fields in update_fields raise ValueError."""
        msg = "bulk_create() can only be used with concrete fields in update_fields."
        # Reverse one-to-one relationship.
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                update_conflicts=True,
                update_fields=["relatedmodel"],
                unique_fields=["pk"],
            )
        # Many-to-many relationship.
        with self.assertRaisesMessage(ValueError, msg):
            RelatedModel.objects.bulk_create(
                [RelatedModel(country=self.data[0])],
                update_conflicts=True,
                update_fields=["big_auto_fields"],
                unique_fields=["country"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_pk_in_update_fields(self):
        """A primary key in update_fields raises ValueError."""
        msg = "bulk_create() cannot be used with primary keys in update_fields."
        with self.assertRaisesMessage(ValueError, msg):
            BigAutoFieldModel.objects.bulk_create(
                [BigAutoFieldModel()],
                update_conflicts=True,
                update_fields=["id"],
                unique_fields=["id"],
            )

    @skipUnlessDBFeature(
        "supports_update_conflicts",
        "supports_update_conflicts_with_target",
    )
    def test_update_conflicts_invalid_unique_fields(self):
        """Non-concrete fields in unique_fields raise ValueError."""
        msg = "bulk_create() can only be used with concrete fields in unique_fields."
        # Reverse one-to-one relationship.
        with self.assertRaisesMessage(ValueError, msg):
            Country.objects.bulk_create(
                self.data,
                update_conflicts=True,
                update_fields=["name"],
                unique_fields=["relatedmodel"],
            )
        # Many-to-many relationship.
        with self.assertRaisesMessage(ValueError, msg):
            RelatedModel.objects.bulk_create(
                [RelatedModel(country=self.data[0])],
                update_conflicts=True,
                update_fields=["name"],
                unique_fields=["big_auto_fields"],
            )

    def _test_update_conflicts_two_fields(self, unique_fields):
        """
        Upsert two conflicting ``TwoFields`` rows using ``unique_fields`` as
        the conflict target and check that only ``name`` was updated.
        """
        TwoFields.objects.bulk_create(
            [
                TwoFields(f1=1, f2=1, name="a"),
                TwoFields(f1=2, f2=2, name="b"),
            ]
        )
        self.assertEqual(TwoFields.objects.count(), 2)

        conflicting_objects = [
            TwoFields(f1=1, f2=1, name="c"),
            TwoFields(f1=2, f2=2, name="d"),
        ]
        results = TwoFields.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=unique_fields,
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        self._assert_pks_set_if_returned(results)
        self.assertEqual(TwoFields.objects.count(), 2)
        self.assertCountEqual(
            TwoFields.objects.values("f1", "f2", "name"),
            [
                {"f1": 1, "f2": 1, "name": "c"},
                {"f1": 2, "f2": 2, "name": "d"},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_first(self):
        """Upsert ``TwoFields`` with ``f1`` as the conflict target."""
        self._test_update_conflicts_two_fields(["f1"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_second(self):
        """Upsert ``TwoFields`` with ``f2`` as the conflict target."""
        self._test_update_conflicts_two_fields(["f2"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields_pk(self):
        """With ``pk`` as the conflict target, only ``name`` is updated."""
        TwoFields.objects.bulk_create(
            [
                TwoFields(f1=1, f2=1, name="a"),
                TwoFields(f1=2, f2=2, name="b"),
            ]
        )

        obj1 = TwoFields.objects.get(f1=1)
        obj2 = TwoFields.objects.get(f1=2)
        # f1/f2 differ from the stored rows but are not in update_fields, so
        # the expected rows below keep their original f1/f2 values.
        conflicting_objects = [
            TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
            TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
        ]
        results = TwoFields.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=["pk"],
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        self._assert_pks_set_if_returned(results)
        self.assertEqual(TwoFields.objects.count(), 2)
        self.assertCountEqual(
            TwoFields.objects.values("f1", "f2", "name"),
            [
                {"f1": 1, "f2": 1, "name": "c"},
                {"f1": 2, "f2": 2, "name": "d"},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_two_fields_unique_fields_both(self):
        """Using ``f1`` and ``f2`` together as the target raises a DB error."""
        with self.assertRaises((OperationalError, ProgrammingError)):
            self._test_update_conflicts_two_fields(["f1", "f2"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_two_fields_no_unique_fields(self):
        """Upsert ``TwoFields`` without a conflict target."""
        self._test_update_conflicts_two_fields([])

    def _test_update_conflicts_unique_two_fields(self, unique_fields):
        """
        Upsert a mix of conflicting and new ``Country`` rows using
        ``unique_fields`` as the conflict target and check the descriptions.
        """
        germany_description = "Germany is a country in Central Europe."
        czech_description = (
            "The Czech Republic is a landlocked country in Central Europe."
        )
        japan_description = "Japan is an island country in East Asia."

        Country.objects.bulk_create(self.data)
        self.assertEqual(Country.objects.count(), 4)

        new_data = [
            # Conflicting countries.
            Country(
                name="Germany",
                iso_two_letter="DE",
                description=germany_description,
            ),
            Country(
                name="Czech Republic",
                iso_two_letter="CZ",
                description=czech_description,
            ),
            # New countries.
            Country(name="Australia", iso_two_letter="AU"),
            Country(
                name="Japan",
                iso_two_letter="JP",
                description=japan_description,
            ),
        ]
        results = Country.objects.bulk_create(
            new_data,
            update_conflicts=True,
            update_fields=["description"],
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), len(new_data))
        self._assert_pks_set_if_returned(results)
        self.assertEqual(Country.objects.count(), 6)
        self.assertCountEqual(
            Country.objects.values("iso_two_letter", "description"),
            [
                {"iso_two_letter": "US", "description": ""},
                {"iso_two_letter": "NL", "description": ""},
                {"iso_two_letter": "DE", "description": germany_description},
                {"iso_two_letter": "CZ", "description": czech_description},
                {"iso_two_letter": "AU", "description": ""},
                {"iso_two_letter": "JP", "description": japan_description},
            ],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_two_fields_unique_fields_both(self):
        """Upsert ``Country`` with both fields as the conflict target."""
        self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_two_fields_unique_fields_one(self):
        """Using only ``iso_two_letter`` as the target raises a DB error."""
        with self.assertRaises((OperationalError, ProgrammingError)):
            self._test_update_conflicts_unique_two_fields(["iso_two_letter"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
        """Upsert ``Country`` without a conflict target."""
        self._test_update_conflicts_unique_two_fields([])

    def _test_update_conflicts(self, unique_fields):
        """
        Upsert ``UpsertConflict`` rows twice, first only conflicting rows and
        then the same rows plus a new one, checking the table after each call.
        """
        update_fields = ["name", "rank"]
        expected_rows = [
            {"number": 1, "rank": 4, "name": "Steve"},
            {"number": 2, "rank": 2, "name": "Olivia"},
            {"number": 3, "rank": 1, "name": "Hannah"},
        ]

        UpsertConflict.objects.bulk_create(
            [
                UpsertConflict(number=1, rank=1, name="John"),
                UpsertConflict(number=2, rank=2, name="Mary"),
                UpsertConflict(number=3, rank=3, name="Hannah"),
            ]
        )
        self.assertEqual(UpsertConflict.objects.count(), 3)

        conflicting_objects = [
            UpsertConflict(number=1, rank=4, name="Steve"),
            UpsertConflict(number=2, rank=2, name="Olivia"),
            UpsertConflict(number=3, rank=1, name="Hannah"),
        ]
        results = UpsertConflict.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), len(conflicting_objects))
        self._assert_pks_set_if_returned(results)
        self.assertEqual(UpsertConflict.objects.count(), 3)
        self.assertCountEqual(
            UpsertConflict.objects.values("number", "rank", "name"),
            expected_rows,
        )

        # Same conflicting objects again, plus one row that does not exist yet.
        results = UpsertConflict.objects.bulk_create(
            conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
            update_conflicts=True,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        self.assertEqual(len(results), 4)
        self._assert_pks_set_if_returned(results)
        self.assertEqual(UpsertConflict.objects.count(), 4)
        self.assertCountEqual(
            UpsertConflict.objects.values("number", "rank", "name"),
            expected_rows + [{"number": 4, "rank": 4, "name": "Mark"}],
        )

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields(self):
        """Upsert ``UpsertConflict`` with ``number`` as the conflict target."""
        self._test_update_conflicts(unique_fields=["number"])

    @skipUnlessDBFeature("supports_update_conflicts")
    @skipIfDBFeature("supports_update_conflicts_with_target")
    def test_update_conflicts_no_unique_fields(self):
        """Upsert ``UpsertConflict`` without a conflict target."""
        self._test_update_conflicts([])

    @skipUnlessDBFeature(
        "supports_update_conflicts", "supports_update_conflicts_with_target"
    )
    def test_update_conflicts_unique_fields_update_fields_db_column(self):
        """Upsert on ``FieldsWithDbColumns`` using ``rank`` as the target."""
        FieldsWithDbColumns.objects.bulk_create(
            [
                FieldsWithDbColumns(rank=1, name="a"),
                FieldsWithDbColumns(rank=2, name="b"),
            ]
        )
        self.assertEqual(FieldsWithDbColumns.objects.count(), 2)

        conflicting_objects = [
            FieldsWithDbColumns(rank=1, name="c"),
            FieldsWithDbColumns(rank=2, name="d"),
        ]
        results = FieldsWithDbColumns.objects.bulk_create(
            conflicting_objects,
            update_conflicts=True,
            unique_fields=["rank"],
            update_fields=["name"],
        )
        self.assertEqual(len(results), len(conflicting_objects))
        self._assert_pks_set_if_returned(results)
        self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
        self.assertCountEqual(
            FieldsWithDbColumns.objects.values("rank", "name"),
            [
                {"rank": 1, "name": "c"},
                {"rank": 2, "name": "d"},
            ],
        )

    def test_db_default_field_excluded(self):
        """
        Count how often the quoted ``created_at`` column name appears in the
        generated SQL, with and without an explicit ``created_at`` value.
        """
        # created_at is excluded when no db_default override is provided.
        with self.assertNumQueries(1) as ctx:
            DbDefaultModel.objects.bulk_create(
                [DbDefaultModel(name="foo"), DbDefaultModel(name="bar")]
            )
        created_at_quoted_name = connection.ops.quote_name("created_at")
        # One more occurrence is expected on backends that return rows from
        # a bulk insert.
        self.assertEqual(
            ctx[0]["sql"].count(created_at_quoted_name),
            1 if connection.features.can_return_rows_from_bulk_insert else 0,
        )
        # created_at is included when a db_default override is provided.
        with self.assertNumQueries(1) as ctx:
            DbDefaultModel.objects.bulk_create(
                [
                    DbDefaultModel(name="foo", created_at=timezone.now()),
                    DbDefaultModel(name="bar"),
                ]
            )
        self.assertEqual(
            ctx[0]["sql"].count(created_at_quoted_name),
            2 if connection.features.can_return_rows_from_bulk_insert else 1,
        )
|