summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--django/db/backends/mysql/creation.py35
-rw-r--r--tests/backends/test_creation.py38
2 files changed, 66 insertions, 7 deletions
diff --git a/django/db/backends/mysql/creation.py b/django/db/backends/mysql/creation.py
index c53b9f8335..546fb6a629 100644
--- a/django/db/backends/mysql/creation.py
+++ b/django/db/backends/mysql/creation.py
@@ -17,24 +17,45 @@ class DatabaseCreation(BaseDatabaseCreation):
suffix.append('COLLATE %s' % test_settings['COLLATION'])
return ' '.join(suffix)
+ def _execute_create_test_db(self, cursor, parameters, keepdb=False):
+ try:
+ if keepdb:
+ # If the database should be kept, add "IF NOT EXISTS" to avoid
+ # "database exists" error, also temporarily disable "database
+ # exists" warning.
+ cursor.execute('''
+ SET @_tmp_sql_notes := @@sql_notes, sql_notes = 0;
+ CREATE DATABASE IF NOT EXISTS %(dbname)s %(suffix)s;
+ SET sql_notes = @_tmp_sql_notes;
+ ''' % parameters)
+ else:
+ super()._execute_create_test_db(cursor, parameters, keepdb)
+ except Exception as e:
+ if len(e.args) < 1 or e.args[0] != 1007:
+ # All errors except "database exists" (1007) cancel tests.
+ sys.stderr.write('Got an error creating the test database: %s\n' % e)
+ sys.exit(2)
+ else:
+ raise e
+
def _clone_test_db(self, number, verbosity, keepdb=False):
- qn = self.connection.ops.quote_name
source_database_name = self.connection.settings_dict['NAME']
target_database_name = self.get_test_db_clone_settings(number)['NAME']
-
+ test_db_params = {
+ 'dbname': self.connection.ops.quote_name(target_database_name),
+ 'suffix': self.sql_table_creation_suffix(),
+ }
with self._nodb_connection.cursor() as cursor:
try:
- cursor.execute("CREATE DATABASE %s" % qn(target_database_name))
+ self._execute_create_test_db(cursor, test_db_params, keepdb)
except Exception:
- if keepdb:
- return
try:
if verbosity >= 1:
print("Destroying old test database for alias %s..." % (
self._get_database_display_str(verbosity, target_database_name),
))
- cursor.execute("DROP DATABASE %s" % qn(target_database_name))
- cursor.execute("CREATE DATABASE %s" % qn(target_database_name))
+ cursor.execute('DROP DATABASE %(dbname)s' % test_db_params)
+ self._execute_create_test_db(cursor, test_db_params, keepdb)
except Exception as e:
sys.stderr.write("Got an error recreating the test database: %s\n" % e)
sys.exit(2)
diff --git a/tests/backends/test_creation.py b/tests/backends/test_creation.py
index f96102117b..2ecd03ce2d 100644
--- a/tests/backends/test_creation.py
+++ b/tests/backends/test_creation.py
@@ -8,6 +8,8 @@ from django.db import DEFAULT_DB_ALIAS, connection, connections
from django.db.backends.base.creation import (
TEST_DATABASE_PREFIX, BaseDatabaseCreation,
)
+from django.db.backends.mysql.creation import \
+ DatabaseCreation as MySQLDatabaseCreation
from django.db.backends.oracle.creation import \
DatabaseCreation as OracleDatabaseCreation
from django.db.backends.postgresql.creation import \
@@ -191,3 +193,39 @@ class OracleDatabaseCreationTests(TestCase):
creation._create_test_db(verbosity=0, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=True)
+
+
+@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests")
+class MySQLDatabaseCreationTests(SimpleTestCase):
+
+ def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
+
+ def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1044, "Access denied for user")
+
+ def patch_test_db_creation(self, execute_create_test_db):
+ return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_database_exists(self, *mocked_objects):
+ # Simulate test database creation raising "database exists"
+ creation = MySQLDatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_database_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test database.
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+ # "Database exists" shouldn't appear when keepdb is on
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_unexpected_error(self, *mocked_objects):
+ # Simulate test database creation raising unexpected error
+ creation = MySQLDatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_access_denied):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)