diff options
Diffstat (limited to 'tests/bulk_create/tests.py')
| -rw-r--r-- | tests/bulk_create/tests.py | 292 |
1 files changed, 285 insertions, 7 deletions
diff --git a/tests/bulk_create/tests.py b/tests/bulk_create/tests.py index 2ee54c382f..7e5ff32380 100644 --- a/tests/bulk_create/tests.py +++ b/tests/bulk_create/tests.py @@ -1,7 +1,11 @@ from math import ceil from operator import attrgetter -from django.db import IntegrityError, NotSupportedError, connection +from django.core.exceptions import FieldDoesNotExist +from django.db import ( + IntegrityError, NotSupportedError, OperationalError, ProgrammingError, + connection, +) from django.db.models import FileField, Value from django.db.models.functions import Lower from django.test import ( @@ -11,7 +15,8 @@ from django.test import ( from .models import ( BigAutoFieldModel, Country, NoFields, NullableFields, Pizzeria, ProxyCountry, ProxyMultiCountry, ProxyMultiProxyCountry, ProxyProxyCountry, - Restaurant, SmallAutoFieldModel, State, TwoFields, + RelatedModel, Restaurant, SmallAutoFieldModel, State, TwoFields, + UpsertConflict, ) @@ -53,10 +58,10 @@ class BulkCreateTests(TestCase): @skipUnlessDBFeature('has_bulk_insert') def test_long_and_short_text(self): Country.objects.bulk_create([ - Country(description='a' * 4001), - Country(description='a'), - Country(description='Ж' * 2001), - Country(description='Ж'), + Country(description='a' * 4001, iso_two_letter='A'), + Country(description='a', iso_two_letter='B'), + Country(description='Ж' * 2001, iso_two_letter='C'), + Country(description='Ж', iso_two_letter='D'), ]) self.assertEqual(Country.objects.count(), 4) @@ -218,7 +223,7 @@ class BulkCreateTests(TestCase): @skipUnlessDBFeature('has_bulk_insert') def test_explicit_batch_size_respects_max_batch_size(self): - objs = [Country() for i in range(1000)] + objs = [Country(name=f'Country {i}') for i in range(1000)] fields = ['name', 'iso_two_letter', 'description'] max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1) with self.assertNumQueries(ceil(len(objs) / max_batch_size)): @@ -352,3 +357,276 @@ class BulkCreateTests(TestCase): msg = 'Batch size must be a positive integer.' with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create([], batch_size=-1) + + @skipIfDBFeature('supports_update_conflicts') + def test_update_conflicts_unsupported(self): + msg = 'This database backend does not support updating conflicts.' + with self.assertRaisesMessage(NotSupportedError, msg): + Country.objects.bulk_create(self.data, update_conflicts=True) + + @skipUnlessDBFeature('supports_ignore_conflicts', 'supports_update_conflicts') + def test_ignore_update_conflicts_exclusive(self): + msg = 'ignore_conflicts and update_conflicts are mutually exclusive' + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + ignore_conflicts=True, + update_conflicts=True, + ) + + @skipUnlessDBFeature('supports_update_conflicts') + def test_update_conflicts_no_update_fields(self): + msg = ( + 'Fields that will be updated when a row insertion fails on ' + 'conflicts must be provided.' + ) + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create(self.data, update_conflicts=True) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_unique_field_unsupported(self): + msg = ( + 'This database backend does not support updating conflicts with ' + 'specifying unique fields that can trigger the upsert.' + ) + with self.assertRaisesMessage(NotSupportedError, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['f2'], + unique_fields=['f1'], + ) + + @skipUnlessDBFeature('supports_update_conflicts') + def test_update_conflicts_nonexistent_update_fields(self): + unique_fields = None + if connection.features.supports_update_conflicts_with_target: + unique_fields = ['f1'] + msg = "TwoFields has no field named 'nonexistent'" + with self.assertRaisesMessage(FieldDoesNotExist, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['nonexistent'], + unique_fields=unique_fields, + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_unique_fields_required(self): + msg = 'Unique fields that can trigger the upsert must be provided.' + with self.assertRaisesMessage(ValueError, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['f1'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_invalid_update_fields(self): + msg = ( + 'bulk_create() can only be used with concrete fields in ' + 'update_fields.' + ) + # Reverse one-to-one relationship. + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + update_conflicts=True, + update_fields=['relatedmodel'], + unique_fields=['pk'], + ) + # Many-to-many relationship. + with self.assertRaisesMessage(ValueError, msg): + RelatedModel.objects.bulk_create( + [RelatedModel(country=self.data[0])], + update_conflicts=True, + update_fields=['big_auto_fields'], + unique_fields=['country'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_pk_in_update_fields(self): + msg = 'bulk_create() cannot be used with primary keys in update_fields.' + with self.assertRaisesMessage(ValueError, msg): + BigAutoFieldModel.objects.bulk_create( + [BigAutoFieldModel()], + update_conflicts=True, + update_fields=['id'], + unique_fields=['id'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_invalid_unique_fields(self): + msg = ( + 'bulk_create() can only be used with concrete fields in ' + 'unique_fields.' + ) + # Reverse one-to-one relationship. + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + update_conflicts=True, + update_fields=['name'], + unique_fields=['relatedmodel'], + ) + # Many-to-many relationship. + with self.assertRaisesMessage(ValueError, msg): + RelatedModel.objects.bulk_create( + [RelatedModel(country=self.data[0])], + update_conflicts=True, + update_fields=['name'], + unique_fields=['big_auto_fields'], + ) + + def _test_update_conflicts_two_fields(self, unique_fields): + TwoFields.objects.bulk_create([ + TwoFields(f1=1, f2=1, name='a'), + TwoFields(f1=2, f2=2, name='b'), + ]) + self.assertEqual(TwoFields.objects.count(), 2) + + conflicting_objects = [ + TwoFields(f1=1, f2=1, name='c'), + TwoFields(f1=2, f2=2, name='d'), + ] + TwoFields.objects.bulk_create( + conflicting_objects, + update_conflicts=True, + unique_fields=unique_fields, + update_fields=['name'], + ) + self.assertEqual(TwoFields.objects.count(), 2) + self.assertCountEqual(TwoFields.objects.values('f1', 'f2', 'name'), [ + {'f1': 1, 'f2': 1, 'name': 'c'}, + {'f1': 2, 'f2': 2, 'name': 'd'}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_first(self): + self._test_update_conflicts_two_fields(['f1']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_second(self): + self._test_update_conflicts_two_fields(['f2']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_both(self): + with self.assertRaises((OperationalError, ProgrammingError)): + self._test_update_conflicts_two_fields(['f1', 'f2']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_no_unique_fields(self): + self._test_update_conflicts_two_fields([]) + + def _test_update_conflicts_unique_two_fields(self, unique_fields): + Country.objects.bulk_create(self.data) + self.assertEqual(Country.objects.count(), 4) + + new_data = [ + # Conflicting countries. + Country(name='Germany', iso_two_letter='DE', description=( + 'Germany is a country in Central Europe.' + )), + Country(name='Czech Republic', iso_two_letter='CZ', description=( + 'The Czech Republic is a landlocked country in Central Europe.' + )), + # New countries. + Country(name='Australia', iso_two_letter='AU'), + Country(name='Japan', iso_two_letter='JP', description=( + 'Japan is an island country in East Asia.' + )), + ] + Country.objects.bulk_create( + new_data, + update_conflicts=True, + update_fields=['description'], + unique_fields=unique_fields, + ) + self.assertEqual(Country.objects.count(), 6) + self.assertCountEqual(Country.objects.values('iso_two_letter', 'description'), [ + {'iso_two_letter': 'US', 'description': ''}, + {'iso_two_letter': 'NL', 'description': ''}, + {'iso_two_letter': 'DE', 'description': ( + 'Germany is a country in Central Europe.' + )}, + {'iso_two_letter': 'CZ', 'description': ( + 'The Czech Republic is a landlocked country in Central Europe.' + )}, + {'iso_two_letter': 'AU', 'description': ''}, + {'iso_two_letter': 'JP', 'description': ( + 'Japan is an island country in East Asia.' + )}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_fields_both(self): + self._test_update_conflicts_unique_two_fields(['iso_two_letter', 'name']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_fields_one(self): + with self.assertRaises((OperationalError, ProgrammingError)): + self._test_update_conflicts_unique_two_fields(['iso_two_letter']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self): + self._test_update_conflicts_unique_two_fields([]) + + def _test_update_conflicts(self, unique_fields): + UpsertConflict.objects.bulk_create([ + UpsertConflict(number=1, rank=1, name='John'), + UpsertConflict(number=2, rank=2, name='Mary'), + UpsertConflict(number=3, rank=3, name='Hannah'), + ]) + self.assertEqual(UpsertConflict.objects.count(), 3) + + conflicting_objects = [ + UpsertConflict(number=1, rank=4, name='Steve'), + UpsertConflict(number=2, rank=2, name='Olivia'), + UpsertConflict(number=3, rank=1, name='Hannah'), + ] + UpsertConflict.objects.bulk_create( + conflicting_objects, + update_conflicts=True, + update_fields=['name', 'rank'], + unique_fields=unique_fields, + ) + self.assertEqual(UpsertConflict.objects.count(), 3) + self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ + {'number': 1, 'rank': 4, 'name': 'Steve'}, + {'number': 2, 'rank': 2, 'name': 'Olivia'}, + {'number': 3, 'rank': 1, 'name': 'Hannah'}, + ]) + + UpsertConflict.objects.bulk_create( + conflicting_objects + [UpsertConflict(number=4, rank=4, name='Mark')], + update_conflicts=True, + update_fields=['name', 'rank'], + unique_fields=unique_fields, + ) + self.assertEqual(UpsertConflict.objects.count(), 4) + self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ + {'number': 1, 'rank': 4, 'name': 'Steve'}, + {'number': 2, 'rank': 2, 'name': 'Olivia'}, + {'number': 3, 'rank': 1, 'name': 'Hannah'}, + {'number': 4, 'rank': 4, 'name': 'Mark'}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_fields(self): + self._test_update_conflicts(unique_fields=['number']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_no_unique_fields(self): + self._test_update_conflicts([]) |
