summaryrefslogtreecommitdiff
path: root/django/test/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'django/test/utils.py')
-rw-r--r--django/test/utils.py153
1 files changed, 152 insertions, 1 deletions
diff --git a/django/test/utils.py b/django/test/utils.py
index 764f1c84a9..b846fa2655 100644
--- a/django/test/utils.py
+++ b/django/test/utils.py
@@ -1,3 +1,4 @@
+import collections
import logging
import re
import sys
@@ -12,8 +13,9 @@ from django.apps import apps
from django.apps.registry import Apps
from django.conf import UserSettingsHolder, settings
from django.core import mail
+from django.core.exceptions import ImproperlyConfigured
from django.core.signals import request_started
-from django.db import reset_queries
+from django.db import DEFAULT_DB_ALIAS, connections, reset_queries
from django.db.models.options import Options
from django.template import Template
from django.test.signals import setting_changed, template_rendered
@@ -155,6 +157,155 @@ def teardown_test_environment():
del mail.outbox
+def setup_databases(verbosity, interactive, keepdb=False, debug_sql=False, parallel=0, **kwargs):
+ """
+ Create the test databases.
+ """
+ test_databases, mirrored_aliases = get_unique_databases_and_mirrors()
+
+ old_names = []
+
+ for signature, (db_name, aliases) in test_databases.items():
+ first_alias = None
+ for alias in aliases:
+ connection = connections[alias]
+ old_names.append((connection, db_name, first_alias is None))
+
+ # Actually create the database for the first connection
+ if first_alias is None:
+ first_alias = alias
+ connection.creation.create_test_db(
+ verbosity=verbosity,
+ autoclobber=not interactive,
+ keepdb=keepdb,
+ serialize=connection.settings_dict.get('TEST', {}).get('SERIALIZE', True),
+ )
+ if parallel > 1:
+ for index in range(parallel):
+ connection.creation.clone_test_db(
+ number=index + 1,
+ verbosity=verbosity,
+ keepdb=keepdb,
+ )
+ # Configure all other connections as mirrors of the first one
+ else:
+ connections[alias].creation.set_as_test_mirror(connections[first_alias].settings_dict)
+
+ # Configure the test mirrors.
+ for alias, mirror_alias in mirrored_aliases.items():
+ connections[alias].creation.set_as_test_mirror(
+ connections[mirror_alias].settings_dict)
+
+ if debug_sql:
+ for alias in connections:
+ connections[alias].force_debug_cursor = True
+
+ return old_names
+
+
+def dependency_ordered(test_databases, dependencies):
+ """
+ Reorder test_databases into an order that honors the dependencies
+ described in TEST[DEPENDENCIES].
+ """
+ ordered_test_databases = []
+ resolved_databases = set()
+
+ # Maps db signature to dependencies of all its aliases
+ dependencies_map = {}
+
+ # Check that no database depends on its own alias
+ for sig, (_, aliases) in test_databases:
+ all_deps = set()
+ for alias in aliases:
+ all_deps.update(dependencies.get(alias, []))
+ if not all_deps.isdisjoint(aliases):
+ raise ImproperlyConfigured(
+ "Circular dependency: databases %r depend on each other, "
+ "but are aliases." % aliases
+ )
+ dependencies_map[sig] = all_deps
+
+ while test_databases:
+ changed = False
+ deferred = []
+
+ # Try to find a DB that has all its dependencies met
+ for signature, (db_name, aliases) in test_databases:
+ if dependencies_map[signature].issubset(resolved_databases):
+ resolved_databases.update(aliases)
+ ordered_test_databases.append((signature, (db_name, aliases)))
+ changed = True
+ else:
+ deferred.append((signature, (db_name, aliases)))
+
+ if not changed:
+ raise ImproperlyConfigured("Circular dependency in TEST[DEPENDENCIES]")
+ test_databases = deferred
+ return ordered_test_databases
+
+
+def get_unique_databases_and_mirrors():
+ """
+ Figure out which databases actually need to be created.
+
+ Deduplicate entries in DATABASES that correspond the same database or are
+ configured as test mirrors.
+
+ Return two values:
+ - test_databases: ordered mapping of signatures to (name, list of aliases)
+ where all aliases share the same underlying database.
+ - mirrored_aliases: mapping of mirror aliases to original aliases.
+ """
+ mirrored_aliases = {}
+ test_databases = {}
+ dependencies = {}
+ default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature()
+
+ for alias in connections:
+ connection = connections[alias]
+ test_settings = connection.settings_dict['TEST']
+
+ if test_settings['MIRROR']:
+ # If the database is marked as a test mirror, save the alias.
+ mirrored_aliases[alias] = test_settings['MIRROR']
+ else:
+ # Store a tuple with DB parameters that uniquely identify it.
+ # If we have two aliases with the same values for that tuple,
+ # we only need to create the test database once.
+ item = test_databases.setdefault(
+ connection.creation.test_db_signature(),
+ (connection.settings_dict['NAME'], set())
+ )
+ item[1].add(alias)
+
+ if 'DEPENDENCIES' in test_settings:
+ dependencies[alias] = test_settings['DEPENDENCIES']
+ else:
+ if alias != DEFAULT_DB_ALIAS and connection.creation.test_db_signature() != default_sig:
+ dependencies[alias] = test_settings.get('DEPENDENCIES', [DEFAULT_DB_ALIAS])
+
+ test_databases = dependency_ordered(test_databases.items(), dependencies)
+ test_databases = collections.OrderedDict(test_databases)
+ return test_databases, mirrored_aliases
+
+
+def teardown_databases(old_config, verbosity, parallel=0, keepdb=False):
+ """
+ Destroy all the non-mirror databases.
+ """
+ for connection, old_name, destroy in old_config:
+ if destroy:
+ if parallel > 1:
+ for index in range(parallel):
+ connection.creation.destroy_test_db(
+ number=index + 1,
+ verbosity=verbosity,
+ keepdb=keepdb,
+ )
+ connection.creation.destroy_test_db(old_name, verbosity, keepdb)
+
+
def get_runner(settings, test_runner_class=None):
if not test_runner_class:
test_runner_class = settings.TEST_RUNNER