123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- from __future__ import unicode_literals
- import datetime
- from django.conf import settings
- from django.db import DEFAULT_DB_ALIAS, models, transaction
- from django.db.utils import ConnectionHandler
- from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
- from .models import (
- Award, AwardNote, Book, Child, Eaten, Email, File, Food, FooFile,
- FooFileProxy, FooImage, FooPhoto, House, Image, Item, Location, Login,
- OrderedPerson, OrgUnit, Person, Photo, PlayedWith, PlayedWithNote, Policy,
- Researcher, Toy, Version,
- )
- # Can't run this test under SQLite, because you can't
- # get two connections to an in-memory database.
- @skipUnlessDBFeature('test_db_allows_multiple_connections')
- class DeleteLockingTest(TransactionTestCase):
- available_apps = ['delete_regress']
- def setUp(self):
- # Create a second connection to the default database
- new_connections = ConnectionHandler(settings.DATABASES)
- self.conn2 = new_connections[DEFAULT_DB_ALIAS]
- self.conn2.set_autocommit(False)
- def tearDown(self):
- # Close down the second connection.
- self.conn2.rollback()
- self.conn2.close()
- def test_concurrent_delete(self):
- """Concurrent deletes don't collide and lock the database (#9479)."""
- with transaction.atomic():
- Book.objects.create(id=1, pagecount=100)
- Book.objects.create(id=2, pagecount=200)
- Book.objects.create(id=3, pagecount=300)
- with transaction.atomic():
- # Start a transaction on the main connection.
- self.assertEqual(3, Book.objects.count())
- # Delete something using another database connection.
- with self.conn2.cursor() as cursor2:
- cursor2.execute("DELETE from delete_regress_book WHERE id = 1")
- self.conn2.commit()
- # In the same transaction on the main connection, perform a
- # queryset delete that covers the object deleted with the other
- # connection. This causes an infinite loop under MySQL InnoDB
- # unless we keep track of already deleted objects.
- Book.objects.filter(pagecount__lt=250).delete()
- self.assertEqual(1, Book.objects.count())
- class DeleteCascadeTests(TestCase):
- def test_generic_relation_cascade(self):
- """
- Django cascades deletes through generic-related objects to their
- reverse relations.
- """
- person = Person.objects.create(name='Nelson Mandela')
- award = Award.objects.create(name='Nobel', content_object=person)
- AwardNote.objects.create(note='a peace prize',
- award=award)
- self.assertEqual(AwardNote.objects.count(), 1)
- person.delete()
- self.assertEqual(Award.objects.count(), 0)
- # first two asserts are just sanity checks, this is the kicker:
- self.assertEqual(AwardNote.objects.count(), 0)
- def test_fk_to_m2m_through(self):
- """
- If an M2M relationship has an explicitly-specified through model, and
- some other model has an FK to that through model, deletion is cascaded
- from one of the participants in the M2M, to the through model, to its
- related model.
- """
- juan = Child.objects.create(name='Juan')
- paints = Toy.objects.create(name='Paints')
- played = PlayedWith.objects.create(child=juan, toy=paints,
- date=datetime.date.today())
- PlayedWithNote.objects.create(played=played,
- note='the next Jackson Pollock')
- self.assertEqual(PlayedWithNote.objects.count(), 1)
- paints.delete()
- self.assertEqual(PlayedWith.objects.count(), 0)
- # first two asserts just sanity checks, this is the kicker:
- self.assertEqual(PlayedWithNote.objects.count(), 0)
- def test_15776(self):
- policy = Policy.objects.create(pk=1, policy_number="1234")
- version = Version.objects.create(policy=policy)
- location = Location.objects.create(version=version)
- Item.objects.create(version=version, location=location)
- policy.delete()
- class DeleteCascadeTransactionTests(TransactionTestCase):
- available_apps = ['delete_regress']
- def test_inheritance(self):
- """
- Auto-created many-to-many through tables referencing a parent model are
- correctly found by the delete cascade when a child of that parent is
- deleted.
- Refs #14896.
- """
- r = Researcher.objects.create()
- email = Email.objects.create(
- label="office-email", email_address="carl@science.edu"
- )
- r.contacts.add(email)
- email.delete()
- def test_to_field(self):
- """
- Cascade deletion works with ForeignKey.to_field set to non-PK.
- """
- apple = Food.objects.create(name="apple")
- Eaten.objects.create(food=apple, meal="lunch")
- apple.delete()
- self.assertFalse(Food.objects.exists())
- self.assertFalse(Eaten.objects.exists())
- class LargeDeleteTests(TestCase):
- def test_large_deletes(self):
- "Regression for #13309 -- if the number of objects > chunk size, deletion still occurs"
- for x in range(300):
- Book.objects.create(pagecount=x + 100)
- # attach a signal to make sure we will not fast-delete
- def noop(*args, **kwargs):
- pass
- models.signals.post_delete.connect(noop, sender=Book)
- Book.objects.all().delete()
- models.signals.post_delete.disconnect(noop, sender=Book)
- self.assertEqual(Book.objects.count(), 0)
- class ProxyDeleteTest(TestCase):
- """
- Tests on_delete behavior for proxy models.
- See #16128.
- """
- def create_image(self):
- """Return an Image referenced by both a FooImage and a FooFile."""
- # Create an Image
- test_image = Image()
- test_image.save()
- foo_image = FooImage(my_image=test_image)
- foo_image.save()
- # Get the Image instance as a File
- test_file = File.objects.get(pk=test_image.pk)
- foo_file = FooFile(my_file=test_file)
- foo_file.save()
- return test_image
- def test_delete_proxy(self):
- """
- Deleting the *proxy* instance bubbles through to its non-proxy and
- *all* referring objects are deleted.
- """
- self.create_image()
- Image.objects.all().delete()
- # An Image deletion == File deletion
- self.assertEqual(len(Image.objects.all()), 0)
- self.assertEqual(len(File.objects.all()), 0)
- # The Image deletion cascaded and *all* references to it are deleted.
- self.assertEqual(len(FooImage.objects.all()), 0)
- self.assertEqual(len(FooFile.objects.all()), 0)
- def test_delete_proxy_of_proxy(self):
- """
- Deleting a proxy-of-proxy instance should bubble through to its proxy
- and non-proxy parents, deleting *all* referring objects.
- """
- test_image = self.create_image()
- # Get the Image as a Photo
- test_photo = Photo.objects.get(pk=test_image.pk)
- foo_photo = FooPhoto(my_photo=test_photo)
- foo_photo.save()
- Photo.objects.all().delete()
- # A Photo deletion == Image deletion == File deletion
- self.assertEqual(len(Photo.objects.all()), 0)
- self.assertEqual(len(Image.objects.all()), 0)
- self.assertEqual(len(File.objects.all()), 0)
- # The Photo deletion should have cascaded and deleted *all*
- # references to it.
- self.assertEqual(len(FooPhoto.objects.all()), 0)
- self.assertEqual(len(FooFile.objects.all()), 0)
- self.assertEqual(len(FooImage.objects.all()), 0)
- def test_delete_concrete_parent(self):
- """
- Deleting an instance of a concrete model should also delete objects
- referencing its proxy subclass.
- """
- self.create_image()
- File.objects.all().delete()
- # A File deletion == Image deletion
- self.assertEqual(len(File.objects.all()), 0)
- self.assertEqual(len(Image.objects.all()), 0)
- # The File deletion should have cascaded and deleted *all* references
- # to it.
- self.assertEqual(len(FooFile.objects.all()), 0)
- self.assertEqual(len(FooImage.objects.all()), 0)
- def test_delete_proxy_pair(self):
- """
- If a pair of proxy models are linked by an FK from one concrete parent
- to the other, deleting one proxy model cascade-deletes the other, and
- the deletion happens in the right order (not triggering an
- IntegrityError on databases unable to defer integrity checks).
- Refs #17918.
- """
- # Create an Image (proxy of File) and FooFileProxy (proxy of FooFile,
- # which has an FK to File)
- image = Image.objects.create()
- as_file = File.objects.get(pk=image.pk)
- FooFileProxy.objects.create(my_file=as_file)
- Image.objects.all().delete()
- self.assertEqual(len(FooFileProxy.objects.all()), 0)
- def test_19187_values(self):
- with self.assertRaises(TypeError):
- Image.objects.values().delete()
- with self.assertRaises(TypeError):
- Image.objects.values_list().delete()
- class Ticket19102Tests(TestCase):
- """
- Test different queries which alter the SELECT clause of the query. We
- also must be using a subquery for the deletion (that is, the original
- query has a join in it). The deletion should be done as "fast-path"
- deletion (that is, just one query for the .delete() call).
- Note that .values() is not tested here on purpose. .values().delete()
- doesn't work for non fast-path deletes at all.
- """
- def setUp(self):
- self.o1 = OrgUnit.objects.create(name='o1')
- self.o2 = OrgUnit.objects.create(name='o2')
- self.l1 = Login.objects.create(description='l1', orgunit=self.o1)
- self.l2 = Login.objects.create(description='l2', orgunit=self.o2)
- @skipUnlessDBFeature("update_can_self_select")
- def test_ticket_19102_annotate(self):
- with self.assertNumQueries(1):
- Login.objects.order_by('description').filter(
- orgunit__name__isnull=False
- ).annotate(
- n=models.Count('description')
- ).filter(
- n=1, pk=self.l1.pk
- ).delete()
- self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
- self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())
- @skipUnlessDBFeature("update_can_self_select")
- def test_ticket_19102_extra(self):
- with self.assertNumQueries(1):
- Login.objects.order_by('description').filter(
- orgunit__name__isnull=False
- ).extra(
- select={'extraf': '1'}
- ).filter(
- pk=self.l1.pk
- ).delete()
- self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
- self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())
- @skipUnlessDBFeature("update_can_self_select")
- @skipUnlessDBFeature('can_distinct_on_fields')
- def test_ticket_19102_distinct_on(self):
- # Both Login objs should have same description so that only the one
- # having smaller PK will be deleted.
- Login.objects.update(description='description')
- with self.assertNumQueries(1):
- Login.objects.distinct('description').order_by('pk').filter(
- orgunit__name__isnull=False
- ).delete()
- # Assumed that l1 which is created first has smaller PK.
- self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
- self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())
- @skipUnlessDBFeature("update_can_self_select")
- def test_ticket_19102_select_related(self):
- with self.assertNumQueries(1):
- Login.objects.filter(
- pk=self.l1.pk
- ).filter(
- orgunit__name__isnull=False
- ).order_by(
- 'description'
- ).select_related('orgunit').delete()
- self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
- self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())
- @skipUnlessDBFeature("update_can_self_select")
- def test_ticket_19102_defer(self):
- with self.assertNumQueries(1):
- Login.objects.filter(
- pk=self.l1.pk
- ).filter(
- orgunit__name__isnull=False
- ).order_by(
- 'description'
- ).only('id').delete()
- self.assertFalse(Login.objects.filter(pk=self.l1.pk).exists())
- self.assertTrue(Login.objects.filter(pk=self.l2.pk).exists())
- class OrderedDeleteTests(TestCase):
- def test_meta_ordered_delete(self):
- # When a subquery is performed by deletion code, the subquery must be
- # cleared of all ordering. There was a but that caused _meta ordering
- # to be used. Refs #19720.
- h = House.objects.create(address='Foo')
- OrderedPerson.objects.create(name='Jack', lives_in=h)
- OrderedPerson.objects.create(name='Bob', lives_in=h)
- OrderedPerson.objects.filter(lives_in__address='Foo').delete()
- self.assertEqual(OrderedPerson.objects.count(), 0)
|