summaryrefslogtreecommitdiff
path: root/tests/backends/sqlite
diff options
context:
space:
mode:
Diffstat (limited to 'tests/backends/sqlite')
-rw-r--r--tests/backends/sqlite/test_creation.py24
-rw-r--r--tests/backends/sqlite/test_features.py8
-rw-r--r--tests/backends/sqlite/test_functions.py10
-rw-r--r--tests/backends/sqlite/test_introspection.py125
-rw-r--r--tests/backends/sqlite/test_operations.py17
-rw-r--r--tests/backends/sqlite/tests.py116
6 files changed, 183 insertions, 117 deletions
diff --git a/tests/backends/sqlite/test_creation.py b/tests/backends/sqlite/test_creation.py
index 6ec4262f73..ab1640c04e 100644
--- a/tests/backends/sqlite/test_creation.py
+++ b/tests/backends/sqlite/test_creation.py
@@ -5,15 +5,17 @@ from django.db import DEFAULT_DB_ALIAS, connection, connections
from django.test import SimpleTestCase
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class TestDbSignatureTests(SimpleTestCase):
def test_custom_test_name(self):
test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
- test_connection.settings_dict = copy.deepcopy(connections[DEFAULT_DB_ALIAS].settings_dict)
- test_connection.settings_dict['NAME'] = None
- test_connection.settings_dict['TEST']['NAME'] = 'custom.sqlite.db'
+ test_connection.settings_dict = copy.deepcopy(
+ connections[DEFAULT_DB_ALIAS].settings_dict
+ )
+ test_connection.settings_dict["NAME"] = None
+ test_connection.settings_dict["TEST"]["NAME"] = "custom.sqlite.db"
signature = test_connection.creation_class(test_connection).test_db_signature()
- self.assertEqual(signature, (None, 'custom.sqlite.db'))
+ self.assertEqual(signature, (None, "custom.sqlite.db"))
def test_get_test_db_clone_settings_name(self):
test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
@@ -21,13 +23,13 @@ class TestDbSignatureTests(SimpleTestCase):
connections[DEFAULT_DB_ALIAS].settings_dict,
)
tests = [
- ('test.sqlite3', 'test_1.sqlite3'),
- ('test', 'test_1'),
+ ("test.sqlite3", "test_1.sqlite3"),
+ ("test", "test_1"),
]
for test_db_name, expected_clone_name in tests:
with self.subTest(test_db_name=test_db_name):
- test_connection.settings_dict['NAME'] = test_db_name
- test_connection.settings_dict['TEST']['NAME'] = test_db_name
+ test_connection.settings_dict["NAME"] = test_db_name
+ test_connection.settings_dict["TEST"]["NAME"] = test_db_name
creation_class = test_connection.creation_class(test_connection)
- clone_settings_dict = creation_class.get_test_db_clone_settings('1')
- self.assertEqual(clone_settings_dict['NAME'], expected_clone_name)
+ clone_settings_dict = creation_class.get_test_db_clone_settings("1")
+ self.assertEqual(clone_settings_dict["NAME"], expected_clone_name)
diff --git a/tests/backends/sqlite/test_features.py b/tests/backends/sqlite/test_features.py
index 9b74794408..50ccbbd3cc 100644
--- a/tests/backends/sqlite/test_features.py
+++ b/tests/backends/sqlite/test_features.py
@@ -4,14 +4,14 @@ from django.db import OperationalError, connection
from django.test import TestCase
-@skipUnless(connection.vendor == 'sqlite', 'SQLite tests.')
+@skipUnless(connection.vendor == "sqlite", "SQLite tests.")
class FeaturesTests(TestCase):
def test_supports_json_field_operational_error(self):
- if hasattr(connection.features, 'supports_json_field'):
+ if hasattr(connection.features, "supports_json_field"):
del connection.features.supports_json_field
- msg = 'unable to open database file'
+ msg = "unable to open database file"
with mock.patch(
- 'django.db.backends.base.base.BaseDatabaseWrapper.cursor',
+ "django.db.backends.base.base.BaseDatabaseWrapper.cursor",
side_effect=OperationalError(msg),
):
with self.assertRaisesMessage(OperationalError, msg):
diff --git a/tests/backends/sqlite/test_functions.py b/tests/backends/sqlite/test_functions.py
index 0659717799..1102d5873e 100644
--- a/tests/backends/sqlite/test_functions.py
+++ b/tests/backends/sqlite/test_functions.py
@@ -1,5 +1,7 @@
from django.db.backends.sqlite3._functions import (
- _sqlite_date_trunc, _sqlite_datetime_trunc, _sqlite_time_trunc,
+ _sqlite_date_trunc,
+ _sqlite_datetime_trunc,
+ _sqlite_time_trunc,
)
from django.test import SimpleTestCase
@@ -8,14 +10,14 @@ class FunctionTests(SimpleTestCase):
def test_sqlite_date_trunc(self):
msg = "Unsupported lookup type: 'unknown-lookup'"
with self.assertRaisesMessage(ValueError, msg):
- _sqlite_date_trunc('unknown-lookup', '2005-08-11', None, None)
+ _sqlite_date_trunc("unknown-lookup", "2005-08-11", None, None)
def test_sqlite_datetime_trunc(self):
msg = "Unsupported lookup type: 'unknown-lookup'"
with self.assertRaisesMessage(ValueError, msg):
- _sqlite_datetime_trunc('unknown-lookup', '2005-08-11 1:00:00', None, None)
+ _sqlite_datetime_trunc("unknown-lookup", "2005-08-11 1:00:00", None, None)
def test_sqlite_time_trunc(self):
msg = "Unsupported lookup type: 'unknown-lookup'"
with self.assertRaisesMessage(ValueError, msg):
- _sqlite_time_trunc('unknown-lookup', '2005-08-11 1:00:00', None, None)
+ _sqlite_time_trunc("unknown-lookup", "2005-08-11 1:00:00", None, None)
diff --git a/tests/backends/sqlite/test_introspection.py b/tests/backends/sqlite/test_introspection.py
index 9331b5bb1a..2997ac9595 100644
--- a/tests/backends/sqlite/test_introspection.py
+++ b/tests/backends/sqlite/test_introspection.py
@@ -6,7 +6,7 @@ from django.db import connection
from django.test import TestCase
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class IntrospectionTests(TestCase):
def test_get_primary_key_column(self):
"""
@@ -14,19 +14,26 @@ class IntrospectionTests(TestCase):
quotation.
"""
testable_column_strings = (
- ('id', 'id'), ('[id]', 'id'), ('`id`', 'id'), ('"id"', 'id'),
- ('[id col]', 'id col'), ('`id col`', 'id col'), ('"id col"', 'id col')
+ ("id", "id"),
+ ("[id]", "id"),
+ ("`id`", "id"),
+ ('"id"', "id"),
+ ("[id col]", "id col"),
+ ("`id col`", "id col"),
+ ('"id col"', "id col"),
)
with connection.cursor() as cursor:
for column, expected_string in testable_column_strings:
- sql = 'CREATE TABLE test_primary (%s int PRIMARY KEY NOT NULL)' % column
+ sql = "CREATE TABLE test_primary (%s int PRIMARY KEY NOT NULL)" % column
with self.subTest(column=column):
try:
cursor.execute(sql)
- field = connection.introspection.get_primary_key_column(cursor, 'test_primary')
+ field = connection.introspection.get_primary_key_column(
+ cursor, "test_primary"
+ )
self.assertEqual(field, expected_string)
finally:
- cursor.execute('DROP TABLE test_primary')
+ cursor.execute("DROP TABLE test_primary")
def test_get_primary_key_column_pk_constraint(self):
sql = """
@@ -41,38 +48,43 @@ class IntrospectionTests(TestCase):
cursor.execute(sql)
field = connection.introspection.get_primary_key_column(
cursor,
- 'test_primary',
+ "test_primary",
)
- self.assertEqual(field, 'id')
+ self.assertEqual(field, "id")
finally:
- cursor.execute('DROP TABLE test_primary')
+ cursor.execute("DROP TABLE test_primary")
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class ParsingTests(TestCase):
def parse_definition(self, sql, columns):
"""Parse a column or constraint definition."""
statement = sqlparse.parse(sql)[0]
tokens = (token for token in statement.flatten() if not token.is_whitespace)
with connection.cursor():
- return connection.introspection._parse_column_or_constraint_definition(tokens, set(columns))
+ return connection.introspection._parse_column_or_constraint_definition(
+ tokens, set(columns)
+ )
def assertConstraint(self, constraint_details, cols, unique=False, check=False):
- self.assertEqual(constraint_details, {
- 'unique': unique,
- 'columns': cols,
- 'primary_key': False,
- 'foreign_key': None,
- 'check': check,
- 'index': False,
- })
+ self.assertEqual(
+ constraint_details,
+ {
+ "unique": unique,
+ "columns": cols,
+ "primary_key": False,
+ "foreign_key": None,
+ "check": check,
+ "index": False,
+ },
+ )
def test_unique_column(self):
tests = (
- ('"ref" integer UNIQUE,', ['ref']),
- ('ref integer UNIQUE,', ['ref']),
- ('"customname" integer UNIQUE,', ['customname']),
- ('customname integer UNIQUE,', ['customname']),
+ ('"ref" integer UNIQUE,', ["ref"]),
+ ("ref integer UNIQUE,", ["ref"]),
+ ('"customname" integer UNIQUE,', ["customname"]),
+ ("customname integer UNIQUE,", ["customname"]),
)
for sql, columns in tests:
with self.subTest(sql=sql):
@@ -83,10 +95,18 @@ class ParsingTests(TestCase):
def test_unique_constraint(self):
tests = (
- ('CONSTRAINT "ref" UNIQUE ("ref"),', 'ref', ['ref']),
- ('CONSTRAINT ref UNIQUE (ref),', 'ref', ['ref']),
- ('CONSTRAINT "customname1" UNIQUE ("customname2"),', 'customname1', ['customname2']),
- ('CONSTRAINT customname1 UNIQUE (customname2),', 'customname1', ['customname2']),
+ ('CONSTRAINT "ref" UNIQUE ("ref"),', "ref", ["ref"]),
+ ("CONSTRAINT ref UNIQUE (ref),", "ref", ["ref"]),
+ (
+ 'CONSTRAINT "customname1" UNIQUE ("customname2"),',
+ "customname1",
+ ["customname2"],
+ ),
+ (
+ "CONSTRAINT customname1 UNIQUE (customname2),",
+ "customname1",
+ ["customname2"],
+ ),
)
for sql, constraint_name, columns in tests:
with self.subTest(sql=sql):
@@ -97,8 +117,12 @@ class ParsingTests(TestCase):
def test_unique_constraint_multicolumn(self):
tests = (
- ('CONSTRAINT "ref" UNIQUE ("ref", "customname"),', 'ref', ['ref', 'customname']),
- ('CONSTRAINT ref UNIQUE (ref, customname),', 'ref', ['ref', 'customname']),
+ (
+ 'CONSTRAINT "ref" UNIQUE ("ref", "customname"),',
+ "ref",
+ ["ref", "customname"],
+ ),
+ ("CONSTRAINT ref UNIQUE (ref, customname),", "ref", ["ref", "customname"]),
)
for sql, constraint_name, columns in tests:
with self.subTest(sql=sql):
@@ -109,10 +133,16 @@ class ParsingTests(TestCase):
def test_check_column(self):
tests = (
- ('"ref" varchar(255) CHECK ("ref" != \'test\'),', ['ref']),
- ('ref varchar(255) CHECK (ref != \'test\'),', ['ref']),
- ('"customname1" varchar(255) CHECK ("customname2" != \'test\'),', ['customname2']),
- ('customname1 varchar(255) CHECK (customname2 != \'test\'),', ['customname2']),
+ ('"ref" varchar(255) CHECK ("ref" != \'test\'),', ["ref"]),
+ ("ref varchar(255) CHECK (ref != 'test'),", ["ref"]),
+ (
+ '"customname1" varchar(255) CHECK ("customname2" != \'test\'),',
+ ["customname2"],
+ ),
+ (
+ "customname1 varchar(255) CHECK (customname2 != 'test'),",
+ ["customname2"],
+ ),
)
for sql, columns in tests:
with self.subTest(sql=sql):
@@ -123,10 +153,18 @@ class ParsingTests(TestCase):
def test_check_constraint(self):
tests = (
- ('CONSTRAINT "ref" CHECK ("ref" != \'test\'),', 'ref', ['ref']),
- ('CONSTRAINT ref CHECK (ref != \'test\'),', 'ref', ['ref']),
- ('CONSTRAINT "customname1" CHECK ("customname2" != \'test\'),', 'customname1', ['customname2']),
- ('CONSTRAINT customname1 CHECK (customname2 != \'test\'),', 'customname1', ['customname2']),
+ ('CONSTRAINT "ref" CHECK ("ref" != \'test\'),', "ref", ["ref"]),
+ ("CONSTRAINT ref CHECK (ref != 'test'),", "ref", ["ref"]),
+ (
+ 'CONSTRAINT "customname1" CHECK ("customname2" != \'test\'),',
+ "customname1",
+ ["customname2"],
+ ),
+ (
+ "CONSTRAINT customname1 CHECK (customname2 != 'test'),",
+ "customname1",
+ ["customname2"],
+ ),
)
for sql, constraint_name, columns in tests:
with self.subTest(sql=sql):
@@ -137,9 +175,12 @@ class ParsingTests(TestCase):
def test_check_column_with_operators_and_functions(self):
tests = (
- ('"ref" integer CHECK ("ref" BETWEEN 1 AND 10),', ['ref']),
- ('"ref" varchar(255) CHECK ("ref" LIKE \'test%\'),', ['ref']),
- ('"ref" varchar(255) CHECK (LENGTH(ref) > "max_length"),', ['ref', 'max_length']),
+ ('"ref" integer CHECK ("ref" BETWEEN 1 AND 10),', ["ref"]),
+ ('"ref" varchar(255) CHECK ("ref" LIKE \'test%\'),', ["ref"]),
+ (
+ '"ref" varchar(255) CHECK (LENGTH(ref) > "max_length"),',
+ ["ref", "max_length"],
+ ),
)
for sql, columns in tests:
with self.subTest(sql=sql):
@@ -150,8 +191,8 @@ class ParsingTests(TestCase):
def test_check_and_unique_column(self):
tests = (
- ('"ref" varchar(255) CHECK ("ref" != \'test\') UNIQUE,', ['ref']),
- ('ref varchar(255) UNIQUE CHECK (ref != \'test\'),', ['ref']),
+ ('"ref" varchar(255) CHECK ("ref" != \'test\') UNIQUE,', ["ref"]),
+ ("ref varchar(255) UNIQUE CHECK (ref != 'test'),", ["ref"]),
)
for sql, columns in tests:
with self.subTest(sql=sql):
diff --git a/tests/backends/sqlite/test_operations.py b/tests/backends/sqlite/test_operations.py
index 863a978580..3ff055248d 100644
--- a/tests/backends/sqlite/test_operations.py
+++ b/tests/backends/sqlite/test_operations.py
@@ -7,7 +7,7 @@ from django.test import TestCase
from ..models import Person, Tag
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests.')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests.")
class SQLiteOperationsTests(TestCase):
def test_sql_flush(self):
self.assertEqual(
@@ -34,7 +34,7 @@ class SQLiteOperationsTests(TestCase):
'DELETE FROM "backends_person";',
'DELETE FROM "backends_tag";',
'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
- 'zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz'
+ "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
'zzzzzzzzzzzzzzzzzzzzzzz";',
],
)
@@ -50,7 +50,7 @@ class SQLiteOperationsTests(TestCase):
'DELETE FROM "backends_person";',
'DELETE FROM "backends_tag";',
'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN '
- '(\'backends_person\', \'backends_tag\');',
+ "('backends_person', 'backends_tag');",
],
)
@@ -68,13 +68,16 @@ class SQLiteOperationsTests(TestCase):
'DELETE FROM "backends_person";',
'DELETE FROM "backends_tag";',
'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
- 'zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz'
+ "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
'zzzzzzzzzzzzzzzzzzzzzzz";',
],
)
- self.assertIs(statements[-1].startswith(
- 'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN ('
- ), True)
+ self.assertIs(
+ statements[-1].startswith(
+ 'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN ('
+ ),
+ True,
+ )
self.assertIn("'backends_person'", statements[-1])
self.assertIn("'backends_tag'", statements[-1])
self.assertIn(
diff --git a/tests/backends/sqlite/tests.py b/tests/backends/sqlite/tests.py
index 07477a5c71..e167e09dcf 100644
--- a/tests/backends/sqlite/tests.py
+++ b/tests/backends/sqlite/tests.py
@@ -12,7 +12,10 @@ from django.db import NotSupportedError, connection, transaction
from django.db.models import Aggregate, Avg, CharField, StdDev, Sum, Variance
from django.db.utils import ConnectionHandler
from django.test import (
- TestCase, TransactionTestCase, override_settings, skipIfDBFeature,
+ TestCase,
+ TransactionTestCase,
+ override_settings,
+ skipIfDBFeature,
)
from django.test.utils import isolate_apps
@@ -25,35 +28,43 @@ except ImproperlyConfigured:
pass
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class Tests(TestCase):
longMessage = True
def test_check_sqlite_version(self):
- msg = 'SQLite 3.9.0 or later is required (found 3.8.11.1).'
- with mock.patch.object(dbapi2, 'sqlite_version_info', (3, 8, 11, 1)), \
- mock.patch.object(dbapi2, 'sqlite_version', '3.8.11.1'), \
- self.assertRaisesMessage(ImproperlyConfigured, msg):
+ msg = "SQLite 3.9.0 or later is required (found 3.8.11.1)."
+ with mock.patch.object(
+ dbapi2, "sqlite_version_info", (3, 8, 11, 1)
+ ), mock.patch.object(
+ dbapi2, "sqlite_version", "3.8.11.1"
+ ), self.assertRaisesMessage(
+ ImproperlyConfigured, msg
+ ):
check_sqlite_version()
def test_aggregation(self):
"""Raise NotSupportedError when aggregating on date/time fields."""
for aggregate in (Sum, Avg, Variance, StdDev):
with self.assertRaises(NotSupportedError):
- Item.objects.all().aggregate(aggregate('time'))
+ Item.objects.all().aggregate(aggregate("time"))
with self.assertRaises(NotSupportedError):
- Item.objects.all().aggregate(aggregate('date'))
+ Item.objects.all().aggregate(aggregate("date"))
with self.assertRaises(NotSupportedError):
- Item.objects.all().aggregate(aggregate('last_modified'))
+ Item.objects.all().aggregate(aggregate("last_modified"))
with self.assertRaises(NotSupportedError):
Item.objects.all().aggregate(
- **{'complex': aggregate('last_modified') + aggregate('last_modified')}
+ **{
+ "complex": aggregate("last_modified")
+ + aggregate("last_modified")
+ }
)
def test_distinct_aggregation(self):
class DistinctAggregate(Aggregate):
allow_distinct = True
- aggregate = DistinctAggregate('first', 'second', distinct=True)
+
+ aggregate = DistinctAggregate("first", "second", distinct=True)
msg = (
"SQLite doesn't support DISTINCT on aggregate functions accepting "
"multiple arguments."
@@ -67,32 +78,36 @@ class Tests(TestCase):
class DistinctAggregate(Aggregate):
allow_distinct = True
- aggregate = DistinctAggregate('first', 'second', distinct=False)
+ aggregate = DistinctAggregate("first", "second", distinct=False)
connection.ops.check_expression_support(aggregate)
def test_memory_db_test_name(self):
"""A named in-memory db should be allowed where supported."""
from django.db.backends.sqlite3.base import DatabaseWrapper
+
settings_dict = {
- 'TEST': {
- 'NAME': 'file:memorydb_test?mode=memory&cache=shared',
+ "TEST": {
+ "NAME": "file:memorydb_test?mode=memory&cache=shared",
}
}
creation = DatabaseWrapper(settings_dict).creation
- self.assertEqual(creation._get_test_db_name(), creation.connection.settings_dict['TEST']['NAME'])
+ self.assertEqual(
+ creation._get_test_db_name(),
+ creation.connection.settings_dict["TEST"]["NAME"],
+ )
def test_regexp_function(self):
tests = (
- ('test', r'[0-9]+', False),
- ('test', r'[a-z]+', True),
- ('test', None, None),
- (None, r'[a-z]+', None),
+ ("test", r"[0-9]+", False),
+ ("test", r"[a-z]+", True),
+ ("test", None, None),
+ (None, r"[a-z]+", None),
(None, None, None),
)
for string, pattern, expected in tests:
with self.subTest((string, pattern)):
with connection.cursor() as cursor:
- cursor.execute('SELECT %s REGEXP %s', [string, pattern])
+ cursor.execute("SELECT %s REGEXP %s", [string, pattern])
value = cursor.fetchone()[0]
value = bool(value) if value in {0, 1} else value
self.assertIs(value, expected)
@@ -100,22 +115,22 @@ class Tests(TestCase):
def test_pathlib_name(self):
with tempfile.TemporaryDirectory() as tmp:
settings_dict = {
- 'default': {
- 'ENGINE': 'django.db.backends.sqlite3',
- 'NAME': Path(tmp) / 'test.db',
+ "default": {
+ "ENGINE": "django.db.backends.sqlite3",
+ "NAME": Path(tmp) / "test.db",
},
}
connections = ConnectionHandler(settings_dict)
- connections['default'].ensure_connection()
- connections['default'].close()
- self.assertTrue(os.path.isfile(os.path.join(tmp, 'test.db')))
+ connections["default"].ensure_connection()
+ connections["default"].close()
+ self.assertTrue(os.path.isfile(os.path.join(tmp, "test.db")))
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
-@isolate_apps('backends')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
+@isolate_apps("backends")
class SchemaTests(TransactionTestCase):
- available_apps = ['backends']
+ available_apps = ["backends"]
def test_autoincrement(self):
"""
@@ -128,9 +143,9 @@ class SchemaTests(TransactionTestCase):
match = re.search('"id" ([^,]+),', statements[0])
self.assertIsNotNone(match)
self.assertEqual(
- 'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
+ "integer NOT NULL PRIMARY KEY AUTOINCREMENT",
match[1],
- 'Wrong SQL used to create an auto-increment column on SQLite'
+ "Wrong SQL used to create an auto-increment column on SQLite",
)
def test_disable_constraint_checking_failure_disallowed(self):
@@ -139,11 +154,11 @@ class SchemaTests(TransactionTestCase):
foreign key constraint checks are not disabled beforehand.
"""
msg = (
- 'SQLite schema editor cannot be used while foreign key '
- 'constraint checks are enabled. Make sure to disable them '
- 'before entering a transaction.atomic() context because '
- 'SQLite does not support disabling them in the middle of '
- 'a multi-statement transaction.'
+ "SQLite schema editor cannot be used while foreign key "
+ "constraint checks are enabled. Make sure to disable them "
+ "before entering a transaction.atomic() context because "
+ "SQLite does not support disabling them in the middle of "
+ "a multi-statement transaction."
)
with self.assertRaisesMessage(NotSupportedError, msg):
with transaction.atomic(), connection.schema_editor(atomic=True):
@@ -154,23 +169,25 @@ class SchemaTests(TransactionTestCase):
SQLite schema editor is usable within an outer transaction as long as
foreign key constraints checks are disabled beforehand.
"""
+
def constraint_checks_enabled():
with connection.cursor() as cursor:
- return bool(cursor.execute('PRAGMA foreign_keys').fetchone()[0])
+ return bool(cursor.execute("PRAGMA foreign_keys").fetchone()[0])
+
with connection.constraint_checks_disabled(), transaction.atomic():
with connection.schema_editor(atomic=True):
self.assertFalse(constraint_checks_enabled())
self.assertFalse(constraint_checks_enabled())
self.assertTrue(constraint_checks_enabled())
- @skipIfDBFeature('supports_atomic_references_rename')
+ @skipIfDBFeature("supports_atomic_references_rename")
def test_field_rename_inside_atomic_block(self):
"""
NotImplementedError is raised when a model field rename is attempted
inside an atomic block.
"""
new_field = CharField(max_length=255, unique=True)
- new_field.set_attributes_from_name('renamed')
+ new_field.set_attributes_from_name("renamed")
msg = (
"Renaming the 'backends_author'.'name' column while in a "
"transaction is not supported on SQLite < 3.26 because it would "
@@ -179,9 +196,9 @@ class SchemaTests(TransactionTestCase):
)
with self.assertRaisesMessage(NotSupportedError, msg):
with connection.schema_editor(atomic=True) as editor:
- editor.alter_field(Author, Author._meta.get_field('name'), new_field)
+ editor.alter_field(Author, Author._meta.get_field("name"), new_field)
- @skipIfDBFeature('supports_atomic_references_rename')
+ @skipIfDBFeature("supports_atomic_references_rename")
def test_table_rename_inside_atomic_block(self):
"""
NotImplementedError is raised when a table rename is attempted inside
@@ -197,16 +214,15 @@ class SchemaTests(TransactionTestCase):
editor.alter_db_table(Author, "backends_author", "renamed_table")
-@unittest.skipUnless(connection.vendor == 'sqlite', 'Test only for SQLite')
+@unittest.skipUnless(connection.vendor == "sqlite", "Test only for SQLite")
@override_settings(DEBUG=True)
class LastExecutedQueryTest(TestCase):
-
def test_no_interpolation(self):
# This shouldn't raise an exception (#17158)
query = "SELECT strftime('%Y', 'now');"
with connection.cursor() as cursor:
cursor.execute(query)
- self.assertEqual(connection.queries[-1]['sql'], query)
+ self.assertEqual(connection.queries[-1]["sql"], query)
def test_parameter_quoting(self):
# The implementation of last_executed_queries isn't optimal. It's
@@ -217,7 +233,7 @@ class LastExecutedQueryTest(TestCase):
cursor.execute(query, params)
# Note that the single quote is repeated
substituted = "SELECT '\"''\\'"
- self.assertEqual(connection.queries[-1]['sql'], substituted)
+ self.assertEqual(connection.queries[-1]["sql"], substituted)
def test_large_number_of_parameters(self):
# If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
@@ -230,12 +246,13 @@ class LastExecutedQueryTest(TestCase):
cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class EscapingChecks(TestCase):
"""
All tests in this test case are also run with settings.DEBUG=True in
EscapingChecksDebug test case, to also test CursorDebugWrapper.
"""
+
def test_parameter_escaping(self):
# '%s' escaping support for sqlite3 (#13648).
with connection.cursor() as cursor:
@@ -245,19 +262,20 @@ class EscapingChecks(TestCase):
self.assertTrue(int(response))
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
pass
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class ThreadSharing(TransactionTestCase):
- available_apps = ['backends']
+ available_apps = ["backends"]
def test_database_sharing_in_threads(self):
def create_object():
Object.objects.create()
+
create_object()
thread = threading.Thread(target=create_object)
thread.start()