diff options
| author | Andreas Pelme <andreas@pelme.se> | 2016-07-03 00:20:14 +0200 |
|---|---|---|
| committer | Tim Graham <timograham@gmail.com> | 2016-08-17 13:55:04 -0400 |
| commit | e76981b43325da60b8a7475661df6cbfa7fda37e (patch) | |
| tree | e8d521a2063177186b11ff330698905a2ba4e69e /django/test/utils.py | |
| parent | ff445f4c19a0fdf6696c99efefa38b1409b8709f (diff) | |
Fixed #26840 -- Added test.utils.setup/teardown_databases().
Diffstat (limited to 'django/test/utils.py')
| -rw-r--r-- | django/test/utils.py | 153 |
1 files changed, 152 insertions, 1 deletions
diff --git a/django/test/utils.py b/django/test/utils.py index 764f1c84a9..b846fa2655 100644 --- a/django/test/utils.py +++ b/django/test/utils.py @@ -1,3 +1,4 @@ +import collections import logging import re import sys @@ -12,8 +13,9 @@ from django.apps import apps from django.apps.registry import Apps from django.conf import UserSettingsHolder, settings from django.core import mail +from django.core.exceptions import ImproperlyConfigured from django.core.signals import request_started -from django.db import reset_queries +from django.db import DEFAULT_DB_ALIAS, connections, reset_queries from django.db.models.options import Options from django.template import Template from django.test.signals import setting_changed, template_rendered @@ -155,6 +157,155 @@ def teardown_test_environment(): del mail.outbox +def setup_databases(verbosity, interactive, keepdb=False, debug_sql=False, parallel=0, **kwargs): + """ + Create the test databases. + """ + test_databases, mirrored_aliases = get_unique_databases_and_mirrors() + + old_names = [] + + for signature, (db_name, aliases) in test_databases.items(): + first_alias = None + for alias in aliases: + connection = connections[alias] + old_names.append((connection, db_name, first_alias is None)) + + # Actually create the database for the first connection + if first_alias is None: + first_alias = alias + connection.creation.create_test_db( + verbosity=verbosity, + autoclobber=not interactive, + keepdb=keepdb, + serialize=connection.settings_dict.get('TEST', {}).get('SERIALIZE', True), + ) + if parallel > 1: + for index in range(parallel): + connection.creation.clone_test_db( + number=index + 1, + verbosity=verbosity, + keepdb=keepdb, + ) + # Configure all other connections as mirrors of the first one + else: + connections[alias].creation.set_as_test_mirror(connections[first_alias].settings_dict) + + # Configure the test mirrors. + for alias, mirror_alias in mirrored_aliases.items(): + connections[alias].creation.set_as_test_mirror( + connections[mirror_alias].settings_dict) + + if debug_sql: + for alias in connections: + connections[alias].force_debug_cursor = True + + return old_names + + +def dependency_ordered(test_databases, dependencies): + """ + Reorder test_databases into an order that honors the dependencies + described in TEST[DEPENDENCIES]. + """ + ordered_test_databases = [] + resolved_databases = set() + + # Maps db signature to dependencies of all its aliases + dependencies_map = {} + + # Check that no database depends on its own alias + for sig, (_, aliases) in test_databases: + all_deps = set() + for alias in aliases: + all_deps.update(dependencies.get(alias, [])) + if not all_deps.isdisjoint(aliases): + raise ImproperlyConfigured( + "Circular dependency: databases %r depend on each other, " + "but are aliases." % aliases + ) + dependencies_map[sig] = all_deps + + while test_databases: + changed = False + deferred = [] + + # Try to find a DB that has all its dependencies met + for signature, (db_name, aliases) in test_databases: + if dependencies_map[signature].issubset(resolved_databases): + resolved_databases.update(aliases) + ordered_test_databases.append((signature, (db_name, aliases))) + changed = True + else: + deferred.append((signature, (db_name, aliases))) + + if not changed: + raise ImproperlyConfigured("Circular dependency in TEST[DEPENDENCIES]") + test_databases = deferred + return ordered_test_databases + + +def get_unique_databases_and_mirrors(): + """ + Figure out which databases actually need to be created. + + Deduplicate entries in DATABASES that correspond the same database or are + configured as test mirrors. + + Return two values: + - test_databases: ordered mapping of signatures to (name, list of aliases) + where all aliases share the same underlying database. + - mirrored_aliases: mapping of mirror aliases to original aliases. + """ + mirrored_aliases = {} + test_databases = {} + dependencies = {} + default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature() + + for alias in connections: + connection = connections[alias] + test_settings = connection.settings_dict['TEST'] + + if test_settings['MIRROR']: + # If the database is marked as a test mirror, save the alias. + mirrored_aliases[alias] = test_settings['MIRROR'] + else: + # Store a tuple with DB parameters that uniquely identify it. + # If we have two aliases with the same values for that tuple, + # we only need to create the test database once. + item = test_databases.setdefault( + connection.creation.test_db_signature(), + (connection.settings_dict['NAME'], set()) + ) + item[1].add(alias) + + if 'DEPENDENCIES' in test_settings: + dependencies[alias] = test_settings['DEPENDENCIES'] + else: + if alias != DEFAULT_DB_ALIAS and connection.creation.test_db_signature() != default_sig: + dependencies[alias] = test_settings.get('DEPENDENCIES', [DEFAULT_DB_ALIAS]) + + test_databases = dependency_ordered(test_databases.items(), dependencies) + test_databases = collections.OrderedDict(test_databases) + return test_databases, mirrored_aliases + + +def teardown_databases(old_config, verbosity, parallel=0, keepdb=False): + """ + Destroy all the non-mirror databases. + """ + for connection, old_name, destroy in old_config: + if destroy: + if parallel > 1: + for index in range(parallel): + connection.creation.destroy_test_db( + number=index + 1, + verbosity=verbosity, + keepdb=keepdb, + ) + connection.creation.destroy_test_db(old_name, verbosity, keepdb) + + def get_runner(settings, test_runner_class=None): if not test_runner_class: test_runner_class = settings.TEST_RUNNER |
