summaryrefslogtreecommitdiff
path: root/tests/migrations/test_executor.py
diff options
context:
space:
mode:
authordjango-bot <ops@djangoproject.com>2022-02-03 20:24:19 +0100
committerMariusz Felisiak <felisiak.mariusz@gmail.com>2022-02-07 20:37:05 +0100
commit9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch)
treef0506b668a013d0063e5fba3dbf4863b466713ba /tests/migrations/test_executor.py
parentf68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff)
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/migrations/test_executor.py')
-rw-r--r--tests/migrations/test_executor.py486
1 files changed, 285 insertions, 201 deletions
diff --git a/tests/migrations/test_executor.py b/tests/migrations/test_executor.py
index cab6ac6cb5..6232b44b98 100644
--- a/tests/migrations/test_executor.py
+++ b/tests/migrations/test_executor.py
@@ -8,14 +8,17 @@ from django.db.migrations.graph import MigrationGraph
from django.db.migrations.recorder import MigrationRecorder
from django.db.migrations.state import ProjectState
from django.test import (
- SimpleTestCase, modify_settings, override_settings, skipUnlessDBFeature,
+ SimpleTestCase,
+ modify_settings,
+ override_settings,
+ skipUnlessDBFeature,
)
from django.test.utils import isolate_lru_cache
from .test_base import MigrationTestBase
-@modify_settings(INSTALLED_APPS={'append': 'migrations2'})
+@modify_settings(INSTALLED_APPS={"append": "migrations2"})
class ExecutorTests(MigrationTestBase):
"""
Tests the migration executor (full end-to-end running).
@@ -24,7 +27,12 @@ class ExecutorTests(MigrationTestBase):
test failures first, as they may be propagating into here.
"""
- available_apps = ["migrations", "migrations2", "django.contrib.auth", "django.contrib.contenttypes"]
+ available_apps = [
+ "migrations",
+ "migrations2",
+ "django.contrib.auth",
+ "django.contrib.contenttypes",
+ ]
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
def test_run(self):
@@ -65,21 +73,28 @@ class ExecutorTests(MigrationTestBase):
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_book")
- @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
+ )
def test_run_with_squashed(self):
"""
Tests running a squashed migration from zero (should ignore what it replaces)
"""
executor = MigrationExecutor(connection)
# Check our leaf node is the squashed one
- leaves = [key for key in executor.loader.graph.leaf_nodes() if key[0] == "migrations"]
+ leaves = [
+ key for key in executor.loader.graph.leaf_nodes() if key[0] == "migrations"
+ ]
self.assertEqual(leaves, [("migrations", "0001_squashed_0002")])
# Check the plan
plan = executor.migration_plan([("migrations", "0001_squashed_0002")])
self.assertEqual(
plan,
[
- (executor.loader.graph.nodes["migrations", "0001_squashed_0002"], False),
+ (
+ executor.loader.graph.nodes["migrations", "0001_squashed_0002"],
+ False,
+ ),
],
)
# Were the tables there before?
@@ -106,29 +121,31 @@ class ExecutorTests(MigrationTestBase):
self.assertTableNotExists("migrations_book")
@override_settings(
- MIGRATION_MODULES={'migrations': 'migrations.test_migrations_squashed'},
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"},
)
def test_migrate_backward_to_squashed_migration(self):
executor = MigrationExecutor(connection)
try:
- self.assertTableNotExists('migrations_author')
- self.assertTableNotExists('migrations_book')
- executor.migrate([('migrations', '0001_squashed_0002')])
- self.assertTableExists('migrations_author')
- self.assertTableExists('migrations_book')
+ self.assertTableNotExists("migrations_author")
+ self.assertTableNotExists("migrations_book")
+ executor.migrate([("migrations", "0001_squashed_0002")])
+ self.assertTableExists("migrations_author")
+ self.assertTableExists("migrations_book")
executor.loader.build_graph()
# Migrate backward to a squashed migration.
- executor.migrate([('migrations', '0001_initial')])
- self.assertTableExists('migrations_author')
- self.assertTableNotExists('migrations_book')
+ executor.migrate([("migrations", "0001_initial")])
+ self.assertTableExists("migrations_author")
+ self.assertTableNotExists("migrations_book")
finally:
# Unmigrate everything.
executor = MigrationExecutor(connection)
- executor.migrate([('migrations', None)])
- self.assertTableNotExists('migrations_author')
- self.assertTableNotExists('migrations_book')
+ executor.migrate([("migrations", None)])
+ self.assertTableNotExists("migrations_author")
+ self.assertTableNotExists("migrations_book")
- @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_non_atomic"}
+ )
def test_non_atomic_migration(self):
"""
Applying a non-atomic migration works as expected.
@@ -137,12 +154,16 @@ class ExecutorTests(MigrationTestBase):
with self.assertRaisesMessage(RuntimeError, "Abort migration"):
executor.migrate([("migrations", "0001_initial")])
self.assertTableExists("migrations_publisher")
- migrations_apps = executor.loader.project_state(("migrations", "0001_initial")).apps
+ migrations_apps = executor.loader.project_state(
+ ("migrations", "0001_initial")
+ ).apps
Publisher = migrations_apps.get_model("migrations", "Publisher")
self.assertTrue(Publisher.objects.exists())
self.assertTableNotExists("migrations_book")
- @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_atomic_operation"})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_atomic_operation"}
+ )
def test_atomic_operation_in_non_atomic_migration(self):
"""
An atomic operation is properly rolled back inside a non-atomic
@@ -151,7 +172,9 @@ class ExecutorTests(MigrationTestBase):
executor = MigrationExecutor(connection)
with self.assertRaisesMessage(RuntimeError, "Abort migration"):
executor.migrate([("migrations", "0001_initial")])
- migrations_apps = executor.loader.project_state(("migrations", "0001_initial")).apps
+ migrations_apps = executor.loader.project_state(
+ ("migrations", "0001_initial")
+ ).apps
Editor = migrations_apps.get_model("migrations", "Editor")
self.assertFalse(Editor.objects.exists())
# Record previous migration as successful.
@@ -163,10 +186,12 @@ class ExecutorTests(MigrationTestBase):
executor.migrate([("migrations", None)])
self.assertFalse(Editor.objects.exists())
- @override_settings(MIGRATION_MODULES={
- "migrations": "migrations.test_migrations",
- "migrations2": "migrations2.test_migrations_2",
- })
+ @override_settings(
+ MIGRATION_MODULES={
+ "migrations": "migrations.test_migrations",
+ "migrations2": "migrations2.test_migrations_2",
+ }
+ )
def test_empty_plan(self):
"""
Re-planning a full migration of a fully-migrated set doesn't
@@ -179,10 +204,12 @@ class ExecutorTests(MigrationTestBase):
"""
# Make the initial plan, check it
executor = MigrationExecutor(connection)
- plan = executor.migration_plan([
- ("migrations", "0002_second"),
- ("migrations2", "0001_initial"),
- ])
+ plan = executor.migration_plan(
+ [
+ ("migrations", "0002_second"),
+ ("migrations2", "0001_initial"),
+ ]
+ )
self.assertEqual(
plan,
[
@@ -192,35 +219,40 @@ class ExecutorTests(MigrationTestBase):
],
)
# Fake-apply all migrations
- executor.migrate([
- ("migrations", "0002_second"),
- ("migrations2", "0001_initial")
- ], fake=True)
+ executor.migrate(
+ [("migrations", "0002_second"), ("migrations2", "0001_initial")], fake=True
+ )
# Rebuild the graph to reflect the new DB state
executor.loader.build_graph()
# Now plan a second time and make sure it's empty
- plan = executor.migration_plan([
- ("migrations", "0002_second"),
- ("migrations2", "0001_initial"),
- ])
+ plan = executor.migration_plan(
+ [
+ ("migrations", "0002_second"),
+ ("migrations2", "0001_initial"),
+ ]
+ )
self.assertEqual(plan, [])
# The resulting state should include applied migrations.
- state = executor.migrate([
- ("migrations", "0002_second"),
- ("migrations2", "0001_initial"),
- ])
- self.assertIn(('migrations', 'book'), state.models)
- self.assertIn(('migrations', 'author'), state.models)
- self.assertIn(('migrations2', 'otherauthor'), state.models)
+ state = executor.migrate(
+ [
+ ("migrations", "0002_second"),
+ ("migrations2", "0001_initial"),
+ ]
+ )
+ self.assertIn(("migrations", "book"), state.models)
+ self.assertIn(("migrations", "author"), state.models)
+ self.assertIn(("migrations2", "otherauthor"), state.models)
# Erase all the fake records
executor.recorder.record_unapplied("migrations2", "0001_initial")
executor.recorder.record_unapplied("migrations", "0002_second")
executor.recorder.record_unapplied("migrations", "0001_initial")
- @override_settings(MIGRATION_MODULES={
- "migrations": "migrations.test_migrations",
- "migrations2": "migrations2.test_migrations_2_no_deps",
- })
+ @override_settings(
+ MIGRATION_MODULES={
+ "migrations": "migrations.test_migrations",
+ "migrations2": "migrations2.test_migrations_2_no_deps",
+ }
+ )
def test_mixed_plan_not_supported(self):
"""
Although the MigrationExecutor interfaces allows for mixed migration
@@ -240,19 +272,25 @@ class ExecutorTests(MigrationTestBase):
executor.migrate(None, plan)
# Rebuild the graph to reflect the new DB state
executor.loader.build_graph()
- self.assertIn(('migrations', '0001_initial'), executor.loader.applied_migrations)
- self.assertIn(('migrations', '0002_second'), executor.loader.applied_migrations)
- self.assertNotIn(('migrations2', '0001_initial'), executor.loader.applied_migrations)
+ self.assertIn(
+ ("migrations", "0001_initial"), executor.loader.applied_migrations
+ )
+ self.assertIn(("migrations", "0002_second"), executor.loader.applied_migrations)
+ self.assertNotIn(
+ ("migrations2", "0001_initial"), executor.loader.applied_migrations
+ )
# Generate mixed plan
- plan = executor.migration_plan([
- ("migrations", None),
- ("migrations2", "0001_initial"),
- ])
+ plan = executor.migration_plan(
+ [
+ ("migrations", None),
+ ("migrations2", "0001_initial"),
+ ]
+ )
msg = (
- 'Migration plans with both forwards and backwards migrations are '
- 'not supported. Please split your migration process into separate '
- 'plans of only forwards OR backwards migrations.'
+ "Migration plans with both forwards and backwards migrations are "
+ "not supported. Please split your migration process into separate "
+ "plans of only forwards OR backwards migrations."
)
with self.assertRaisesMessage(InvalidMigrationPlan, msg) as cm:
executor.migrate(None, plan)
@@ -266,10 +304,12 @@ class ExecutorTests(MigrationTestBase):
)
# Rebuild the graph to reflect the new DB state
executor.loader.build_graph()
- executor.migrate([
- ("migrations", None),
- ("migrations2", None),
- ])
+ executor.migrate(
+ [
+ ("migrations", None),
+ ("migrations2", None),
+ ]
+ )
# Are the tables gone?
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_book")
@@ -284,6 +324,7 @@ class ExecutorTests(MigrationTestBase):
def fake_storer(phase, migration=None, fake=None):
state["faked"] = fake
+
executor = MigrationExecutor(connection, progress_callback=fake_storer)
# Were the tables there before?
self.assertTableNotExists("migrations_author")
@@ -347,37 +388,37 @@ class ExecutorTests(MigrationTestBase):
"""
with isolate_lru_cache(global_apps.get_swappable_settings_name):
executor = MigrationExecutor(connection)
- self.assertTableNotExists('migrations_author')
- self.assertTableNotExists('migrations_tribble')
+ self.assertTableNotExists("migrations_author")
+ self.assertTableNotExists("migrations_tribble")
# Migrate forwards
- executor.migrate([('migrations', '0001_initial')])
- self.assertTableExists('migrations_author')
- self.assertTableExists('migrations_tribble')
+ executor.migrate([("migrations", "0001_initial")])
+ self.assertTableExists("migrations_author")
+ self.assertTableExists("migrations_tribble")
# The soft-application detection works.
# Change table_names to not return auth_user during this as it
# wouldn't be there in a normal run, and ensure migrations.Author
# exists in the global app registry temporarily.
old_table_names = connection.introspection.table_names
connection.introspection.table_names = lambda c: [
- x for x in old_table_names(c) if x != 'auth_user'
+ x for x in old_table_names(c) if x != "auth_user"
]
migrations_apps = executor.loader.project_state(
- ('migrations', '0001_initial'),
+ ("migrations", "0001_initial"),
).apps
- global_apps.get_app_config('migrations').models['author'] = (
- migrations_apps.get_model('migrations', 'author')
- )
+ global_apps.get_app_config("migrations").models[
+ "author"
+ ] = migrations_apps.get_model("migrations", "author")
try:
- migration = executor.loader.get_migration('auth', '0001_initial')
+ migration = executor.loader.get_migration("auth", "0001_initial")
self.assertIs(executor.detect_soft_applied(None, migration)[0], True)
finally:
connection.introspection.table_names = old_table_names
- del global_apps.get_app_config('migrations').models['author']
+ del global_apps.get_app_config("migrations").models["author"]
# Migrate back to clean up the database.
executor.loader.build_graph()
- executor.migrate([('migrations', None)])
- self.assertTableNotExists('migrations_author')
- self.assertTableNotExists('migrations_tribble')
+ executor.migrate([("migrations", None)])
+ self.assertTableNotExists("migrations_author")
+ self.assertTableNotExists("migrations_tribble")
@override_settings(
MIGRATION_MODULES={
@@ -442,7 +483,7 @@ class ExecutorTests(MigrationTestBase):
INSTALLED_APPS=[
"migrations.migrations_test_apps.lookuperror_a",
"migrations.migrations_test_apps.lookuperror_b",
- "migrations.migrations_test_apps.lookuperror_c"
+ "migrations.migrations_test_apps.lookuperror_c",
]
)
def test_unrelated_model_lookups_forwards(self):
@@ -463,10 +504,12 @@ class ExecutorTests(MigrationTestBase):
# Migrate forwards -- This led to a lookup LookupErrors because
# lookuperror_b.B2 is already applied
- executor.migrate([
- ("lookuperror_a", "0004_a4"),
- ("lookuperror_c", "0003_c3"),
- ])
+ executor.migrate(
+ [
+ ("lookuperror_a", "0004_a4"),
+ ("lookuperror_c", "0003_c3"),
+ ]
+ )
self.assertTableExists("lookuperror_a_a4")
self.assertTableExists("lookuperror_c_c3")
@@ -474,11 +517,13 @@ class ExecutorTests(MigrationTestBase):
executor.loader.build_graph()
finally:
# Cleanup
- executor.migrate([
- ("lookuperror_a", None),
- ("lookuperror_b", None),
- ("lookuperror_c", None),
- ])
+ executor.migrate(
+ [
+ ("lookuperror_a", None),
+ ("lookuperror_b", None),
+ ("lookuperror_c", None),
+ ]
+ )
self.assertTableNotExists("lookuperror_a_a1")
self.assertTableNotExists("lookuperror_b_b1")
self.assertTableNotExists("lookuperror_c_c1")
@@ -487,7 +532,7 @@ class ExecutorTests(MigrationTestBase):
INSTALLED_APPS=[
"migrations.migrations_test_apps.lookuperror_a",
"migrations.migrations_test_apps.lookuperror_b",
- "migrations.migrations_test_apps.lookuperror_c"
+ "migrations.migrations_test_apps.lookuperror_c",
]
)
def test_unrelated_model_lookups_backwards(self):
@@ -501,11 +546,13 @@ class ExecutorTests(MigrationTestBase):
self.assertTableNotExists("lookuperror_a_a1")
self.assertTableNotExists("lookuperror_b_b1")
self.assertTableNotExists("lookuperror_c_c1")
- executor.migrate([
- ("lookuperror_a", "0004_a4"),
- ("lookuperror_b", "0003_b3"),
- ("lookuperror_c", "0003_c3"),
- ])
+ executor.migrate(
+ [
+ ("lookuperror_a", "0004_a4"),
+ ("lookuperror_b", "0003_b3"),
+ ("lookuperror_c", "0003_c3"),
+ ]
+ )
self.assertTableExists("lookuperror_b_b3")
self.assertTableExists("lookuperror_a_a4")
self.assertTableExists("lookuperror_c_c3")
@@ -520,18 +567,15 @@ class ExecutorTests(MigrationTestBase):
executor.loader.build_graph()
finally:
# Cleanup
- executor.migrate([
- ("lookuperror_b", None),
- ("lookuperror_c", None)
- ])
+ executor.migrate([("lookuperror_b", None), ("lookuperror_c", None)])
self.assertTableNotExists("lookuperror_a_a1")
self.assertTableNotExists("lookuperror_b_b1")
self.assertTableNotExists("lookuperror_c_c1")
@override_settings(
INSTALLED_APPS=[
- 'migrations.migrations_test_apps.mutate_state_a',
- 'migrations.migrations_test_apps.mutate_state_b',
+ "migrations.migrations_test_apps.mutate_state_a",
+ "migrations.migrations_test_apps.mutate_state_b",
]
)
def test_unrelated_applied_migrations_mutate_state(self):
@@ -540,24 +584,32 @@ class ExecutorTests(MigrationTestBase):
state in both directions.
"""
executor = MigrationExecutor(connection)
- executor.migrate([
- ('mutate_state_b', '0002_add_field'),
- ])
+ executor.migrate(
+ [
+ ("mutate_state_b", "0002_add_field"),
+ ]
+ )
# Migrate forward.
executor.loader.build_graph()
- state = executor.migrate([
- ('mutate_state_a', '0001_initial'),
- ])
- self.assertIn('added', state.models['mutate_state_b', 'b'].fields)
+ state = executor.migrate(
+ [
+ ("mutate_state_a", "0001_initial"),
+ ]
+ )
+ self.assertIn("added", state.models["mutate_state_b", "b"].fields)
executor.loader.build_graph()
# Migrate backward.
- state = executor.migrate([
- ('mutate_state_a', None),
- ])
- self.assertIn('added', state.models['mutate_state_b', 'b'].fields)
- executor.migrate([
- ('mutate_state_b', None),
- ])
+ state = executor.migrate(
+ [
+ ("mutate_state_a", None),
+ ]
+ )
+ self.assertIn("added", state.models["mutate_state_b", "b"].fields)
+ executor.migrate(
+ [
+ ("mutate_state_b", None),
+ ]
+ )
@override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
def test_process_callback(self):
@@ -573,17 +625,21 @@ class ExecutorTests(MigrationTestBase):
# Were the tables there before?
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_tribble")
- executor.migrate([
- ("migrations", "0001_initial"),
- ("migrations", "0002_second"),
- ])
+ executor.migrate(
+ [
+ ("migrations", "0001_initial"),
+ ("migrations", "0002_second"),
+ ]
+ )
# Rebuild the graph to reflect the new DB state
executor.loader.build_graph()
- executor.migrate([
- ("migrations", None),
- ("migrations", None),
- ])
+ executor.migrate(
+ [
+ ("migrations", None),
+ ("migrations", None),
+ ]
+ )
self.assertTableNotExists("migrations_author")
self.assertTableNotExists("migrations_tribble")
@@ -591,16 +647,16 @@ class ExecutorTests(MigrationTestBase):
expected = [
("render_start",),
("render_success",),
- ("apply_start", migrations['migrations', '0001_initial'], False),
- ("apply_success", migrations['migrations', '0001_initial'], False),
- ("apply_start", migrations['migrations', '0002_second'], False),
- ("apply_success", migrations['migrations', '0002_second'], False),
+ ("apply_start", migrations["migrations", "0001_initial"], False),
+ ("apply_success", migrations["migrations", "0001_initial"], False),
+ ("apply_start", migrations["migrations", "0002_second"], False),
+ ("apply_success", migrations["migrations", "0002_second"], False),
("render_start",),
("render_success",),
- ("unapply_start", migrations['migrations', '0002_second'], False),
- ("unapply_success", migrations['migrations', '0002_second'], False),
- ("unapply_start", migrations['migrations', '0001_initial'], False),
- ("unapply_success", migrations['migrations', '0001_initial'], False),
+ ("unapply_start", migrations["migrations", "0002_second"], False),
+ ("unapply_success", migrations["migrations", "0002_second"], False),
+ ("unapply_start", migrations["migrations", "0001_initial"], False),
+ ("unapply_success", migrations["migrations", "0001_initial"], False),
]
self.assertEqual(call_args_list, expected)
@@ -616,10 +672,12 @@ class ExecutorTests(MigrationTestBase):
self.assertTableNotExists("author_app_author")
self.assertTableNotExists("book_app_book")
# Apply initial migrations
- executor.migrate([
- ("author_app", "0001_initial"),
- ("book_app", "0001_initial"),
- ])
+ executor.migrate(
+ [
+ ("author_app", "0001_initial"),
+ ("book_app", "0001_initial"),
+ ]
+ )
self.assertTableExists("author_app_author")
self.assertTableExists("book_app_book")
# Rebuild the graph to reflect the new DB state
@@ -640,7 +698,9 @@ class ExecutorTests(MigrationTestBase):
self.assertTableNotExists("book_app_book")
executor.migrate([("author_app", None)], fake=True)
- @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
+ )
def test_apply_all_replaced_marks_replacement_as_applied(self):
"""
Applying all replaced migrations marks replacement as applied (#24628).
@@ -663,7 +723,9 @@ class ExecutorTests(MigrationTestBase):
recorder.applied_migrations(),
)
- @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
+ )
def test_migrate_marks_replacement_applied_even_if_it_did_nothing(self):
"""
A new squash migration will be marked as applied even if all its
@@ -684,59 +746,69 @@ class ExecutorTests(MigrationTestBase):
recorder.applied_migrations(),
)
- @override_settings(MIGRATION_MODULES={'migrations': 'migrations.test_migrations_squashed'})
+ @override_settings(
+ MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
+ )
def test_migrate_marks_replacement_unapplied(self):
executor = MigrationExecutor(connection)
- executor.migrate([('migrations', '0001_squashed_0002')])
+ executor.migrate([("migrations", "0001_squashed_0002")])
try:
self.assertIn(
- ('migrations', '0001_squashed_0002'),
+ ("migrations", "0001_squashed_0002"),
executor.recorder.applied_migrations(),
)
finally:
executor.loader.build_graph()
- executor.migrate([('migrations', None)])
+ executor.migrate([("migrations", None)])
self.assertNotIn(
- ('migrations', '0001_squashed_0002'),
+ ("migrations", "0001_squashed_0002"),
executor.recorder.applied_migrations(),
)
# When the feature is False, the operation and the record won't be
# performed in a transaction and the test will systematically pass.
- @skipUnlessDBFeature('can_rollback_ddl')
+ @skipUnlessDBFeature("can_rollback_ddl")
def test_migrations_applied_and_recorded_atomically(self):
"""Migrations are applied and recorded atomically."""
+
class Migration(migrations.Migration):
operations = [
- migrations.CreateModel('model', [
- ('id', models.AutoField(primary_key=True)),
- ]),
+ migrations.CreateModel(
+ "model",
+ [
+ ("id", models.AutoField(primary_key=True)),
+ ],
+ ),
]
executor = MigrationExecutor(connection)
- with mock.patch('django.db.migrations.executor.MigrationExecutor.record_migration') as record_migration:
- record_migration.side_effect = RuntimeError('Recording migration failed.')
- with self.assertRaisesMessage(RuntimeError, 'Recording migration failed.'):
+ with mock.patch(
+ "django.db.migrations.executor.MigrationExecutor.record_migration"
+ ) as record_migration:
+ record_migration.side_effect = RuntimeError("Recording migration failed.")
+ with self.assertRaisesMessage(RuntimeError, "Recording migration failed."):
executor.apply_migration(
ProjectState(),
- Migration('0001_initial', 'record_migration'),
+ Migration("0001_initial", "record_migration"),
)
- executor.migrate([('migrations', '0001_initial')])
+ executor.migrate([("migrations", "0001_initial")])
# The migration isn't recorded as applied since it failed.
migration_recorder = MigrationRecorder(connection)
self.assertIs(
migration_recorder.migration_qs.filter(
- app='record_migration', name='0001_initial',
+ app="record_migration",
+ name="0001_initial",
).exists(),
False,
)
- self.assertTableNotExists('record_migration_model')
+ self.assertTableNotExists("record_migration_model")
def test_migrations_not_applied_on_deferred_sql_failure(self):
"""Migrations are not recorded if deferred SQL application fails."""
+
class DeferredSQL:
def __str__(self):
- raise DatabaseError('Failed to apply deferred SQL')
+ raise DatabaseError("Failed to apply deferred SQL")
class Migration(migrations.Migration):
atomic = False
@@ -745,21 +817,22 @@ class ExecutorTests(MigrationTestBase):
schema_editor.deferred_sql.append(DeferredSQL())
executor = MigrationExecutor(connection)
- with self.assertRaisesMessage(DatabaseError, 'Failed to apply deferred SQL'):
+ with self.assertRaisesMessage(DatabaseError, "Failed to apply deferred SQL"):
executor.apply_migration(
ProjectState(),
- Migration('0001_initial', 'deferred_sql'),
+ Migration("0001_initial", "deferred_sql"),
)
# The migration isn't recorded as applied since it failed.
migration_recorder = MigrationRecorder(connection)
self.assertIs(
migration_recorder.migration_qs.filter(
- app='deferred_sql', name='0001_initial',
+ app="deferred_sql",
+ name="0001_initial",
).exists(),
False,
)
- @mock.patch.object(MigrationRecorder, 'has_table', return_value=False)
+ @mock.patch.object(MigrationRecorder, "has_table", return_value=False)
def test_migrate_skips_schema_creation(self, mocked_has_table):
"""
The django_migrations table is not created if there are no migrations
@@ -780,15 +853,17 @@ class FakeLoader:
class FakeMigration:
"""Really all we need is any object with a debug-useful repr."""
+
def __init__(self, name):
self.name = name
def __repr__(self):
- return 'M<%s>' % self.name
+ return "M<%s>" % self.name
class ExecutorUnitTests(SimpleTestCase):
"""(More) isolated unit tests for executor methods."""
+
def test_minimize_rollbacks(self):
"""
Minimize unnecessary rollbacks in connected apps.
@@ -799,12 +874,12 @@ class ExecutorUnitTests(SimpleTestCase):
to be rolled back since we're not rolling back appA 0001), we migrate
to just before appA-0002.
"""
- a1_impl = FakeMigration('a1')
- a1 = ('a', '1')
- a2_impl = FakeMigration('a2')
- a2 = ('a', '2')
- b1_impl = FakeMigration('b1')
- b1 = ('b', '1')
+ a1_impl = FakeMigration("a1")
+ a1 = ("a", "1")
+ a2_impl = FakeMigration("a2")
+ a2 = ("a", "2")
+ b1_impl = FakeMigration("b1")
+ b1 = ("b", "1")
graph = MigrationGraph()
graph.add_node(a1, a1_impl)
graph.add_node(a2, a2_impl)
@@ -813,11 +888,14 @@ class ExecutorUnitTests(SimpleTestCase):
graph.add_dependency(None, a2, a1)
executor = MigrationExecutor(None)
- executor.loader = FakeLoader(graph, {
- a1: a1_impl,
- b1: b1_impl,
- a2: a2_impl,
- })
+ executor.loader = FakeLoader(
+ graph,
+ {
+ a1: a1_impl,
+ b1: b1_impl,
+ a2: a2_impl,
+ },
+ )
plan = executor.migration_plan({a1})
@@ -832,18 +910,18 @@ class ExecutorUnitTests(SimpleTestCase):
\ \
b: \- 1 <--- 2
"""
- a1_impl = FakeMigration('a1')
- a1 = ('a', '1')
- a2_impl = FakeMigration('a2')
- a2 = ('a', '2')
- a3_impl = FakeMigration('a3')
- a3 = ('a', '3')
- a4_impl = FakeMigration('a4')
- a4 = ('a', '4')
- b1_impl = FakeMigration('b1')
- b1 = ('b', '1')
- b2_impl = FakeMigration('b2')
- b2 = ('b', '2')
+ a1_impl = FakeMigration("a1")
+ a1 = ("a", "1")
+ a2_impl = FakeMigration("a2")
+ a2 = ("a", "2")
+ a3_impl = FakeMigration("a3")
+ a3 = ("a", "3")
+ a4_impl = FakeMigration("a4")
+ a4 = ("a", "4")
+ b1_impl = FakeMigration("b1")
+ b1 = ("b", "1")
+ b2_impl = FakeMigration("b2")
+ b2 = ("b", "2")
graph = MigrationGraph()
graph.add_node(a1, a1_impl)
graph.add_node(a2, a2_impl)
@@ -860,14 +938,17 @@ class ExecutorUnitTests(SimpleTestCase):
graph.add_dependency(None, b2, a2)
executor = MigrationExecutor(None)
- executor.loader = FakeLoader(graph, {
- a1: a1_impl,
- b1: b1_impl,
- a2: a2_impl,
- b2: b2_impl,
- a3: a3_impl,
- a4: a4_impl,
- })
+ executor.loader = FakeLoader(
+ graph,
+ {
+ a1: a1_impl,
+ b1: b1_impl,
+ a2: a2_impl,
+ b2: b2_impl,
+ a3: a3_impl,
+ a4: a4_impl,
+ },
+ )
plan = executor.migration_plan({a1})
@@ -886,14 +967,14 @@ class ExecutorUnitTests(SimpleTestCase):
If a1 is applied already and a2 is not, and we're asked to migrate to
a1, don't apply or unapply b1 or c1, regardless of their current state.
"""
- a1_impl = FakeMigration('a1')
- a1 = ('a', '1')
- a2_impl = FakeMigration('a2')
- a2 = ('a', '2')
- b1_impl = FakeMigration('b1')
- b1 = ('b', '1')
- c1_impl = FakeMigration('c1')
- c1 = ('c', '1')
+ a1_impl = FakeMigration("a1")
+ a1 = ("a", "1")
+ a2_impl = FakeMigration("a2")
+ a2 = ("a", "2")
+ b1_impl = FakeMigration("b1")
+ b1 = ("b", "1")
+ c1_impl = FakeMigration("c1")
+ c1 = ("c", "1")
graph = MigrationGraph()
graph.add_node(a1, a1_impl)
graph.add_node(a2, a2_impl)
@@ -904,10 +985,13 @@ class ExecutorUnitTests(SimpleTestCase):
graph.add_dependency(None, c1, a1)
executor = MigrationExecutor(None)
- executor.loader = FakeLoader(graph, {
- a1: a1_impl,
- b1: b1_impl,
- })
+ executor.loader = FakeLoader(
+ graph,
+ {
+ a1: a1_impl,
+ b1: b1_impl,
+ },
+ )
plan = executor.migration_plan({a1})