123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import unittest
- from django.core.exceptions import FieldError
- from django.db import IntegrityError, connection, transaction
- from django.db.models import CharField, Count, F, IntegerField, Max
- from django.db.models.functions import Abs, Concat, Lower
- from django.test import TestCase
- from django.test.utils import register_lookup
- from .models import A, B, Bar, D, DataPoint, Foo, RelatedPoint, UniqueNumber
- class SimpleTest(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.a1 = A.objects.create()
- cls.a2 = A.objects.create()
- for x in range(20):
- B.objects.create(a=cls.a1)
- D.objects.create(a=cls.a1)
- def test_nonempty_update(self):
- """
- Update changes the right number of rows for a nonempty queryset
- """
- num_updated = self.a1.b_set.update(y=100)
- self.assertEqual(num_updated, 20)
- cnt = B.objects.filter(y=100).count()
- self.assertEqual(cnt, 20)
- def test_empty_update(self):
- """
- Update changes the right number of rows for an empty queryset
- """
- num_updated = self.a2.b_set.update(y=100)
- self.assertEqual(num_updated, 0)
- cnt = B.objects.filter(y=100).count()
- self.assertEqual(cnt, 0)
- def test_nonempty_update_with_inheritance(self):
- """
- Update changes the right number of rows for an empty queryset
- when the update affects only a base table
- """
- num_updated = self.a1.d_set.update(y=100)
- self.assertEqual(num_updated, 20)
- cnt = D.objects.filter(y=100).count()
- self.assertEqual(cnt, 20)
- def test_empty_update_with_inheritance(self):
- """
- Update changes the right number of rows for an empty queryset
- when the update affects only a base table
- """
- num_updated = self.a2.d_set.update(y=100)
- self.assertEqual(num_updated, 0)
- cnt = D.objects.filter(y=100).count()
- self.assertEqual(cnt, 0)
- def test_foreign_key_update_with_id(self):
- """
- Update works using <field>_id for foreign keys
- """
- num_updated = self.a1.d_set.update(a_id=self.a2)
- self.assertEqual(num_updated, 20)
- self.assertEqual(self.a2.d_set.count(), 20)
- class AdvancedTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.d0 = DataPoint.objects.create(name="d0", value="apple")
- cls.d2 = DataPoint.objects.create(name="d2", value="banana")
- cls.d3 = DataPoint.objects.create(name="d3", value="banana")
- cls.r1 = RelatedPoint.objects.create(name="r1", data=cls.d3)
- def test_update(self):
- """
- Objects are updated by first filtering the candidates into a queryset
- and then calling the update() method. It executes immediately and
- returns nothing.
- """
- resp = DataPoint.objects.filter(value="apple").update(name="d1")
- self.assertEqual(resp, 1)
- resp = DataPoint.objects.filter(value="apple")
- self.assertEqual(list(resp), [self.d0])
- def test_update_multiple_objects(self):
- """
- We can update multiple objects at once.
- """
- resp = DataPoint.objects.filter(value='banana').update(value='pineapple')
- self.assertEqual(resp, 2)
- self.assertEqual(DataPoint.objects.get(name="d2").value, 'pineapple')
- def test_update_fk(self):
- """
- Foreign key fields can also be updated, although you can only update
- the object referred to, not anything inside the related object.
- """
- resp = RelatedPoint.objects.filter(name="r1").update(data=self.d0)
- self.assertEqual(resp, 1)
- resp = RelatedPoint.objects.filter(data__name="d0")
- self.assertEqual(list(resp), [self.r1])
- def test_update_multiple_fields(self):
- """
- Multiple fields can be updated at once
- """
- resp = DataPoint.objects.filter(value="apple").update(
- value="fruit", another_value="peach")
- self.assertEqual(resp, 1)
- d = DataPoint.objects.get(name="d0")
- self.assertEqual(d.value, 'fruit')
- self.assertEqual(d.another_value, 'peach')
- def test_update_all(self):
- """
- In the rare case you want to update every instance of a model, update()
- is also a manager method.
- """
- self.assertEqual(DataPoint.objects.update(value='thing'), 3)
- resp = DataPoint.objects.values('value').distinct()
- self.assertEqual(list(resp), [{'value': 'thing'}])
- def test_update_slice_fail(self):
- """
- We do not support update on already sliced query sets.
- """
- method = DataPoint.objects.all()[:2].update
- msg = 'Cannot update a query once a slice has been taken.'
- with self.assertRaisesMessage(AssertionError, msg):
- method(another_value='another thing')
- def test_update_respects_to_field(self):
- """
- Update of an FK field which specifies a to_field works.
- """
- a_foo = Foo.objects.create(target='aaa')
- b_foo = Foo.objects.create(target='bbb')
- bar = Bar.objects.create(foo=a_foo)
- self.assertEqual(bar.foo_id, a_foo.target)
- bar_qs = Bar.objects.filter(pk=bar.pk)
- self.assertEqual(bar_qs[0].foo_id, a_foo.target)
- bar_qs.update(foo=b_foo)
- self.assertEqual(bar_qs[0].foo_id, b_foo.target)
- def test_update_m2m_field(self):
- msg = (
- 'Cannot update model field '
- '<django.db.models.fields.related.ManyToManyField: m2m_foo> '
- '(only non-relations and foreign keys permitted).'
- )
- with self.assertRaisesMessage(FieldError, msg):
- Bar.objects.update(m2m_foo='whatever')
- def test_update_transformed_field(self):
- A.objects.create(x=5)
- A.objects.create(x=-6)
- with register_lookup(IntegerField, Abs):
- A.objects.update(x=F('x__abs'))
- self.assertCountEqual(A.objects.values_list('x', flat=True), [5, 6])
- def test_update_annotated_queryset(self):
- """
- Update of a queryset that's been annotated.
- """
-
- qs = DataPoint.objects.annotate(alias=F('value'))
- self.assertEqual(qs.update(another_value='foo'), 3)
-
- qs = DataPoint.objects.annotate(alias=F('value')).filter(alias='apple')
- self.assertEqual(qs.update(another_value='foo'), 1)
-
- qs = DataPoint.objects.annotate(alias=F('value'))
- self.assertEqual(qs.update(another_value=F('alias')), 3)
-
- qs = DataPoint.objects.annotate(max=Max('value'))
- msg = (
- 'Aggregate functions are not allowed in this query '
- '(another_value=Max(Col(update_datapoint, update.DataPoint.value))).'
- )
- with self.assertRaisesMessage(FieldError, msg):
- qs.update(another_value=F('max'))
- def test_update_annotated_multi_table_queryset(self):
- """
- Update of a queryset that's been annotated and involves multiple tables.
- """
-
- qs = DataPoint.objects.annotate(related_count=Count('relatedpoint'))
- self.assertEqual(qs.update(value='Foo'), 3)
-
- qs = DataPoint.objects.annotate(related_count=Count('relatedpoint'))
- self.assertEqual(qs.filter(related_count=1).update(value='Foo'), 1)
-
- qs = RelatedPoint.objects.annotate(max=Max('data__value'))
- msg = 'Joined field references are not permitted in this query'
- with self.assertRaisesMessage(FieldError, msg):
- qs.update(name=F('max'))
- def test_update_with_joined_field_annotation(self):
- msg = 'Joined field references are not permitted in this query'
- with register_lookup(CharField, Lower):
- for annotation in (
- F('data__name'),
- F('data__name__lower'),
- Lower('data__name'),
- Concat('data__name', 'data__value'),
- ):
- with self.subTest(annotation=annotation):
- with self.assertRaisesMessage(FieldError, msg):
- RelatedPoint.objects.annotate(
- new_name=annotation,
- ).update(name=F('new_name'))
- @unittest.skipUnless(
- connection.vendor == 'mysql',
- 'UPDATE...ORDER BY syntax is supported on MySQL/MariaDB',
- )
- class MySQLUpdateOrderByTest(TestCase):
- """Update field with a unique constraint using an ordered queryset."""
- @classmethod
- def setUpTestData(cls):
- UniqueNumber.objects.create(number=1)
- UniqueNumber.objects.create(number=2)
- def test_order_by_update_on_unique_constraint(self):
- tests = [
- ('-number', 'id'),
- (F('number').desc(), 'id'),
- (F('number') * -1, 'id'),
- ]
- for ordering in tests:
- with self.subTest(ordering=ordering), transaction.atomic():
- updated = UniqueNumber.objects.order_by(*ordering).update(
- number=F('number') + 1,
- )
- self.assertEqual(updated, 2)
- def test_order_by_update_on_unique_constraint_annotation(self):
-
-
- with self.assertRaises(IntegrityError):
- UniqueNumber.objects.annotate(
- number_inverse=F('number').desc(),
- ).order_by('number_inverse').update(
- number=F('number') + 1,
- )
|