summaryrefslogtreecommitdiff
path: root/django/db/backends/sqlite3/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'django/db/backends/sqlite3/schema.py')
-rw-r--r--django/db/backends/sqlite3/schema.py307
1 files changed, 196 insertions, 111 deletions
diff --git a/django/db/backends/sqlite3/schema.py b/django/db/backends/sqlite3/schema.py
index 3ff0a3f7db..c9af8088e5 100644
--- a/django/db/backends/sqlite3/schema.py
+++ b/django/db/backends/sqlite3/schema.py
@@ -14,7 +14,9 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_delete_table = "DROP TABLE %(table)s"
sql_create_fk = None
- sql_create_inline_fk = "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED"
+ sql_create_inline_fk = (
+ "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED"
+ )
sql_create_column_inline_fk = sql_create_inline_fk
sql_create_unique = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)"
sql_delete_unique = "DROP INDEX %(name)s"
@@ -24,11 +26,11 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# disabled. Enforce it here for the duration of the schema edition.
if not self.connection.disable_constraint_checking():
raise NotSupportedError(
- 'SQLite schema editor cannot be used while foreign key '
- 'constraint checks are enabled. Make sure to disable them '
- 'before entering a transaction.atomic() context because '
- 'SQLite does not support disabling them in the middle of '
- 'a multi-statement transaction.'
+ "SQLite schema editor cannot be used while foreign key "
+ "constraint checks are enabled. Make sure to disable them "
+ "before entering a transaction.atomic() context because "
+ "SQLite does not support disabling them in the middle of "
+ "a multi-statement transaction."
)
return super().__enter__()
@@ -43,6 +45,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# security hardening).
try:
import sqlite3
+
value = sqlite3.adapt(value)
except ImportError:
pass
@@ -54,7 +57,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
elif isinstance(value, (Decimal, float, int)):
return str(value)
elif isinstance(value, str):
- return "'%s'" % value.replace("\'", "\'\'")
+ return "'%s'" % value.replace("'", "''")
elif value is None:
return "NULL"
elif isinstance(value, (bytes, bytearray, memoryview)):
@@ -63,12 +66,16 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# character.
return "X'%s'" % value.hex()
else:
- raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
+ raise ValueError(
+ "Cannot quote parameter value %r of type %s" % (value, type(value))
+ )
def prepare_default(self, value):
return self.quote_value(value)
- def _is_referenced_by_fk_constraint(self, table_name, column_name=None, ignore_self=False):
+ def _is_referenced_by_fk_constraint(
+ self, table_name, column_name=None, ignore_self=False
+ ):
"""
Return whether or not the provided table name is referenced by another
one. If `column_name` is specified, only references pointing to that
@@ -79,22 +86,33 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
for other_table in self.connection.introspection.get_table_list(cursor):
if ignore_self and other_table.name == table_name:
continue
- relations = self.connection.introspection.get_relations(cursor, other_table.name)
+ relations = self.connection.introspection.get_relations(
+ cursor, other_table.name
+ )
for constraint_column, constraint_table in relations.values():
- if (constraint_table == table_name and
- (column_name is None or constraint_column == column_name)):
+ if constraint_table == table_name and (
+ column_name is None or constraint_column == column_name
+ ):
return True
return False
- def alter_db_table(self, model, old_db_table, new_db_table, disable_constraints=True):
- if (not self.connection.features.supports_atomic_references_rename and
- disable_constraints and self._is_referenced_by_fk_constraint(old_db_table)):
+ def alter_db_table(
+ self, model, old_db_table, new_db_table, disable_constraints=True
+ ):
+ if (
+ not self.connection.features.supports_atomic_references_rename
+ and disable_constraints
+ and self._is_referenced_by_fk_constraint(old_db_table)
+ ):
if self.connection.in_atomic_block:
- raise NotSupportedError((
- 'Renaming the %r table while in a transaction is not '
- 'supported on SQLite < 3.26 because it would break referential '
- 'integrity. Try adding `atomic = False` to the Migration class.'
- ) % old_db_table)
+ raise NotSupportedError(
+ (
+ "Renaming the %r table while in a transaction is not "
+ "supported on SQLite < 3.26 because it would break referential "
+ "integrity. Try adding `atomic = False` to the Migration class."
+ )
+ % old_db_table
+ )
self.connection.enable_constraint_checking()
super().alter_db_table(model, old_db_table, new_db_table)
self.connection.disable_constraint_checking()
@@ -107,42 +125,56 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
old_field_name = old_field.name
table_name = model._meta.db_table
_, old_column_name = old_field.get_attname_column()
- if (new_field.name != old_field_name and
- not self.connection.features.supports_atomic_references_rename and
- self._is_referenced_by_fk_constraint(table_name, old_column_name, ignore_self=True)):
+ if (
+ new_field.name != old_field_name
+ and not self.connection.features.supports_atomic_references_rename
+ and self._is_referenced_by_fk_constraint(
+ table_name, old_column_name, ignore_self=True
+ )
+ ):
if self.connection.in_atomic_block:
- raise NotSupportedError((
- 'Renaming the %r.%r column while in a transaction is not '
- 'supported on SQLite < 3.26 because it would break referential '
- 'integrity. Try adding `atomic = False` to the Migration class.'
- ) % (model._meta.db_table, old_field_name))
+ raise NotSupportedError(
+ (
+ "Renaming the %r.%r column while in a transaction is not "
+ "supported on SQLite < 3.26 because it would break referential "
+ "integrity. Try adding `atomic = False` to the Migration class."
+ )
+ % (model._meta.db_table, old_field_name)
+ )
with atomic(self.connection.alias):
super().alter_field(model, old_field, new_field, strict=strict)
# Follow SQLite's documented procedure for performing changes
# that don't affect the on-disk content.
# https://sqlite.org/lang_altertable.html#otheralter
with self.connection.cursor() as cursor:
- schema_version = cursor.execute('PRAGMA schema_version').fetchone()[0]
- cursor.execute('PRAGMA writable_schema = 1')
+ schema_version = cursor.execute("PRAGMA schema_version").fetchone()[
+ 0
+ ]
+ cursor.execute("PRAGMA writable_schema = 1")
references_template = ' REFERENCES "%s" ("%%s") ' % table_name
new_column_name = new_field.get_attname_column()[1]
search = references_template % old_column_name
replacement = references_template % new_column_name
- cursor.execute('UPDATE sqlite_master SET sql = replace(sql, %s, %s)', (search, replacement))
- cursor.execute('PRAGMA schema_version = %d' % (schema_version + 1))
- cursor.execute('PRAGMA writable_schema = 0')
+ cursor.execute(
+ "UPDATE sqlite_master SET sql = replace(sql, %s, %s)",
+ (search, replacement),
+ )
+ cursor.execute("PRAGMA schema_version = %d" % (schema_version + 1))
+ cursor.execute("PRAGMA writable_schema = 0")
# The integrity check will raise an exception and rollback
# the transaction if the sqlite_master updates corrupt the
# database.
- cursor.execute('PRAGMA integrity_check')
+ cursor.execute("PRAGMA integrity_check")
# Perform a VACUUM to refresh the database representation from
# the sqlite_master table.
with self.connection.cursor() as cursor:
- cursor.execute('VACUUM')
+ cursor.execute("VACUUM")
else:
super().alter_field(model, old_field, new_field, strict=strict)
- def _remake_table(self, model, create_field=None, delete_field=None, alter_field=None):
+ def _remake_table(
+ self, model, create_field=None, delete_field=None, alter_field=None
+ ):
"""
Shortcut to transform a model from old_model into new_model
@@ -163,6 +195,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# to an altered field.
def is_self_referential(f):
return f.is_relation and f.remote_field.model is model
+
# Work out the new fields dict / mapping
body = {
f.name: f.clone() if is_self_referential(f) else f
@@ -170,14 +203,18 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
}
# Since mapping might mix column names and default values,
# its values must be already quoted.
- mapping = {f.column: self.quote_name(f.column) for f in model._meta.local_concrete_fields}
+ mapping = {
+ f.column: self.quote_name(f.column)
+ for f in model._meta.local_concrete_fields
+ }
# This maps field names (not columns) for things like unique_together
rename_mapping = {}
# If any of the new or altered fields is introducing a new PK,
# remove the old one
restore_pk_field = None
- if getattr(create_field, 'primary_key', False) or (
- alter_field and getattr(alter_field[1], 'primary_key', False)):
+ if getattr(create_field, "primary_key", False) or (
+ alter_field and getattr(alter_field[1], "primary_key", False)
+ ):
for name, field in list(body.items()):
if field.primary_key:
field.primary_key = False
@@ -201,8 +238,8 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
body[new_field.name] = new_field
if old_field.null and not new_field.null:
case_sql = "coalesce(%(col)s, %(default)s)" % {
- 'col': self.quote_name(old_field.column),
- 'default': self.prepare_default(self.effective_default(new_field)),
+ "col": self.quote_name(old_field.column),
+ "default": self.prepare_default(self.effective_default(new_field)),
}
mapping[new_field.column] = case_sql
else:
@@ -213,7 +250,10 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
del body[delete_field.name]
del mapping[delete_field.column]
# Remove any implicit M2M tables
- if delete_field.many_to_many and delete_field.remote_field.through._meta.auto_created:
+ if (
+ delete_field.many_to_many
+ and delete_field.remote_field.through._meta.auto_created
+ ):
return self.delete_model(delete_field.remote_field.through)
# Work inside a new app registry
apps = Apps()
@@ -235,8 +275,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
indexes = model._meta.indexes
if delete_field:
indexes = [
- index for index in indexes
- if delete_field.name not in index.fields
+ index for index in indexes if delete_field.name not in index.fields
]
constraints = list(model._meta.constraints)
@@ -252,52 +291,57 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# This wouldn't be required if the schema editor was operating on model
# states instead of rendered models.
meta_contents = {
- 'app_label': model._meta.app_label,
- 'db_table': model._meta.db_table,
- 'unique_together': unique_together,
- 'index_together': index_together,
- 'indexes': indexes,
- 'constraints': constraints,
- 'apps': apps,
+ "app_label": model._meta.app_label,
+ "db_table": model._meta.db_table,
+ "unique_together": unique_together,
+ "index_together": index_together,
+ "indexes": indexes,
+ "constraints": constraints,
+ "apps": apps,
}
meta = type("Meta", (), meta_contents)
- body_copy['Meta'] = meta
- body_copy['__module__'] = model.__module__
+ body_copy["Meta"] = meta
+ body_copy["__module__"] = model.__module__
type(model._meta.object_name, model.__bases__, body_copy)
# Construct a model with a renamed table name.
body_copy = copy.deepcopy(body)
meta_contents = {
- 'app_label': model._meta.app_label,
- 'db_table': 'new__%s' % strip_quotes(model._meta.db_table),
- 'unique_together': unique_together,
- 'index_together': index_together,
- 'indexes': indexes,
- 'constraints': constraints,
- 'apps': apps,
+ "app_label": model._meta.app_label,
+ "db_table": "new__%s" % strip_quotes(model._meta.db_table),
+ "unique_together": unique_together,
+ "index_together": index_together,
+ "indexes": indexes,
+ "constraints": constraints,
+ "apps": apps,
}
meta = type("Meta", (), meta_contents)
- body_copy['Meta'] = meta
- body_copy['__module__'] = model.__module__
- new_model = type('New%s' % model._meta.object_name, model.__bases__, body_copy)
+ body_copy["Meta"] = meta
+ body_copy["__module__"] = model.__module__
+ new_model = type("New%s" % model._meta.object_name, model.__bases__, body_copy)
# Create a new table with the updated schema.
self.create_model(new_model)
# Copy data from the old table into the new table
- self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
- self.quote_name(new_model._meta.db_table),
- ', '.join(self.quote_name(x) for x in mapping),
- ', '.join(mapping.values()),
- self.quote_name(model._meta.db_table),
- ))
+ self.execute(
+ "INSERT INTO %s (%s) SELECT %s FROM %s"
+ % (
+ self.quote_name(new_model._meta.db_table),
+ ", ".join(self.quote_name(x) for x in mapping),
+ ", ".join(mapping.values()),
+ self.quote_name(model._meta.db_table),
+ )
+ )
# Delete the old table to make way for the new
self.delete_model(model, handle_autom2m=False)
# Rename the new table to take way for the old
self.alter_db_table(
- new_model, new_model._meta.db_table, model._meta.db_table,
+ new_model,
+ new_model._meta.db_table,
+ model._meta.db_table,
disable_constraints=False,
)
@@ -314,12 +358,17 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
super().delete_model(model)
else:
# Delete the table (and only that)
- self.execute(self.sql_delete_table % {
- "table": self.quote_name(model._meta.db_table),
- })
+ self.execute(
+ self.sql_delete_table
+ % {
+ "table": self.quote_name(model._meta.db_table),
+ }
+ )
# Remove all deferred statements referencing the deleted table.
for sql in list(self.deferred_sql):
- if isinstance(sql, Statement) and sql.references_table(model._meta.db_table):
+ if isinstance(sql, Statement) and sql.references_table(
+ model._meta.db_table
+ ):
self.deferred_sql.remove(sql)
def add_field(self, model, field):
@@ -327,11 +376,14 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
if (
# Primary keys and unique fields are not supported in ALTER TABLE
# ADD COLUMN.
- field.primary_key or field.unique or
+ field.primary_key
+ or field.unique
+ or
# Fields with default values cannot by handled by ALTER TABLE ADD
# COLUMN statement because DROP DEFAULT is not supported in
# ALTER TABLE.
- not field.null or self.effective_default(field) is not None
+ not field.null
+ or self.effective_default(field) is not None
):
self._remake_table(model, create_field=field)
else:
@@ -351,21 +403,40 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# For everything else, remake.
else:
# It might not actually have a column behind it
- if field.db_parameters(connection=self.connection)['type'] is None:
+ if field.db_parameters(connection=self.connection)["type"] is None:
return
self._remake_table(model, delete_field=field)
- def _alter_field(self, model, old_field, new_field, old_type, new_type,
- old_db_params, new_db_params, strict=False):
+ def _alter_field(
+ self,
+ model,
+ old_field,
+ new_field,
+ old_type,
+ new_type,
+ old_db_params,
+ new_db_params,
+ strict=False,
+ ):
"""Perform a "physical" (non-ManyToMany) field update."""
# Use "ALTER TABLE ... RENAME COLUMN" if only the column name
# changed and there aren't any constraints.
- if (self.connection.features.can_alter_table_rename_column and
- old_field.column != new_field.column and
- self.column_sql(model, old_field) == self.column_sql(model, new_field) and
- not (old_field.remote_field and old_field.db_constraint or
- new_field.remote_field and new_field.db_constraint)):
- return self.execute(self._rename_field_sql(model._meta.db_table, old_field, new_field, new_type))
+ if (
+ self.connection.features.can_alter_table_rename_column
+ and old_field.column != new_field.column
+ and self.column_sql(model, old_field) == self.column_sql(model, new_field)
+ and not (
+ old_field.remote_field
+ and old_field.db_constraint
+ or new_field.remote_field
+ and new_field.db_constraint
+ )
+ ):
+ return self.execute(
+ self._rename_field_sql(
+ model._meta.db_table, old_field, new_field, new_type
+ )
+ )
# Alter by remaking table
self._remake_table(model, alter_field=(old_field, new_field))
# Rebuild tables with FKs pointing to this field.
@@ -393,15 +464,22 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
def _alter_many_to_many(self, model, old_field, new_field, strict):
"""Alter M2Ms to repoint their to= endpoints."""
- if old_field.remote_field.through._meta.db_table == new_field.remote_field.through._meta.db_table:
+ if (
+ old_field.remote_field.through._meta.db_table
+ == new_field.remote_field.through._meta.db_table
+ ):
# The field name didn't change, but some options did; we have to propagate this altering.
self._remake_table(
old_field.remote_field.through,
alter_field=(
# We need the field that points to the target model, so we can tell alter_field to change it -
# this is m2m_reverse_field_name() (as opposed to m2m_field_name, which points to our model)
- old_field.remote_field.through._meta.get_field(old_field.m2m_reverse_field_name()),
- new_field.remote_field.through._meta.get_field(new_field.m2m_reverse_field_name()),
+ old_field.remote_field.through._meta.get_field(
+ old_field.m2m_reverse_field_name()
+ ),
+ new_field.remote_field.through._meta.get_field(
+ new_field.m2m_reverse_field_name()
+ ),
),
)
return
@@ -409,29 +487,36 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# Make a new through table
self.create_model(new_field.remote_field.through)
# Copy the data across
- self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
- self.quote_name(new_field.remote_field.through._meta.db_table),
- ', '.join([
- "id",
- new_field.m2m_column_name(),
- new_field.m2m_reverse_name(),
- ]),
- ', '.join([
- "id",
- old_field.m2m_column_name(),
- old_field.m2m_reverse_name(),
- ]),
- self.quote_name(old_field.remote_field.through._meta.db_table),
- ))
+ self.execute(
+ "INSERT INTO %s (%s) SELECT %s FROM %s"
+ % (
+ self.quote_name(new_field.remote_field.through._meta.db_table),
+ ", ".join(
+ [
+ "id",
+ new_field.m2m_column_name(),
+ new_field.m2m_reverse_name(),
+ ]
+ ),
+ ", ".join(
+ [
+ "id",
+ old_field.m2m_column_name(),
+ old_field.m2m_reverse_name(),
+ ]
+ ),
+ self.quote_name(old_field.remote_field.through._meta.db_table),
+ )
+ )
# Delete the old through table
self.delete_model(old_field.remote_field.through)
def add_constraint(self, model, constraint):
if isinstance(constraint, UniqueConstraint) and (
- constraint.condition or
- constraint.contains_expressions or
- constraint.include or
- constraint.deferrable
+ constraint.condition
+ or constraint.contains_expressions
+ or constraint.include
+ or constraint.deferrable
):
super().add_constraint(model, constraint)
else:
@@ -439,14 +524,14 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
def remove_constraint(self, model, constraint):
if isinstance(constraint, UniqueConstraint) and (
- constraint.condition or
- constraint.contains_expressions or
- constraint.include or
- constraint.deferrable
+ constraint.condition
+ or constraint.contains_expressions
+ or constraint.include
+ or constraint.deferrable
):
super().remove_constraint(model, constraint)
else:
self._remake_table(model)
def _collate_sql(self, collation):
- return 'COLLATE ' + collation
+ return "COLLATE " + collation