diff options
Diffstat (limited to 'tests/delete/tests.py')
| -rw-r--r-- | tests/delete/tests.py | 178 |
1 files changed, 105 insertions, 73 deletions
diff --git a/tests/delete/tests.py b/tests/delete/tests.py index e5dde8047a..597589b836 100644 --- a/tests/delete/tests.py +++ b/tests/delete/tests.py @@ -7,10 +7,38 @@ from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature from .models import ( - B1, B2, B3, MR, A, Avatar, B, Base, Child, DeleteBottom, DeleteTop, - GenericB1, GenericB2, GenericDeleteBottom, HiddenUser, HiddenUserProfile, - M, M2MFrom, M2MTo, MRNull, Origin, P, Parent, R, RChild, RChildChild, - Referrer, S, T, User, create_a, get_default_r, + B1, + B2, + B3, + MR, + A, + Avatar, + B, + Base, + Child, + DeleteBottom, + DeleteTop, + GenericB1, + GenericB2, + GenericDeleteBottom, + HiddenUser, + HiddenUserProfile, + M, + M2MFrom, + M2MTo, + MRNull, + Origin, + P, + Parent, + R, + RChild, + RChildChild, + Referrer, + S, + T, + User, + create_a, + get_default_r, ) @@ -19,58 +47,58 @@ class OnDeleteTests(TestCase): self.DEFAULT = get_default_r() def test_auto(self): - a = create_a('auto') + a = create_a("auto") a.auto.delete() - self.assertFalse(A.objects.filter(name='auto').exists()) + self.assertFalse(A.objects.filter(name="auto").exists()) def test_non_callable(self): - msg = 'on_delete must be callable.' + msg = "on_delete must be callable." with self.assertRaisesMessage(TypeError, msg): - models.ForeignKey('self', on_delete=None) + models.ForeignKey("self", on_delete=None) with self.assertRaisesMessage(TypeError, msg): - models.OneToOneField('self', on_delete=None) + models.OneToOneField("self", on_delete=None) def test_auto_nullable(self): - a = create_a('auto_nullable') + a = create_a("auto_nullable") a.auto_nullable.delete() - self.assertFalse(A.objects.filter(name='auto_nullable').exists()) + self.assertFalse(A.objects.filter(name="auto_nullable").exists()) def test_setvalue(self): - a = create_a('setvalue') + a = create_a("setvalue") a.setvalue.delete() a = A.objects.get(pk=a.pk) self.assertEqual(self.DEFAULT, a.setvalue.pk) def test_setnull(self): - a = create_a('setnull') + a = create_a("setnull") a.setnull.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.setnull) def test_setdefault(self): - a = create_a('setdefault') + a = create_a("setdefault") a.setdefault.delete() a = A.objects.get(pk=a.pk) self.assertEqual(self.DEFAULT, a.setdefault.pk) def test_setdefault_none(self): - a = create_a('setdefault_none') + a = create_a("setdefault_none") a.setdefault_none.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.setdefault_none) def test_cascade(self): - a = create_a('cascade') + a = create_a("cascade") a.cascade.delete() - self.assertFalse(A.objects.filter(name='cascade').exists()) + self.assertFalse(A.objects.filter(name="cascade").exists()) def test_cascade_nullable(self): - a = create_a('cascade_nullable') + a = create_a("cascade_nullable") a.cascade_nullable.delete() - self.assertFalse(A.objects.filter(name='cascade_nullable').exists()) + self.assertFalse(A.objects.filter(name="cascade_nullable").exists()) def test_protect(self): - a = create_a('protect') + a = create_a("protect") msg = ( "Cannot delete some instances of model 'R' because they are " "referenced through protected foreign keys: 'A.protect'." @@ -80,7 +108,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.protected_objects, {a}) def test_protect_multiple(self): - a = create_a('protect') + a = create_a("protect") b = B.objects.create(protect=a.protect) msg = ( "Cannot delete some instances of model 'R' because they are " @@ -92,7 +120,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.protected_objects, {a, b}) def test_protect_path(self): - a = create_a('protect') + a = create_a("protect") a.protect.p = P.objects.create() a.protect.save() msg = ( @@ -109,10 +137,11 @@ class OnDeleteTests(TestCase): replacement_r = R.objects.create() def check_do_nothing(sender, **kwargs): - obj = kwargs['instance'] + obj = kwargs["instance"] obj.donothing_set.update(donothing=replacement_r) + models.signals.pre_delete.connect(check_do_nothing) - a = create_a('do_nothing') + a = create_a("do_nothing") a.donothing.delete() a = A.objects.get(pk=a.pk) self.assertEqual(replacement_r, a.donothing) @@ -140,19 +169,19 @@ class OnDeleteTests(TestCase): self.assertFalse(RChild.objects.filter(pk=child.pk).exists()) def test_cascade_from_child(self): - a = create_a('child') + a = create_a("child") a.child.delete() - self.assertFalse(A.objects.filter(name='child').exists()) + self.assertFalse(A.objects.filter(name="child").exists()) self.assertFalse(R.objects.filter(pk=a.child_id).exists()) def test_cascade_from_parent(self): - a = create_a('child') + a = create_a("child") R.objects.get(pk=a.child_id).delete() - self.assertFalse(A.objects.filter(name='child').exists()) + self.assertFalse(A.objects.filter(name="child").exists()) self.assertFalse(RChild.objects.filter(pk=a.child_id).exists()) def test_setnull_from_child(self): - a = create_a('child_setnull') + a = create_a("child_setnull") a.child_setnull.delete() self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists()) @@ -160,7 +189,7 @@ class OnDeleteTests(TestCase): self.assertIsNone(a.child_setnull) def test_setnull_from_parent(self): - a = create_a('child_setnull') + a = create_a("child_setnull") R.objects.get(pk=a.child_setnull_id).delete() self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists()) @@ -168,13 +197,13 @@ class OnDeleteTests(TestCase): self.assertIsNone(a.child_setnull) def test_o2o_setnull(self): - a = create_a('o2o_setnull') + a = create_a("o2o_setnull") a.o2o_setnull.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.o2o_setnull) def test_restrict(self): - a = create_a('restrict') + a = create_a("restrict") msg = ( "Cannot delete some instances of model 'R' because they are " "referenced through restricted foreign keys: 'A.restrict'." @@ -184,7 +213,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.restricted_objects, {a}) def test_restrict_multiple(self): - a = create_a('restrict') + a = create_a("restrict") b3 = B3.objects.create(restrict=a.restrict) msg = ( "Cannot delete some instances of model 'R' because they are " @@ -196,7 +225,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.restricted_objects, {a, b3}) def test_restrict_path_cascade_indirect(self): - a = create_a('restrict') + a = create_a("restrict") a.restrict.p = P.objects.create() a.restrict.save() msg = ( @@ -210,17 +239,17 @@ class OnDeleteTests(TestCase): a.cascade.p = a.restrict.p a.cascade.save() a.restrict.p.delete() - self.assertFalse(A.objects.filter(name='restrict').exists()) + self.assertFalse(A.objects.filter(name="restrict").exists()) self.assertFalse(R.objects.filter(pk=a.restrict_id).exists()) def test_restrict_path_cascade_direct(self): - a = create_a('restrict') + a = create_a("restrict") a.restrict.p = P.objects.create() a.restrict.save() a.cascade_p = a.restrict.p a.save() a.restrict.p.delete() - self.assertFalse(A.objects.filter(name='restrict').exists()) + self.assertFalse(A.objects.filter(name="restrict").exists()) self.assertFalse(R.objects.filter(pk=a.restrict_id).exists()) def test_restrict_path_cascade_indirect_diamond(self): @@ -302,7 +331,7 @@ class DeletionTests(TestCase): r = R.objects.create() m.m2m.add(r) r.delete() - through = M._meta.get_field('m2m').remote_field.through + through = M._meta.get_field("m2m").remote_field.through self.assertFalse(through.objects.exists()) r = R.objects.create() @@ -333,16 +362,16 @@ class DeletionTests(TestCase): related_setnull_sets = [] def pre_delete(sender, **kwargs): - obj = kwargs['instance'] + obj = kwargs["instance"] deleted.append(obj) if isinstance(obj, R): related_setnull_sets.append([a.pk for a in obj.setnull_set.all()]) models.signals.pre_delete.connect(pre_delete) - a = create_a('update_setnull') + a = create_a("update_setnull") a.setnull.delete() - a = create_a('update_cascade') + a = create_a("update_cascade") a.cascade.delete() for obj in deleted: @@ -359,10 +388,10 @@ class DeletionTests(TestCase): post_delete_order = [] def log_post_delete(sender, **kwargs): - pre_delete_order.append((sender, kwargs['instance'].pk)) + pre_delete_order.append((sender, kwargs["instance"].pk)) def log_pre_delete(sender, **kwargs): - post_delete_order.append((sender, kwargs['instance'].pk)) + post_delete_order.append((sender, kwargs["instance"].pk)) models.signals.post_delete.connect(log_post_delete) models.signals.pre_delete.connect(log_pre_delete) @@ -407,9 +436,7 @@ class DeletionTests(TestCase): @skipUnlessDBFeature("can_defer_constraint_checks") def test_can_defer_constraint_checks(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) a = Avatar.objects.get(pk=u.avatar_id) # 1 query to find the users for the avatar. # 1 query to delete the user @@ -421,7 +448,8 @@ class DeletionTests(TestCase): calls = [] def noop(*args, **kwargs): - calls.append('') + calls.append("") + models.signals.post_delete.connect(noop, sender=User) self.assertNumQueries(3, a.delete) @@ -432,14 +460,13 @@ class DeletionTests(TestCase): @skipIfDBFeature("can_defer_constraint_checks") def test_cannot_defer_constraint_checks(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) # Attach a signal to make sure we will not do fast_deletes. calls = [] def noop(*args, **kwargs): - calls.append('') + calls.append("") + models.signals.post_delete.connect(noop, sender=User) a = Avatar.objects.get(pk=u.avatar_id) @@ -469,7 +496,7 @@ class DeletionTests(TestCase): objs = [Avatar() for i in range(0, TEST_SIZE)] Avatar.objects.bulk_create(objs) # Calculate the number of queries needed. - batch_size = connection.ops.bulk_batch_size(['pk'], objs) + batch_size = connection.ops.bulk_batch_size(["pk"], objs) # The related fetches are done in batches. batches = ceil(len(objs) / batch_size) # One query for Avatar.objects.all() and then one related fast delete for @@ -486,7 +513,7 @@ class DeletionTests(TestCase): for i in range(TEST_SIZE): T.objects.create(s=s) - batch_size = max(connection.ops.bulk_batch_size(['pk'], range(TEST_SIZE)), 1) + batch_size = max(connection.ops.bulk_batch_size(["pk"], range(TEST_SIZE)), 1) # TEST_SIZE / batch_size (select related `T` instances) # + 1 (select related `U` instances) @@ -530,7 +557,9 @@ class DeletionTests(TestCase): QuerySet.delete() should return the number of deleted rows and a dictionary with the number of deletions for each object type. """ - Avatar.objects.bulk_create([Avatar(desc='a'), Avatar(desc='b'), Avatar(desc='c')]) + Avatar.objects.bulk_create( + [Avatar(desc="a"), Avatar(desc="b"), Avatar(desc="c")] + ) avatars_count = Avatar.objects.count() deleted, rows_count = Avatar.objects.all().delete() self.assertEqual(deleted, avatars_count) @@ -600,18 +629,21 @@ class DeletionTests(TestCase): expected_sql = str( Referrer.objects.only( # Both fields are referenced by SecondReferrer. - 'id', 'unique_field', - ).filter(origin__in=[origin]).query + "id", + "unique_field", + ) + .filter(origin__in=[origin]) + .query ) with self.assertNumQueries(2) as ctx: origin.delete() - self.assertEqual(ctx.captured_queries[0]['sql'], expected_sql) + self.assertEqual(ctx.captured_queries[0]["sql"], expected_sql) def receiver(instance, **kwargs): pass # All fields are selected if deletion signals are connected. - for signal_name in ('pre_delete', 'post_delete'): + for signal_name in ("pre_delete", "post_delete"): with self.subTest(signal=signal_name): origin = Origin.objects.create() signal = getattr(models.signals, signal_name) @@ -619,8 +651,8 @@ class DeletionTests(TestCase): with self.assertNumQueries(2) as ctx: origin.delete() self.assertIn( - connection.ops.quote_name('large_field'), - ctx.captured_queries[0]['sql'], + connection.ops.quote_name("large_field"), + ctx.captured_queries[0]["sql"], ) signal.disconnect(receiver, sender=Referrer) @@ -629,14 +661,12 @@ class FastDeleteTests(TestCase): def test_fast_delete_all(self): with self.assertNumQueries(1) as ctx: User.objects.all().delete() - sql = ctx.captured_queries[0]['sql'] + sql = ctx.captured_queries[0]["sql"] # No subqueries is used when performing a full delete. - self.assertNotIn('SELECT', sql) + self.assertNotIn("SELECT", sql) def test_fast_delete_fk(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) a = Avatar.objects.get(pk=u.avatar_id) # 1 query to fast-delete the user # 1 query to delete the avatar @@ -668,16 +698,16 @@ class FastDeleteTests(TestCase): def test_fast_delete_instance_set_pk_none(self): u = User.objects.create() # User can be fast-deleted. - collector = Collector(using='default') + collector = Collector(using="default") self.assertTrue(collector.can_fast_delete(u)) u.delete() self.assertIsNone(u.pk) def test_fast_delete_joined_qs(self): - a = Avatar.objects.create(desc='a') + a = Avatar.objects.create(desc="a") User.objects.create(avatar=a) u2 = User.objects.create() - self.assertNumQueries(1, User.objects.filter(avatar__desc='a').delete) + self.assertNumQueries(1, User.objects.filter(avatar__desc="a").delete) self.assertEqual(User.objects.count(), 1) self.assertTrue(User.objects.filter(pk=u2.pk).exists()) @@ -704,7 +734,7 @@ class FastDeleteTests(TestCase): # No problems here - we aren't going to cascade, so we will fast # delete the objects in a single query. self.assertNumQueries(1, User.objects.all().delete) - a = Avatar.objects.create(desc='a') + a = Avatar.objects.create(desc="a") User.objects.bulk_create(User(avatar=a) for i in range(0, 2000)) # We don't hit parameter amount limits for a, so just one query for # that + fast delete of the related objs. @@ -718,7 +748,7 @@ class FastDeleteTests(TestCase): """ with self.assertNumQueries(1): self.assertEqual( - User.objects.filter(avatar__desc='missing').delete(), + User.objects.filter(avatar__desc="missing").delete(), (0, {}), ) @@ -737,8 +767,10 @@ class FastDeleteTests(TestCase): with self.assertNumQueries(1): self.assertEqual( Base.objects.annotate( - rels_count=models.Count('rels'), - ).filter(rels_count=0).delete(), - (1, {'delete.Base': 1}), + rels_count=models.Count("rels"), + ) + .filter(rels_count=0) + .delete(), + (1, {"delete.Base": 1}), ) self.assertIs(Base.objects.exists(), False) |
