diff options
| author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
| commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
| tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/migrations/test_executor.py | |
| parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/migrations/test_executor.py')
| -rw-r--r-- | tests/migrations/test_executor.py | 486 |
1 files changed, 285 insertions, 201 deletions
diff --git a/tests/migrations/test_executor.py b/tests/migrations/test_executor.py index cab6ac6cb5..6232b44b98 100644 --- a/tests/migrations/test_executor.py +++ b/tests/migrations/test_executor.py @@ -8,14 +8,17 @@ from django.db.migrations.graph import MigrationGraph from django.db.migrations.recorder import MigrationRecorder from django.db.migrations.state import ProjectState from django.test import ( - SimpleTestCase, modify_settings, override_settings, skipUnlessDBFeature, + SimpleTestCase, + modify_settings, + override_settings, + skipUnlessDBFeature, ) from django.test.utils import isolate_lru_cache from .test_base import MigrationTestBase -@modify_settings(INSTALLED_APPS={'append': 'migrations2'}) +@modify_settings(INSTALLED_APPS={"append": "migrations2"}) class ExecutorTests(MigrationTestBase): """ Tests the migration executor (full end-to-end running). @@ -24,7 +27,12 @@ class ExecutorTests(MigrationTestBase): test failures first, as they may be propagating into here. """ - available_apps = ["migrations", "migrations2", "django.contrib.auth", "django.contrib.contenttypes"] + available_apps = [ + "migrations", + "migrations2", + "django.contrib.auth", + "django.contrib.contenttypes", + ] @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) def test_run(self): @@ -65,21 +73,28 @@ class ExecutorTests(MigrationTestBase): self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_book") - @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"} + ) def test_run_with_squashed(self): """ Tests running a squashed migration from zero (should ignore what it replaces) """ executor = MigrationExecutor(connection) # Check our leaf node is the squashed one - leaves = [key for key in executor.loader.graph.leaf_nodes() if key[0] == "migrations"] + leaves = [ + key for key in executor.loader.graph.leaf_nodes() if key[0] == "migrations" + ] self.assertEqual(leaves, [("migrations", "0001_squashed_0002")]) # Check the plan plan = executor.migration_plan([("migrations", "0001_squashed_0002")]) self.assertEqual( plan, [ - (executor.loader.graph.nodes["migrations", "0001_squashed_0002"], False), + ( + executor.loader.graph.nodes["migrations", "0001_squashed_0002"], + False, + ), ], ) # Were the tables there before? @@ -106,29 +121,31 @@ class ExecutorTests(MigrationTestBase): self.assertTableNotExists("migrations_book") @override_settings( - MIGRATION_MODULES={'migrations': 'migrations.test_migrations_squashed'}, + MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}, ) def test_migrate_backward_to_squashed_migration(self): executor = MigrationExecutor(connection) try: - self.assertTableNotExists('migrations_author') - self.assertTableNotExists('migrations_book') - executor.migrate([('migrations', '0001_squashed_0002')]) - self.assertTableExists('migrations_author') - self.assertTableExists('migrations_book') + self.assertTableNotExists("migrations_author") + self.assertTableNotExists("migrations_book") + executor.migrate([("migrations", "0001_squashed_0002")]) + self.assertTableExists("migrations_author") + self.assertTableExists("migrations_book") executor.loader.build_graph() # Migrate backward to a squashed migration. - executor.migrate([('migrations', '0001_initial')]) - self.assertTableExists('migrations_author') - self.assertTableNotExists('migrations_book') + executor.migrate([("migrations", "0001_initial")]) + self.assertTableExists("migrations_author") + self.assertTableNotExists("migrations_book") finally: # Unmigrate everything. executor = MigrationExecutor(connection) - executor.migrate([('migrations', None)]) - self.assertTableNotExists('migrations_author') - self.assertTableNotExists('migrations_book') + executor.migrate([("migrations", None)]) + self.assertTableNotExists("migrations_author") + self.assertTableNotExists("migrations_book") - @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"} + ) def test_non_atomic_migration(self): """ Applying a non-atomic migration works as expected. @@ -137,12 +154,16 @@ class ExecutorTests(MigrationTestBase): with self.assertRaisesMessage(RuntimeError, "Abort migration"): executor.migrate([("migrations", "0001_initial")]) self.assertTableExists("migrations_publisher") - migrations_apps = executor.loader.project_state(("migrations", "0001_initial")).apps + migrations_apps = executor.loader.project_state( + ("migrations", "0001_initial") + ).apps Publisher = migrations_apps.get_model("migrations", "Publisher") self.assertTrue(Publisher.objects.exists()) self.assertTableNotExists("migrations_book") - @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_atomic_operation"}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_atomic_operation"} + ) def test_atomic_operation_in_non_atomic_migration(self): """ An atomic operation is properly rolled back inside a non-atomic @@ -151,7 +172,9 @@ class ExecutorTests(MigrationTestBase): executor = MigrationExecutor(connection) with self.assertRaisesMessage(RuntimeError, "Abort migration"): executor.migrate([("migrations", "0001_initial")]) - migrations_apps = executor.loader.project_state(("migrations", "0001_initial")).apps + migrations_apps = executor.loader.project_state( + ("migrations", "0001_initial") + ).apps Editor = migrations_apps.get_model("migrations", "Editor") self.assertFalse(Editor.objects.exists()) # Record previous migration as successful. @@ -163,10 +186,12 @@ class ExecutorTests(MigrationTestBase): executor.migrate([("migrations", None)]) self.assertFalse(Editor.objects.exists()) - @override_settings(MIGRATION_MODULES={ - "migrations": "migrations.test_migrations", - "migrations2": "migrations2.test_migrations_2", - }) + @override_settings( + MIGRATION_MODULES={ + "migrations": "migrations.test_migrations", + "migrations2": "migrations2.test_migrations_2", + } + ) def test_empty_plan(self): """ Re-planning a full migration of a fully-migrated set doesn't @@ -179,10 +204,12 @@ class ExecutorTests(MigrationTestBase): """ # Make the initial plan, check it executor = MigrationExecutor(connection) - plan = executor.migration_plan([ - ("migrations", "0002_second"), - ("migrations2", "0001_initial"), - ]) + plan = executor.migration_plan( + [ + ("migrations", "0002_second"), + ("migrations2", "0001_initial"), + ] + ) self.assertEqual( plan, [ @@ -192,35 +219,40 @@ class ExecutorTests(MigrationTestBase): ], ) # Fake-apply all migrations - executor.migrate([ - ("migrations", "0002_second"), - ("migrations2", "0001_initial") - ], fake=True) + executor.migrate( + [("migrations", "0002_second"), ("migrations2", "0001_initial")], fake=True + ) # Rebuild the graph to reflect the new DB state executor.loader.build_graph() # Now plan a second time and make sure it's empty - plan = executor.migration_plan([ - ("migrations", "0002_second"), - ("migrations2", "0001_initial"), - ]) + plan = executor.migration_plan( + [ + ("migrations", "0002_second"), + ("migrations2", "0001_initial"), + ] + ) self.assertEqual(plan, []) # The resulting state should include applied migrations. - state = executor.migrate([ - ("migrations", "0002_second"), - ("migrations2", "0001_initial"), - ]) - self.assertIn(('migrations', 'book'), state.models) - self.assertIn(('migrations', 'author'), state.models) - self.assertIn(('migrations2', 'otherauthor'), state.models) + state = executor.migrate( + [ + ("migrations", "0002_second"), + ("migrations2", "0001_initial"), + ] + ) + self.assertIn(("migrations", "book"), state.models) + self.assertIn(("migrations", "author"), state.models) + self.assertIn(("migrations2", "otherauthor"), state.models) # Erase all the fake records executor.recorder.record_unapplied("migrations2", "0001_initial") executor.recorder.record_unapplied("migrations", "0002_second") executor.recorder.record_unapplied("migrations", "0001_initial") - @override_settings(MIGRATION_MODULES={ - "migrations": "migrations.test_migrations", - "migrations2": "migrations2.test_migrations_2_no_deps", - }) + @override_settings( + MIGRATION_MODULES={ + "migrations": "migrations.test_migrations", + "migrations2": "migrations2.test_migrations_2_no_deps", + } + ) def test_mixed_plan_not_supported(self): """ Although the MigrationExecutor interfaces allows for mixed migration @@ -240,19 +272,25 @@ class ExecutorTests(MigrationTestBase): executor.migrate(None, plan) # Rebuild the graph to reflect the new DB state executor.loader.build_graph() - self.assertIn(('migrations', '0001_initial'), executor.loader.applied_migrations) - self.assertIn(('migrations', '0002_second'), executor.loader.applied_migrations) - self.assertNotIn(('migrations2', '0001_initial'), executor.loader.applied_migrations) + self.assertIn( + ("migrations", "0001_initial"), executor.loader.applied_migrations + ) + self.assertIn(("migrations", "0002_second"), executor.loader.applied_migrations) + self.assertNotIn( + ("migrations2", "0001_initial"), executor.loader.applied_migrations + ) # Generate mixed plan - plan = executor.migration_plan([ - ("migrations", None), - ("migrations2", "0001_initial"), - ]) + plan = executor.migration_plan( + [ + ("migrations", None), + ("migrations2", "0001_initial"), + ] + ) msg = ( - 'Migration plans with both forwards and backwards migrations are ' - 'not supported. Please split your migration process into separate ' - 'plans of only forwards OR backwards migrations.' + "Migration plans with both forwards and backwards migrations are " + "not supported. Please split your migration process into separate " + "plans of only forwards OR backwards migrations." ) with self.assertRaisesMessage(InvalidMigrationPlan, msg) as cm: executor.migrate(None, plan) @@ -266,10 +304,12 @@ class ExecutorTests(MigrationTestBase): ) # Rebuild the graph to reflect the new DB state executor.loader.build_graph() - executor.migrate([ - ("migrations", None), - ("migrations2", None), - ]) + executor.migrate( + [ + ("migrations", None), + ("migrations2", None), + ] + ) # Are the tables gone? self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_book") @@ -284,6 +324,7 @@ class ExecutorTests(MigrationTestBase): def fake_storer(phase, migration=None, fake=None): state["faked"] = fake + executor = MigrationExecutor(connection, progress_callback=fake_storer) # Were the tables there before? self.assertTableNotExists("migrations_author") @@ -347,37 +388,37 @@ class ExecutorTests(MigrationTestBase): """ with isolate_lru_cache(global_apps.get_swappable_settings_name): executor = MigrationExecutor(connection) - self.assertTableNotExists('migrations_author') - self.assertTableNotExists('migrations_tribble') + self.assertTableNotExists("migrations_author") + self.assertTableNotExists("migrations_tribble") # Migrate forwards - executor.migrate([('migrations', '0001_initial')]) - self.assertTableExists('migrations_author') - self.assertTableExists('migrations_tribble') + executor.migrate([("migrations", "0001_initial")]) + self.assertTableExists("migrations_author") + self.assertTableExists("migrations_tribble") # The soft-application detection works. # Change table_names to not return auth_user during this as it # wouldn't be there in a normal run, and ensure migrations.Author # exists in the global app registry temporarily. old_table_names = connection.introspection.table_names connection.introspection.table_names = lambda c: [ - x for x in old_table_names(c) if x != 'auth_user' + x for x in old_table_names(c) if x != "auth_user" ] migrations_apps = executor.loader.project_state( - ('migrations', '0001_initial'), + ("migrations", "0001_initial"), ).apps - global_apps.get_app_config('migrations').models['author'] = ( - migrations_apps.get_model('migrations', 'author') - ) + global_apps.get_app_config("migrations").models[ + "author" + ] = migrations_apps.get_model("migrations", "author") try: - migration = executor.loader.get_migration('auth', '0001_initial') + migration = executor.loader.get_migration("auth", "0001_initial") self.assertIs(executor.detect_soft_applied(None, migration)[0], True) finally: connection.introspection.table_names = old_table_names - del global_apps.get_app_config('migrations').models['author'] + del global_apps.get_app_config("migrations").models["author"] # Migrate back to clean up the database. executor.loader.build_graph() - executor.migrate([('migrations', None)]) - self.assertTableNotExists('migrations_author') - self.assertTableNotExists('migrations_tribble') + executor.migrate([("migrations", None)]) + self.assertTableNotExists("migrations_author") + self.assertTableNotExists("migrations_tribble") @override_settings( MIGRATION_MODULES={ @@ -442,7 +483,7 @@ class ExecutorTests(MigrationTestBase): INSTALLED_APPS=[ "migrations.migrations_test_apps.lookuperror_a", "migrations.migrations_test_apps.lookuperror_b", - "migrations.migrations_test_apps.lookuperror_c" + "migrations.migrations_test_apps.lookuperror_c", ] ) def test_unrelated_model_lookups_forwards(self): @@ -463,10 +504,12 @@ class ExecutorTests(MigrationTestBase): # Migrate forwards -- This led to a lookup LookupErrors because # lookuperror_b.B2 is already applied - executor.migrate([ - ("lookuperror_a", "0004_a4"), - ("lookuperror_c", "0003_c3"), - ]) + executor.migrate( + [ + ("lookuperror_a", "0004_a4"), + ("lookuperror_c", "0003_c3"), + ] + ) self.assertTableExists("lookuperror_a_a4") self.assertTableExists("lookuperror_c_c3") @@ -474,11 +517,13 @@ class ExecutorTests(MigrationTestBase): executor.loader.build_graph() finally: # Cleanup - executor.migrate([ - ("lookuperror_a", None), - ("lookuperror_b", None), - ("lookuperror_c", None), - ]) + executor.migrate( + [ + ("lookuperror_a", None), + ("lookuperror_b", None), + ("lookuperror_c", None), + ] + ) self.assertTableNotExists("lookuperror_a_a1") self.assertTableNotExists("lookuperror_b_b1") self.assertTableNotExists("lookuperror_c_c1") @@ -487,7 +532,7 @@ class ExecutorTests(MigrationTestBase): INSTALLED_APPS=[ "migrations.migrations_test_apps.lookuperror_a", "migrations.migrations_test_apps.lookuperror_b", - "migrations.migrations_test_apps.lookuperror_c" + "migrations.migrations_test_apps.lookuperror_c", ] ) def test_unrelated_model_lookups_backwards(self): @@ -501,11 +546,13 @@ class ExecutorTests(MigrationTestBase): self.assertTableNotExists("lookuperror_a_a1") self.assertTableNotExists("lookuperror_b_b1") self.assertTableNotExists("lookuperror_c_c1") - executor.migrate([ - ("lookuperror_a", "0004_a4"), - ("lookuperror_b", "0003_b3"), - ("lookuperror_c", "0003_c3"), - ]) + executor.migrate( + [ + ("lookuperror_a", "0004_a4"), + ("lookuperror_b", "0003_b3"), + ("lookuperror_c", "0003_c3"), + ] + ) self.assertTableExists("lookuperror_b_b3") self.assertTableExists("lookuperror_a_a4") self.assertTableExists("lookuperror_c_c3") @@ -520,18 +567,15 @@ class ExecutorTests(MigrationTestBase): executor.loader.build_graph() finally: # Cleanup - executor.migrate([ - ("lookuperror_b", None), - ("lookuperror_c", None) - ]) + executor.migrate([("lookuperror_b", None), ("lookuperror_c", None)]) self.assertTableNotExists("lookuperror_a_a1") self.assertTableNotExists("lookuperror_b_b1") self.assertTableNotExists("lookuperror_c_c1") @override_settings( INSTALLED_APPS=[ - 'migrations.migrations_test_apps.mutate_state_a', - 'migrations.migrations_test_apps.mutate_state_b', + "migrations.migrations_test_apps.mutate_state_a", + "migrations.migrations_test_apps.mutate_state_b", ] ) def test_unrelated_applied_migrations_mutate_state(self): @@ -540,24 +584,32 @@ class ExecutorTests(MigrationTestBase): state in both directions. """ executor = MigrationExecutor(connection) - executor.migrate([ - ('mutate_state_b', '0002_add_field'), - ]) + executor.migrate( + [ + ("mutate_state_b", "0002_add_field"), + ] + ) # Migrate forward. executor.loader.build_graph() - state = executor.migrate([ - ('mutate_state_a', '0001_initial'), - ]) - self.assertIn('added', state.models['mutate_state_b', 'b'].fields) + state = executor.migrate( + [ + ("mutate_state_a", "0001_initial"), + ] + ) + self.assertIn("added", state.models["mutate_state_b", "b"].fields) executor.loader.build_graph() # Migrate backward. - state = executor.migrate([ - ('mutate_state_a', None), - ]) - self.assertIn('added', state.models['mutate_state_b', 'b'].fields) - executor.migrate([ - ('mutate_state_b', None), - ]) + state = executor.migrate( + [ + ("mutate_state_a", None), + ] + ) + self.assertIn("added", state.models["mutate_state_b", "b"].fields) + executor.migrate( + [ + ("mutate_state_b", None), + ] + ) @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) def test_process_callback(self): @@ -573,17 +625,21 @@ class ExecutorTests(MigrationTestBase): # Were the tables there before? self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") - executor.migrate([ - ("migrations", "0001_initial"), - ("migrations", "0002_second"), - ]) + executor.migrate( + [ + ("migrations", "0001_initial"), + ("migrations", "0002_second"), + ] + ) # Rebuild the graph to reflect the new DB state executor.loader.build_graph() - executor.migrate([ - ("migrations", None), - ("migrations", None), - ]) + executor.migrate( + [ + ("migrations", None), + ("migrations", None), + ] + ) self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") @@ -591,16 +647,16 @@ class ExecutorTests(MigrationTestBase): expected = [ ("render_start",), ("render_success",), - ("apply_start", migrations['migrations', '0001_initial'], False), - ("apply_success", migrations['migrations', '0001_initial'], False), - ("apply_start", migrations['migrations', '0002_second'], False), - ("apply_success", migrations['migrations', '0002_second'], False), + ("apply_start", migrations["migrations", "0001_initial"], False), + ("apply_success", migrations["migrations", "0001_initial"], False), + ("apply_start", migrations["migrations", "0002_second"], False), + ("apply_success", migrations["migrations", "0002_second"], False), ("render_start",), ("render_success",), - ("unapply_start", migrations['migrations', '0002_second'], False), - ("unapply_success", migrations['migrations', '0002_second'], False), - ("unapply_start", migrations['migrations', '0001_initial'], False), - ("unapply_success", migrations['migrations', '0001_initial'], False), + ("unapply_start", migrations["migrations", "0002_second"], False), + ("unapply_success", migrations["migrations", "0002_second"], False), + ("unapply_start", migrations["migrations", "0001_initial"], False), + ("unapply_success", migrations["migrations", "0001_initial"], False), ] self.assertEqual(call_args_list, expected) @@ -616,10 +672,12 @@ class ExecutorTests(MigrationTestBase): self.assertTableNotExists("author_app_author") self.assertTableNotExists("book_app_book") # Apply initial migrations - executor.migrate([ - ("author_app", "0001_initial"), - ("book_app", "0001_initial"), - ]) + executor.migrate( + [ + ("author_app", "0001_initial"), + ("book_app", "0001_initial"), + ] + ) self.assertTableExists("author_app_author") self.assertTableExists("book_app_book") # Rebuild the graph to reflect the new DB state @@ -640,7 +698,9 @@ class ExecutorTests(MigrationTestBase): self.assertTableNotExists("book_app_book") executor.migrate([("author_app", None)], fake=True) - @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"} + ) def test_apply_all_replaced_marks_replacement_as_applied(self): """ Applying all replaced migrations marks replacement as applied (#24628). @@ -663,7 +723,9 @@ class ExecutorTests(MigrationTestBase): recorder.applied_migrations(), ) - @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"} + ) def test_migrate_marks_replacement_applied_even_if_it_did_nothing(self): """ A new squash migration will be marked as applied even if all its @@ -684,59 +746,69 @@ class ExecutorTests(MigrationTestBase): recorder.applied_migrations(), ) - @override_settings(MIGRATION_MODULES={'migrations': 'migrations.test_migrations_squashed'}) + @override_settings( + MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"} + ) def test_migrate_marks_replacement_unapplied(self): executor = MigrationExecutor(connection) - executor.migrate([('migrations', '0001_squashed_0002')]) + executor.migrate([("migrations", "0001_squashed_0002")]) try: self.assertIn( - ('migrations', '0001_squashed_0002'), + ("migrations", "0001_squashed_0002"), executor.recorder.applied_migrations(), ) finally: executor.loader.build_graph() - executor.migrate([('migrations', None)]) + executor.migrate([("migrations", None)]) self.assertNotIn( - ('migrations', '0001_squashed_0002'), + ("migrations", "0001_squashed_0002"), executor.recorder.applied_migrations(), ) # When the feature is False, the operation and the record won't be # performed in a transaction and the test will systematically pass. - @skipUnlessDBFeature('can_rollback_ddl') + @skipUnlessDBFeature("can_rollback_ddl") def test_migrations_applied_and_recorded_atomically(self): """Migrations are applied and recorded atomically.""" + class Migration(migrations.Migration): operations = [ - migrations.CreateModel('model', [ - ('id', models.AutoField(primary_key=True)), - ]), + migrations.CreateModel( + "model", + [ + ("id", models.AutoField(primary_key=True)), + ], + ), ] executor = MigrationExecutor(connection) - with mock.patch('django.db.migrations.executor.MigrationExecutor.record_migration') as record_migration: - record_migration.side_effect = RuntimeError('Recording migration failed.') - with self.assertRaisesMessage(RuntimeError, 'Recording migration failed.'): + with mock.patch( + "django.db.migrations.executor.MigrationExecutor.record_migration" + ) as record_migration: + record_migration.side_effect = RuntimeError("Recording migration failed.") + with self.assertRaisesMessage(RuntimeError, "Recording migration failed."): executor.apply_migration( ProjectState(), - Migration('0001_initial', 'record_migration'), + Migration("0001_initial", "record_migration"), ) - executor.migrate([('migrations', '0001_initial')]) + executor.migrate([("migrations", "0001_initial")]) # The migration isn't recorded as applied since it failed. migration_recorder = MigrationRecorder(connection) self.assertIs( migration_recorder.migration_qs.filter( - app='record_migration', name='0001_initial', + app="record_migration", + name="0001_initial", ).exists(), False, ) - self.assertTableNotExists('record_migration_model') + self.assertTableNotExists("record_migration_model") def test_migrations_not_applied_on_deferred_sql_failure(self): """Migrations are not recorded if deferred SQL application fails.""" + class DeferredSQL: def __str__(self): - raise DatabaseError('Failed to apply deferred SQL') + raise DatabaseError("Failed to apply deferred SQL") class Migration(migrations.Migration): atomic = False @@ -745,21 +817,22 @@ class ExecutorTests(MigrationTestBase): schema_editor.deferred_sql.append(DeferredSQL()) executor = MigrationExecutor(connection) - with self.assertRaisesMessage(DatabaseError, 'Failed to apply deferred SQL'): + with self.assertRaisesMessage(DatabaseError, "Failed to apply deferred SQL"): executor.apply_migration( ProjectState(), - Migration('0001_initial', 'deferred_sql'), + Migration("0001_initial", "deferred_sql"), ) # The migration isn't recorded as applied since it failed. migration_recorder = MigrationRecorder(connection) self.assertIs( migration_recorder.migration_qs.filter( - app='deferred_sql', name='0001_initial', + app="deferred_sql", + name="0001_initial", ).exists(), False, ) - @mock.patch.object(MigrationRecorder, 'has_table', return_value=False) + @mock.patch.object(MigrationRecorder, "has_table", return_value=False) def test_migrate_skips_schema_creation(self, mocked_has_table): """ The django_migrations table is not created if there are no migrations @@ -780,15 +853,17 @@ class FakeLoader: class FakeMigration: """Really all we need is any object with a debug-useful repr.""" + def __init__(self, name): self.name = name def __repr__(self): - return 'M<%s>' % self.name + return "M<%s>" % self.name class ExecutorUnitTests(SimpleTestCase): """(More) isolated unit tests for executor methods.""" + def test_minimize_rollbacks(self): """ Minimize unnecessary rollbacks in connected apps. @@ -799,12 +874,12 @@ class ExecutorUnitTests(SimpleTestCase): to be rolled back since we're not rolling back appA 0001), we migrate to just before appA-0002. """ - a1_impl = FakeMigration('a1') - a1 = ('a', '1') - a2_impl = FakeMigration('a2') - a2 = ('a', '2') - b1_impl = FakeMigration('b1') - b1 = ('b', '1') + a1_impl = FakeMigration("a1") + a1 = ("a", "1") + a2_impl = FakeMigration("a2") + a2 = ("a", "2") + b1_impl = FakeMigration("b1") + b1 = ("b", "1") graph = MigrationGraph() graph.add_node(a1, a1_impl) graph.add_node(a2, a2_impl) @@ -813,11 +888,14 @@ class ExecutorUnitTests(SimpleTestCase): graph.add_dependency(None, a2, a1) executor = MigrationExecutor(None) - executor.loader = FakeLoader(graph, { - a1: a1_impl, - b1: b1_impl, - a2: a2_impl, - }) + executor.loader = FakeLoader( + graph, + { + a1: a1_impl, + b1: b1_impl, + a2: a2_impl, + }, + ) plan = executor.migration_plan({a1}) @@ -832,18 +910,18 @@ class ExecutorUnitTests(SimpleTestCase): \ \ b: \- 1 <--- 2 """ - a1_impl = FakeMigration('a1') - a1 = ('a', '1') - a2_impl = FakeMigration('a2') - a2 = ('a', '2') - a3_impl = FakeMigration('a3') - a3 = ('a', '3') - a4_impl = FakeMigration('a4') - a4 = ('a', '4') - b1_impl = FakeMigration('b1') - b1 = ('b', '1') - b2_impl = FakeMigration('b2') - b2 = ('b', '2') + a1_impl = FakeMigration("a1") + a1 = ("a", "1") + a2_impl = FakeMigration("a2") + a2 = ("a", "2") + a3_impl = FakeMigration("a3") + a3 = ("a", "3") + a4_impl = FakeMigration("a4") + a4 = ("a", "4") + b1_impl = FakeMigration("b1") + b1 = ("b", "1") + b2_impl = FakeMigration("b2") + b2 = ("b", "2") graph = MigrationGraph() graph.add_node(a1, a1_impl) graph.add_node(a2, a2_impl) @@ -860,14 +938,17 @@ class ExecutorUnitTests(SimpleTestCase): graph.add_dependency(None, b2, a2) executor = MigrationExecutor(None) - executor.loader = FakeLoader(graph, { - a1: a1_impl, - b1: b1_impl, - a2: a2_impl, - b2: b2_impl, - a3: a3_impl, - a4: a4_impl, - }) + executor.loader = FakeLoader( + graph, + { + a1: a1_impl, + b1: b1_impl, + a2: a2_impl, + b2: b2_impl, + a3: a3_impl, + a4: a4_impl, + }, + ) plan = executor.migration_plan({a1}) @@ -886,14 +967,14 @@ class ExecutorUnitTests(SimpleTestCase): If a1 is applied already and a2 is not, and we're asked to migrate to a1, don't apply or unapply b1 or c1, regardless of their current state. """ - a1_impl = FakeMigration('a1') - a1 = ('a', '1') - a2_impl = FakeMigration('a2') - a2 = ('a', '2') - b1_impl = FakeMigration('b1') - b1 = ('b', '1') - c1_impl = FakeMigration('c1') - c1 = ('c', '1') + a1_impl = FakeMigration("a1") + a1 = ("a", "1") + a2_impl = FakeMigration("a2") + a2 = ("a", "2") + b1_impl = FakeMigration("b1") + b1 = ("b", "1") + c1_impl = FakeMigration("c1") + c1 = ("c", "1") graph = MigrationGraph() graph.add_node(a1, a1_impl) graph.add_node(a2, a2_impl) @@ -904,10 +985,13 @@ class ExecutorUnitTests(SimpleTestCase): graph.add_dependency(None, c1, a1) executor = MigrationExecutor(None) - executor.loader = FakeLoader(graph, { - a1: a1_impl, - b1: b1_impl, - }) + executor.loader = FakeLoader( + graph, + { + a1: a1_impl, + b1: b1_impl, + }, + ) plan = executor.migration_plan({a1}) |
