diff options
Diffstat (limited to 'tests/postgres_tests/test_operations.py')
| -rw-r--r-- | tests/postgres_tests/test_operations.py | 434 |
1 files changed, 261 insertions, 173 deletions
diff --git a/tests/postgres_tests/test_operations.py b/tests/postgres_tests/test_operations.py index 790fef8332..a54f8811ba 100644 --- a/tests/postgres_tests/test_operations.py +++ b/tests/postgres_tests/test_operations.py @@ -3,9 +3,7 @@ from unittest import mock from migrations.test_base import OperationTestBase -from django.db import ( - IntegrityError, NotSupportedError, connection, transaction, -) +from django.db import IntegrityError, NotSupportedError, connection, transaction from django.db.migrations.state import ProjectState from django.db.models import CheckConstraint, Index, Q, UniqueConstraint from django.db.utils import ProgrammingError @@ -17,262 +15,315 @@ from . import PostgreSQLTestCase try: from django.contrib.postgres.indexes import BrinIndex, BTreeIndex from django.contrib.postgres.operations import ( - AddConstraintNotValid, AddIndexConcurrently, BloomExtension, - CreateCollation, CreateExtension, RemoveCollation, - RemoveIndexConcurrently, ValidateConstraint, + AddConstraintNotValid, + AddIndexConcurrently, + BloomExtension, + CreateCollation, + CreateExtension, + RemoveCollation, + RemoveIndexConcurrently, + ValidateConstraint, ) except ImportError: pass -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') -@modify_settings(INSTALLED_APPS={'append': 'migrations'}) +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") +@modify_settings(INSTALLED_APPS={"append": "migrations"}) class AddIndexConcurrentlyTests(OperationTestBase): - app_label = 'test_add_concurrently' + app_label = "test_add_concurrently" def test_requires_atomic_false(self): project_state = self.set_up_test_model(self.app_label) new_state = project_state.clone() operation = AddIndexConcurrently( - 'Pony', - Index(fields=['pink'], name='pony_pink_idx'), + "Pony", + Index(fields=["pink"], name="pony_pink_idx"), ) msg = ( - 'The AddIndexConcurrently operation cannot be executed inside ' - 'a transaction (set atomic = False on the migration).' + "The AddIndexConcurrently operation cannot be executed inside " + "a transaction (set atomic = False on the migration)." ) with self.assertRaisesMessage(NotSupportedError, msg): with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) def test_add(self): project_state = self.set_up_test_model(self.app_label, index=False) - table_name = '%s_pony' % self.app_label - index = Index(fields=['pink'], name='pony_pink_idx') + table_name = "%s_pony" % self.app_label + index = Index(fields=["pink"], name="pony_pink_idx") new_state = project_state.clone() - operation = AddIndexConcurrently('Pony', index) + operation = AddIndexConcurrently("Pony", index) self.assertEqual( operation.describe(), - 'Concurrently create index pony_pink_idx on field(s) pink of model Pony', + "Concurrently create index pony_pink_idx on field(s) pink of model Pony", ) operation.state_forwards(self.app_label, new_state) - self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 1) - self.assertIndexNotExists(table_name, ['pink']) + self.assertEqual( + len(new_state.models[self.app_label, "pony"].options["indexes"]), 1 + ) + self.assertIndexNotExists(table_name, ["pink"]) # Add index. with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) - self.assertIndexExists(table_name, ['pink']) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) + self.assertIndexExists(table_name, ["pink"]) # Reversal. with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) - self.assertIndexNotExists(table_name, ['pink']) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) + self.assertIndexNotExists(table_name, ["pink"]) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'AddIndexConcurrently') + self.assertEqual(name, "AddIndexConcurrently") self.assertEqual(args, []) - self.assertEqual(kwargs, {'model_name': 'Pony', 'index': index}) + self.assertEqual(kwargs, {"model_name": "Pony", "index": index}) def test_add_other_index_type(self): project_state = self.set_up_test_model(self.app_label, index=False) - table_name = '%s_pony' % self.app_label + table_name = "%s_pony" % self.app_label new_state = project_state.clone() operation = AddIndexConcurrently( - 'Pony', - BrinIndex(fields=['pink'], name='pony_pink_brin_idx'), + "Pony", + BrinIndex(fields=["pink"], name="pony_pink_brin_idx"), ) - self.assertIndexNotExists(table_name, ['pink']) + self.assertIndexNotExists(table_name, ["pink"]) # Add index. with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) - self.assertIndexExists(table_name, ['pink'], index_type='brin') + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) + self.assertIndexExists(table_name, ["pink"], index_type="brin") # Reversal. with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) - self.assertIndexNotExists(table_name, ['pink']) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) + self.assertIndexNotExists(table_name, ["pink"]) def test_add_with_options(self): project_state = self.set_up_test_model(self.app_label, index=False) - table_name = '%s_pony' % self.app_label + table_name = "%s_pony" % self.app_label new_state = project_state.clone() - index = BTreeIndex(fields=['pink'], name='pony_pink_btree_idx', fillfactor=70) - operation = AddIndexConcurrently('Pony', index) - self.assertIndexNotExists(table_name, ['pink']) + index = BTreeIndex(fields=["pink"], name="pony_pink_btree_idx", fillfactor=70) + operation = AddIndexConcurrently("Pony", index) + self.assertIndexNotExists(table_name, ["pink"]) # Add index. with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) - self.assertIndexExists(table_name, ['pink'], index_type='btree') + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) + self.assertIndexExists(table_name, ["pink"], index_type="btree") # Reversal. with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) - self.assertIndexNotExists(table_name, ['pink']) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) + self.assertIndexNotExists(table_name, ["pink"]) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') -@modify_settings(INSTALLED_APPS={'append': 'migrations'}) +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") +@modify_settings(INSTALLED_APPS={"append": "migrations"}) class RemoveIndexConcurrentlyTests(OperationTestBase): - app_label = 'test_rm_concurrently' + app_label = "test_rm_concurrently" def test_requires_atomic_false(self): project_state = self.set_up_test_model(self.app_label, index=True) new_state = project_state.clone() - operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx') + operation = RemoveIndexConcurrently("Pony", "pony_pink_idx") msg = ( - 'The RemoveIndexConcurrently operation cannot be executed inside ' - 'a transaction (set atomic = False on the migration).' + "The RemoveIndexConcurrently operation cannot be executed inside " + "a transaction (set atomic = False on the migration)." ) with self.assertRaisesMessage(NotSupportedError, msg): with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) def test_remove(self): project_state = self.set_up_test_model(self.app_label, index=True) - table_name = '%s_pony' % self.app_label + table_name = "%s_pony" % self.app_label self.assertTableExists(table_name) new_state = project_state.clone() - operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx') + operation = RemoveIndexConcurrently("Pony", "pony_pink_idx") self.assertEqual( operation.describe(), - 'Concurrently remove index pony_pink_idx from Pony', + "Concurrently remove index pony_pink_idx from Pony", ) operation.state_forwards(self.app_label, new_state) - self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 0) - self.assertIndexExists(table_name, ['pink']) + self.assertEqual( + len(new_state.models[self.app_label, "pony"].options["indexes"]), 0 + ) + self.assertIndexExists(table_name, ["pink"]) # Remove index. with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) - self.assertIndexNotExists(table_name, ['pink']) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) + self.assertIndexNotExists(table_name, ["pink"]) # Reversal. with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) - self.assertIndexExists(table_name, ['pink']) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) + self.assertIndexExists(table_name, ["pink"]) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'RemoveIndexConcurrently') + self.assertEqual(name, "RemoveIndexConcurrently") self.assertEqual(args, []) - self.assertEqual(kwargs, {'model_name': 'Pony', 'name': 'pony_pink_idx'}) + self.assertEqual(kwargs, {"model_name": "Pony", "name": "pony_pink_idx"}) -class NoMigrationRouter(): +class NoMigrationRouter: def allow_migrate(self, db, app_label, **hints): return False -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") class CreateExtensionTests(PostgreSQLTestCase): - app_label = 'test_allow_create_extention' + app_label = "test_allow_create_extention" @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()]) def test_no_allow_migrate(self): - operation = CreateExtension('tablefunc') + operation = CreateExtension("tablefunc") project_state = ProjectState() new_state = project_state.clone() # Don't create an extension. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 0) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 0) def test_allow_migrate(self): - operation = CreateExtension('tablefunc') - self.assertEqual(operation.migration_name_fragment, 'create_extension_tablefunc') + operation = CreateExtension("tablefunc") + self.assertEqual( + operation.migration_name_fragment, "create_extension_tablefunc" + ) project_state = ProjectState() new_state = project_state.clone() # Create an extension. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 4) - self.assertIn('CREATE EXTENSION IF NOT EXISTS', captured_queries[1]['sql']) + self.assertIn("CREATE EXTENSION IF NOT EXISTS", captured_queries[1]["sql"]) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 2) - self.assertIn('DROP EXTENSION IF EXISTS', captured_queries[1]['sql']) + self.assertIn("DROP EXTENSION IF EXISTS", captured_queries[1]["sql"]) def test_create_existing_extension(self): operation = BloomExtension() - self.assertEqual(operation.migration_name_fragment, 'create_extension_bloom') + self.assertEqual(operation.migration_name_fragment, "create_extension_bloom") project_state = ProjectState() new_state = project_state.clone() # Don't create an existing extension. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 3) - self.assertIn('SELECT', captured_queries[0]['sql']) + self.assertIn("SELECT", captured_queries[0]["sql"]) def test_drop_nonexistent_extension(self): - operation = CreateExtension('tablefunc') + operation = CreateExtension("tablefunc") project_state = ProjectState() new_state = project_state.clone() # Don't drop a nonexistent extension. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, project_state, new_state) + operation.database_backwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('SELECT', captured_queries[0]['sql']) + self.assertIn("SELECT", captured_queries[0]["sql"]) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") class CreateCollationTests(PostgreSQLTestCase): - app_label = 'test_allow_create_collation' + app_label = "test_allow_create_collation" @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()]) def test_no_allow_migrate(self): - operation = CreateCollation('C_test', locale='C') + operation = CreateCollation("C_test", locale="C") project_state = ProjectState() new_state = project_state.clone() # Don't create a collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 0) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 0) def test_create(self): - operation = CreateCollation('C_test', locale='C') - self.assertEqual(operation.migration_name_fragment, 'create_collation_c_test') - self.assertEqual(operation.describe(), 'Create collation C_test') + operation = CreateCollation("C_test", locale="C") + self.assertEqual(operation.migration_name_fragment, "create_collation_c_test") + self.assertEqual(operation.describe(), "Create collation C_test") project_state = ProjectState() new_state = project_state.clone() # Create a collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('CREATE COLLATION', captured_queries[0]['sql']) + self.assertIn("CREATE COLLATION", captured_queries[0]["sql"]) # Creating the same collation raises an exception. - with self.assertRaisesMessage(ProgrammingError, 'already exists'): + with self.assertRaisesMessage(ProgrammingError, "already exists"): with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('DROP COLLATION', captured_queries[0]['sql']) + self.assertIn("DROP COLLATION", captured_queries[0]["sql"]) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'CreateCollation') + self.assertEqual(name, "CreateCollation") self.assertEqual(args, []) - self.assertEqual(kwargs, {'name': 'C_test', 'locale': 'C'}) + self.assertEqual(kwargs, {"name": "C_test", "locale": "C"}) - @skipUnlessDBFeature('supports_non_deterministic_collations') + @skipUnlessDBFeature("supports_non_deterministic_collations") def test_create_non_deterministic_collation(self): operation = CreateCollation( - 'case_insensitive_test', - 'und-u-ks-level2', - provider='icu', + "case_insensitive_test", + "und-u-ks-level2", + provider="icu", deterministic=False, ) project_state = ProjectState() @@ -280,216 +331,253 @@ class CreateCollationTests(PostgreSQLTestCase): # Create a collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('CREATE COLLATION', captured_queries[0]['sql']) + self.assertIn("CREATE COLLATION", captured_queries[0]["sql"]) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('DROP COLLATION', captured_queries[0]['sql']) + self.assertIn("DROP COLLATION", captured_queries[0]["sql"]) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'CreateCollation') + self.assertEqual(name, "CreateCollation") self.assertEqual(args, []) - self.assertEqual(kwargs, { - 'name': 'case_insensitive_test', - 'locale': 'und-u-ks-level2', - 'provider': 'icu', - 'deterministic': False, - }) + self.assertEqual( + kwargs, + { + "name": "case_insensitive_test", + "locale": "und-u-ks-level2", + "provider": "icu", + "deterministic": False, + }, + ) def test_create_collation_alternate_provider(self): operation = CreateCollation( - 'german_phonebook_test', - provider='icu', - locale='de-u-co-phonebk', + "german_phonebook_test", + provider="icu", + locale="de-u-co-phonebk", ) project_state = ProjectState() new_state = project_state.clone() # Create an collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('CREATE COLLATION', captured_queries[0]['sql']) + self.assertIn("CREATE COLLATION", captured_queries[0]["sql"]) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('DROP COLLATION', captured_queries[0]['sql']) + self.assertIn("DROP COLLATION", captured_queries[0]["sql"]) def test_nondeterministic_collation_not_supported(self): operation = CreateCollation( - 'case_insensitive_test', - provider='icu', - locale='und-u-ks-level2', + "case_insensitive_test", + provider="icu", + locale="und-u-ks-level2", deterministic=False, ) project_state = ProjectState() new_state = project_state.clone() - msg = 'Non-deterministic collations require PostgreSQL 12+.' + msg = "Non-deterministic collations require PostgreSQL 12+." with connection.schema_editor(atomic=False) as editor: with mock.patch( - 'django.db.backends.postgresql.features.DatabaseFeatures.' - 'supports_non_deterministic_collations', + "django.db.backends.postgresql.features.DatabaseFeatures." + "supports_non_deterministic_collations", False, ): with self.assertRaisesMessage(NotSupportedError, msg): - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") class RemoveCollationTests(PostgreSQLTestCase): - app_label = 'test_allow_remove_collation' + app_label = "test_allow_remove_collation" @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()]) def test_no_allow_migrate(self): - operation = RemoveCollation('C_test', locale='C') + operation = RemoveCollation("C_test", locale="C") project_state = ProjectState() new_state = project_state.clone() # Don't create a collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 0) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 0) def test_remove(self): - operation = CreateCollation('C_test', locale='C') + operation = CreateCollation("C_test", locale="C") project_state = ProjectState() new_state = project_state.clone() with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) - operation = RemoveCollation('C_test', locale='C') - self.assertEqual(operation.migration_name_fragment, 'remove_collation_c_test') - self.assertEqual(operation.describe(), 'Remove collation C_test') + operation = RemoveCollation("C_test", locale="C") + self.assertEqual(operation.migration_name_fragment, "remove_collation_c_test") + self.assertEqual(operation.describe(), "Remove collation C_test") project_state = ProjectState() new_state = project_state.clone() # Remove a collation. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('DROP COLLATION', captured_queries[0]['sql']) + self.assertIn("DROP COLLATION", captured_queries[0]["sql"]) # Removing a nonexistent collation raises an exception. - with self.assertRaisesMessage(ProgrammingError, 'does not exist'): + with self.assertRaisesMessage(ProgrammingError, "does not exist"): with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) # Reversal. with CaptureQueriesContext(connection) as captured_queries: with connection.schema_editor(atomic=False) as editor: - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) self.assertEqual(len(captured_queries), 1) - self.assertIn('CREATE COLLATION', captured_queries[0]['sql']) + self.assertIn("CREATE COLLATION", captured_queries[0]["sql"]) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'RemoveCollation') + self.assertEqual(name, "RemoveCollation") self.assertEqual(args, []) - self.assertEqual(kwargs, {'name': 'C_test', 'locale': 'C'}) + self.assertEqual(kwargs, {"name": "C_test", "locale": "C"}) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') -@modify_settings(INSTALLED_APPS={'append': 'migrations'}) +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") +@modify_settings(INSTALLED_APPS={"append": "migrations"}) class AddConstraintNotValidTests(OperationTestBase): - app_label = 'test_add_constraint_not_valid' + app_label = "test_add_constraint_not_valid" def test_non_check_constraint_not_supported(self): - constraint = UniqueConstraint(fields=['pink'], name='pony_pink_uniq') - msg = 'AddConstraintNotValid.constraint must be a check constraint.' + constraint = UniqueConstraint(fields=["pink"], name="pony_pink_uniq") + msg = "AddConstraintNotValid.constraint must be a check constraint." with self.assertRaisesMessage(TypeError, msg): - AddConstraintNotValid(model_name='pony', constraint=constraint) + AddConstraintNotValid(model_name="pony", constraint=constraint) def test_add(self): - table_name = f'{self.app_label}_pony' - constraint_name = 'pony_pink_gte_check' + table_name = f"{self.app_label}_pony" + constraint_name = "pony_pink_gte_check" constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name) - operation = AddConstraintNotValid('Pony', constraint=constraint) + operation = AddConstraintNotValid("Pony", constraint=constraint) project_state, new_state = self.make_test_state(self.app_label, operation) self.assertEqual( operation.describe(), - f'Create not valid constraint {constraint_name} on model Pony', + f"Create not valid constraint {constraint_name} on model Pony", ) self.assertEqual( operation.migration_name_fragment, - f'pony_{constraint_name}_not_valid', + f"pony_{constraint_name}_not_valid", ) self.assertEqual( - len(new_state.models[self.app_label, 'pony'].options['constraints']), + len(new_state.models[self.app_label, "pony"].options["constraints"]), 1, ) self.assertConstraintNotExists(table_name, constraint_name) - Pony = new_state.apps.get_model(self.app_label, 'Pony') + Pony = new_state.apps.get_model(self.app_label, "Pony") self.assertEqual(len(Pony._meta.constraints), 1) Pony.objects.create(pink=2, weight=1.0) # Add constraint. with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) msg = f'check constraint "{constraint_name}"' with self.assertRaisesMessage(IntegrityError, msg), transaction.atomic(): Pony.objects.create(pink=3, weight=1.0) self.assertConstraintExists(table_name, constraint_name) # Reversal. with connection.schema_editor(atomic=True) as editor: - operation.database_backwards(self.app_label, editor, project_state, new_state) + operation.database_backwards( + self.app_label, editor, project_state, new_state + ) self.assertConstraintNotExists(table_name, constraint_name) Pony.objects.create(pink=3, weight=1.0) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'AddConstraintNotValid') + self.assertEqual(name, "AddConstraintNotValid") self.assertEqual(args, []) - self.assertEqual(kwargs, {'model_name': 'Pony', 'constraint': constraint}) + self.assertEqual(kwargs, {"model_name": "Pony", "constraint": constraint}) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.') -@modify_settings(INSTALLED_APPS={'append': 'migrations'}) +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.") +@modify_settings(INSTALLED_APPS={"append": "migrations"}) class ValidateConstraintTests(OperationTestBase): - app_label = 'test_validate_constraint' + app_label = "test_validate_constraint" def test_validate(self): - constraint_name = 'pony_pink_gte_check' + constraint_name = "pony_pink_gte_check" constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name) - operation = AddConstraintNotValid('Pony', constraint=constraint) + operation = AddConstraintNotValid("Pony", constraint=constraint) project_state, new_state = self.make_test_state(self.app_label, operation) - Pony = new_state.apps.get_model(self.app_label, 'Pony') + Pony = new_state.apps.get_model(self.app_label, "Pony") obj = Pony.objects.create(pink=2, weight=1.0) # Add constraint. with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) project_state = new_state new_state = new_state.clone() - operation = ValidateConstraint('Pony', name=constraint_name) + operation = ValidateConstraint("Pony", name=constraint_name) operation.state_forwards(self.app_label, new_state) self.assertEqual( operation.describe(), - f'Validate constraint {constraint_name} on model Pony', + f"Validate constraint {constraint_name} on model Pony", ) self.assertEqual( operation.migration_name_fragment, - f'pony_validate_{constraint_name}', + f"pony_validate_{constraint_name}", ) # Validate constraint. with connection.schema_editor(atomic=True) as editor: msg = f'check constraint "{constraint_name}"' with self.assertRaisesMessage(IntegrityError, msg): - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) obj.pink = 5 obj.save() with connection.schema_editor(atomic=True) as editor: - operation.database_forwards(self.app_label, editor, project_state, new_state) + operation.database_forwards( + self.app_label, editor, project_state, new_state + ) # Reversal is a noop. with connection.schema_editor() as editor: with self.assertNumQueries(0): - operation.database_backwards(self.app_label, editor, new_state, project_state) + operation.database_backwards( + self.app_label, editor, new_state, project_state + ) # Deconstruction. name, args, kwargs = operation.deconstruct() - self.assertEqual(name, 'ValidateConstraint') + self.assertEqual(name, "ValidateConstraint") self.assertEqual(args, []) - self.assertEqual(kwargs, {'model_name': 'Pony', 'name': constraint_name}) + self.assertEqual(kwargs, {"model_name": "Pony", "name": constraint_name}) |
