diff options
| author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
| commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
| tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/migrations/test_base.py | |
| parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/migrations/test_base.py')
| -rw-r--r-- | tests/migrations/test_base.py | 304 |
1 files changed, 183 insertions, 121 deletions
diff --git a/tests/migrations/test_base.py b/tests/migrations/test_base.py index 6f8081a462..955bfa99d0 100644 --- a/tests/migrations/test_base.py +++ b/tests/migrations/test_base.py @@ -20,52 +20,66 @@ class MigrationTestBase(TransactionTestCase): """ available_apps = ["migrations"] - databases = {'default', 'other'} + databases = {"default", "other"} def tearDown(self): # Reset applied-migrations state. for db in self.databases: recorder = MigrationRecorder(connections[db]) - recorder.migration_qs.filter(app='migrations').delete() + recorder.migration_qs.filter(app="migrations").delete() - def get_table_description(self, table, using='default'): + def get_table_description(self, table, using="default"): with connections[using].cursor() as cursor: return connections[using].introspection.get_table_description(cursor, table) - def assertTableExists(self, table, using='default'): + def assertTableExists(self, table, using="default"): with connections[using].cursor() as cursor: self.assertIn(table, connections[using].introspection.table_names(cursor)) - def assertTableNotExists(self, table, using='default'): + def assertTableNotExists(self, table, using="default"): with connections[using].cursor() as cursor: - self.assertNotIn(table, connections[using].introspection.table_names(cursor)) + self.assertNotIn( + table, connections[using].introspection.table_names(cursor) + ) - def assertColumnExists(self, table, column, using='default'): - self.assertIn(column, [c.name for c in self.get_table_description(table, using=using)]) + def assertColumnExists(self, table, column, using="default"): + self.assertIn( + column, [c.name for c in self.get_table_description(table, using=using)] + ) - def assertColumnNotExists(self, table, column, using='default'): - self.assertNotIn(column, [c.name for c in self.get_table_description(table, using=using)]) + def assertColumnNotExists(self, table, column, using="default"): + self.assertNotIn( + column, [c.name for c in self.get_table_description(table, using=using)] + ) def _get_column_allows_null(self, table, column, using): - return [c.null_ok for c in self.get_table_description(table, using=using) if c.name == column][0] + return [ + c.null_ok + for c in self.get_table_description(table, using=using) + if c.name == column + ][0] - def assertColumnNull(self, table, column, using='default'): + def assertColumnNull(self, table, column, using="default"): self.assertTrue(self._get_column_allows_null(table, column, using)) - def assertColumnNotNull(self, table, column, using='default'): + def assertColumnNotNull(self, table, column, using="default"): self.assertFalse(self._get_column_allows_null(table, column, using)) - def assertIndexExists(self, table, columns, value=True, using='default', index_type=None): + def assertIndexExists( + self, table, columns, value=True, using="default", index_type=None + ): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["index"] - for c in connections[using].introspection.get_constraints(cursor, table).values() + for c in connections[using] + .introspection.get_constraints(cursor, table) + .values() if ( - c['columns'] == list(columns) and - (index_type is None or c['type'] == index_type) and - not c['unique'] + c["columns"] == list(columns) + and (index_type is None or c["type"] == index_type) + and not c["unique"] ) ), ) @@ -73,47 +87,53 @@ class MigrationTestBase(TransactionTestCase): def assertIndexNotExists(self, table, columns): return self.assertIndexExists(table, columns, False) - def assertIndexNameExists(self, table, index, using='default'): + def assertIndexNameExists(self, table, index, using="default"): with connections[using].cursor() as cursor: self.assertIn( index, connection.introspection.get_constraints(cursor, table), ) - def assertIndexNameNotExists(self, table, index, using='default'): + def assertIndexNameNotExists(self, table, index, using="default"): with connections[using].cursor() as cursor: self.assertNotIn( index, connection.introspection.get_constraints(cursor, table), ) - def assertConstraintExists(self, table, name, value=True, using='default'): + def assertConstraintExists(self, table, name, value=True, using="default"): with connections[using].cursor() as cursor: - constraints = connections[using].introspection.get_constraints(cursor, table).items() + constraints = ( + connections[using].introspection.get_constraints(cursor, table).items() + ) self.assertEqual( value, - any(c['check'] for n, c in constraints if n == name), + any(c["check"] for n, c in constraints if n == name), ) def assertConstraintNotExists(self, table, name): return self.assertConstraintExists(table, name, False) - def assertUniqueConstraintExists(self, table, columns, value=True, using='default'): + def assertUniqueConstraintExists(self, table, columns, value=True, using="default"): with connections[using].cursor() as cursor: - constraints = connections[using].introspection.get_constraints(cursor, table).values() + constraints = ( + connections[using].introspection.get_constraints(cursor, table).values() + ) self.assertEqual( value, - any(c['unique'] for c in constraints if c['columns'] == list(columns)), + any(c["unique"] for c in constraints if c["columns"] == list(columns)), ) - def assertFKExists(self, table, columns, to, value=True, using='default'): + def assertFKExists(self, table, columns, to, value=True, using="default"): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["foreign_key"] == to - for c in connections[using].introspection.get_constraints(cursor, table).values() - if c['columns'] == list(columns) + for c in connections[using] + .introspection.get_constraints(cursor, table) + .values() + if c["columns"] == list(columns) ), ) @@ -121,7 +141,7 @@ class MigrationTestBase(TransactionTestCase): return self.assertFKExists(table, columns, to, False) @contextmanager - def temporary_migration_module(self, app_label='migrations', module=None): + def temporary_migration_module(self, app_label="migrations", module=None): """ Allows testing management commands in a temporary migrations module. @@ -140,12 +160,12 @@ class MigrationTestBase(TransactionTestCase): """ with tempfile.TemporaryDirectory() as temp_dir: target_dir = tempfile.mkdtemp(dir=temp_dir) - with open(os.path.join(target_dir, '__init__.py'), 'w'): + with open(os.path.join(target_dir, "__init__.py"), "w"): pass - target_migrations_dir = os.path.join(target_dir, 'migrations') + target_migrations_dir = os.path.join(target_dir, "migrations") if module is None: - module = apps.get_app_config(app_label).name + '.migrations' + module = apps.get_app_config(app_label).name + ".migrations" try: source_migrations_dir = module_dir(import_module(module)) @@ -155,7 +175,7 @@ class MigrationTestBase(TransactionTestCase): shutil.copytree(source_migrations_dir, target_migrations_dir) with extend_sys_path(temp_dir): - new_module = os.path.basename(target_dir) + '.migrations' + new_module = os.path.basename(target_dir) + ".migrations" with self.settings(MIGRATION_MODULES={app_label: new_module}): yield target_migrations_dir @@ -173,22 +193,28 @@ class OperationTestBase(MigrationTestBase): super().tearDown() def cleanup_test_tables(self): - table_names = frozenset(connection.introspection.table_names()) - self._initial_table_names + table_names = ( + frozenset(connection.introspection.table_names()) + - self._initial_table_names + ) with connection.schema_editor() as editor: with connection.constraint_checks_disabled(): for table_name in table_names: - editor.execute(editor.sql_delete_table % { - 'table': editor.quote_name(table_name), - }) + editor.execute( + editor.sql_delete_table + % { + "table": editor.quote_name(table_name), + } + ) def apply_operations(self, app_label, project_state, operations, atomic=True): - migration = Migration('name', app_label) + migration = Migration("name", app_label) migration.operations = operations with connection.schema_editor(atomic=atomic) as editor: return migration.apply(project_state, editor) def unapply_operations(self, app_label, project_state, operations, atomic=True): - migration = Migration('name', app_label) + migration = Migration("name", app_label) migration.operations = operations with connection.schema_editor(atomic=atomic) as editor: return migration.unapply(project_state, editor) @@ -204,106 +230,142 @@ class OperationTestBase(MigrationTestBase): return project_state, new_state def set_up_test_model( - self, app_label, second_model=False, third_model=False, index=False, - multicol_index=False, related_model=False, mti_model=False, - proxy_model=False, manager_model=False, unique_together=False, - options=False, db_table=None, index_together=False, constraints=None, + self, + app_label, + second_model=False, + third_model=False, + index=False, + multicol_index=False, + related_model=False, + mti_model=False, + proxy_model=False, + manager_model=False, + unique_together=False, + options=False, + db_table=None, + index_together=False, + constraints=None, indexes=None, ): """Creates a test model state and database table.""" # Make the "current" state. model_options = { - 'swappable': 'TEST_SWAP_MODEL', - 'index_together': [['weight', 'pink']] if index_together else [], - 'unique_together': [['pink', 'weight']] if unique_together else [], + "swappable": "TEST_SWAP_MODEL", + "index_together": [["weight", "pink"]] if index_together else [], + "unique_together": [["pink", "weight"]] if unique_together else [], } if options: - model_options['permissions'] = [('can_groom', 'Can groom')] + model_options["permissions"] = [("can_groom", "Can groom")] if db_table: - model_options['db_table'] = db_table - operations = [migrations.CreateModel( - 'Pony', - [ - ('id', models.AutoField(primary_key=True)), - ('pink', models.IntegerField(default=3)), - ('weight', models.FloatField()), - ], - options=model_options, - )] + model_options["db_table"] = db_table + operations = [ + migrations.CreateModel( + "Pony", + [ + ("id", models.AutoField(primary_key=True)), + ("pink", models.IntegerField(default=3)), + ("weight", models.FloatField()), + ], + options=model_options, + ) + ] if index: - operations.append(migrations.AddIndex( - 'Pony', - models.Index(fields=['pink'], name='pony_pink_idx'), - )) + operations.append( + migrations.AddIndex( + "Pony", + models.Index(fields=["pink"], name="pony_pink_idx"), + ) + ) if multicol_index: - operations.append(migrations.AddIndex( - 'Pony', - models.Index(fields=['pink', 'weight'], name='pony_test_idx'), - )) + operations.append( + migrations.AddIndex( + "Pony", + models.Index(fields=["pink", "weight"], name="pony_test_idx"), + ) + ) if indexes: for index in indexes: - operations.append(migrations.AddIndex('Pony', index)) + operations.append(migrations.AddIndex("Pony", index)) if constraints: for constraint in constraints: - operations.append(migrations.AddConstraint('Pony', constraint)) + operations.append(migrations.AddConstraint("Pony", constraint)) if second_model: - operations.append(migrations.CreateModel( - 'Stable', - [ - ('id', models.AutoField(primary_key=True)), - ] - )) + operations.append( + migrations.CreateModel( + "Stable", + [ + ("id", models.AutoField(primary_key=True)), + ], + ) + ) if third_model: - operations.append(migrations.CreateModel( - 'Van', - [ - ('id', models.AutoField(primary_key=True)), - ] - )) + operations.append( + migrations.CreateModel( + "Van", + [ + ("id", models.AutoField(primary_key=True)), + ], + ) + ) if related_model: - operations.append(migrations.CreateModel( - 'Rider', - [ - ('id', models.AutoField(primary_key=True)), - ('pony', models.ForeignKey('Pony', models.CASCADE)), - ('friend', models.ForeignKey('self', models.CASCADE, null=True)) - ], - )) + operations.append( + migrations.CreateModel( + "Rider", + [ + ("id", models.AutoField(primary_key=True)), + ("pony", models.ForeignKey("Pony", models.CASCADE)), + ( + "friend", + models.ForeignKey("self", models.CASCADE, null=True), + ), + ], + ) + ) if mti_model: - operations.append(migrations.CreateModel( - 'ShetlandPony', - fields=[ - ('pony_ptr', models.OneToOneField( - 'Pony', - models.CASCADE, - auto_created=True, - parent_link=True, - primary_key=True, - to_field='id', - serialize=False, - )), - ('cuteness', models.IntegerField(default=1)), - ], - bases=['%s.Pony' % app_label], - )) + operations.append( + migrations.CreateModel( + "ShetlandPony", + fields=[ + ( + "pony_ptr", + models.OneToOneField( + "Pony", + models.CASCADE, + auto_created=True, + parent_link=True, + primary_key=True, + to_field="id", + serialize=False, + ), + ), + ("cuteness", models.IntegerField(default=1)), + ], + bases=["%s.Pony" % app_label], + ) + ) if proxy_model: - operations.append(migrations.CreateModel( - 'ProxyPony', - fields=[], - options={'proxy': True}, - bases=['%s.Pony' % app_label], - )) + operations.append( + migrations.CreateModel( + "ProxyPony", + fields=[], + options={"proxy": True}, + bases=["%s.Pony" % app_label], + ) + ) if manager_model: from .models import FoodManager, FoodQuerySet - operations.append(migrations.CreateModel( - 'Food', - fields=[ - ('id', models.AutoField(primary_key=True)), - ], - managers=[ - ('food_qs', FoodQuerySet.as_manager()), - ('food_mgr', FoodManager('a', 'b')), - ('food_mgr_kwargs', FoodManager('x', 'y', 3, 4)), - ] - )) + + operations.append( + migrations.CreateModel( + "Food", + fields=[ + ("id", models.AutoField(primary_key=True)), + ], + managers=[ + ("food_qs", FoodQuerySet.as_manager()), + ("food_mgr", FoodManager("a", "b")), + ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)), + ], + ) + ) return self.apply_operations(app_label, ProjectState(), operations) |
