diff options
Diffstat (limited to 'django/db/backends/sqlite3/schema.py')
| -rw-r--r-- | django/db/backends/sqlite3/schema.py | 307 |
1 files changed, 196 insertions, 111 deletions
diff --git a/django/db/backends/sqlite3/schema.py b/django/db/backends/sqlite3/schema.py index 3ff0a3f7db..c9af8088e5 100644 --- a/django/db/backends/sqlite3/schema.py +++ b/django/db/backends/sqlite3/schema.py @@ -14,7 +14,9 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): sql_delete_table = "DROP TABLE %(table)s" sql_create_fk = None - sql_create_inline_fk = "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED" + sql_create_inline_fk = ( + "REFERENCES %(to_table)s (%(to_column)s) DEFERRABLE INITIALLY DEFERRED" + ) sql_create_column_inline_fk = sql_create_inline_fk sql_create_unique = "CREATE UNIQUE INDEX %(name)s ON %(table)s (%(columns)s)" sql_delete_unique = "DROP INDEX %(name)s" @@ -24,11 +26,11 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # disabled. Enforce it here for the duration of the schema edition. if not self.connection.disable_constraint_checking(): raise NotSupportedError( - 'SQLite schema editor cannot be used while foreign key ' - 'constraint checks are enabled. Make sure to disable them ' - 'before entering a transaction.atomic() context because ' - 'SQLite does not support disabling them in the middle of ' - 'a multi-statement transaction.' + "SQLite schema editor cannot be used while foreign key " + "constraint checks are enabled. Make sure to disable them " + "before entering a transaction.atomic() context because " + "SQLite does not support disabling them in the middle of " + "a multi-statement transaction." ) return super().__enter__() @@ -43,6 +45,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # security hardening). try: import sqlite3 + value = sqlite3.adapt(value) except ImportError: pass @@ -54,7 +57,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): elif isinstance(value, (Decimal, float, int)): return str(value) elif isinstance(value, str): - return "'%s'" % value.replace("\'", "\'\'") + return "'%s'" % value.replace("'", "''") elif value is None: return "NULL" elif isinstance(value, (bytes, bytearray, memoryview)): @@ -63,12 +66,16 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # character. return "X'%s'" % value.hex() else: - raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value))) + raise ValueError( + "Cannot quote parameter value %r of type %s" % (value, type(value)) + ) def prepare_default(self, value): return self.quote_value(value) - def _is_referenced_by_fk_constraint(self, table_name, column_name=None, ignore_self=False): + def _is_referenced_by_fk_constraint( + self, table_name, column_name=None, ignore_self=False + ): """ Return whether or not the provided table name is referenced by another one. If `column_name` is specified, only references pointing to that @@ -79,22 +86,33 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): for other_table in self.connection.introspection.get_table_list(cursor): if ignore_self and other_table.name == table_name: continue - relations = self.connection.introspection.get_relations(cursor, other_table.name) + relations = self.connection.introspection.get_relations( + cursor, other_table.name + ) for constraint_column, constraint_table in relations.values(): - if (constraint_table == table_name and - (column_name is None or constraint_column == column_name)): + if constraint_table == table_name and ( + column_name is None or constraint_column == column_name + ): return True return False - def alter_db_table(self, model, old_db_table, new_db_table, disable_constraints=True): - if (not self.connection.features.supports_atomic_references_rename and - disable_constraints and self._is_referenced_by_fk_constraint(old_db_table)): + def alter_db_table( + self, model, old_db_table, new_db_table, disable_constraints=True + ): + if ( + not self.connection.features.supports_atomic_references_rename + and disable_constraints + and self._is_referenced_by_fk_constraint(old_db_table) + ): if self.connection.in_atomic_block: - raise NotSupportedError(( - 'Renaming the %r table while in a transaction is not ' - 'supported on SQLite < 3.26 because it would break referential ' - 'integrity. Try adding `atomic = False` to the Migration class.' - ) % old_db_table) + raise NotSupportedError( + ( + "Renaming the %r table while in a transaction is not " + "supported on SQLite < 3.26 because it would break referential " + "integrity. Try adding `atomic = False` to the Migration class." + ) + % old_db_table + ) self.connection.enable_constraint_checking() super().alter_db_table(model, old_db_table, new_db_table) self.connection.disable_constraint_checking() @@ -107,42 +125,56 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): old_field_name = old_field.name table_name = model._meta.db_table _, old_column_name = old_field.get_attname_column() - if (new_field.name != old_field_name and - not self.connection.features.supports_atomic_references_rename and - self._is_referenced_by_fk_constraint(table_name, old_column_name, ignore_self=True)): + if ( + new_field.name != old_field_name + and not self.connection.features.supports_atomic_references_rename + and self._is_referenced_by_fk_constraint( + table_name, old_column_name, ignore_self=True + ) + ): if self.connection.in_atomic_block: - raise NotSupportedError(( - 'Renaming the %r.%r column while in a transaction is not ' - 'supported on SQLite < 3.26 because it would break referential ' - 'integrity. Try adding `atomic = False` to the Migration class.' - ) % (model._meta.db_table, old_field_name)) + raise NotSupportedError( + ( + "Renaming the %r.%r column while in a transaction is not " + "supported on SQLite < 3.26 because it would break referential " + "integrity. Try adding `atomic = False` to the Migration class." + ) + % (model._meta.db_table, old_field_name) + ) with atomic(self.connection.alias): super().alter_field(model, old_field, new_field, strict=strict) # Follow SQLite's documented procedure for performing changes # that don't affect the on-disk content. # https://sqlite.org/lang_altertable.html#otheralter with self.connection.cursor() as cursor: - schema_version = cursor.execute('PRAGMA schema_version').fetchone()[0] - cursor.execute('PRAGMA writable_schema = 1') + schema_version = cursor.execute("PRAGMA schema_version").fetchone()[ + 0 + ] + cursor.execute("PRAGMA writable_schema = 1") references_template = ' REFERENCES "%s" ("%%s") ' % table_name new_column_name = new_field.get_attname_column()[1] search = references_template % old_column_name replacement = references_template % new_column_name - cursor.execute('UPDATE sqlite_master SET sql = replace(sql, %s, %s)', (search, replacement)) - cursor.execute('PRAGMA schema_version = %d' % (schema_version + 1)) - cursor.execute('PRAGMA writable_schema = 0') + cursor.execute( + "UPDATE sqlite_master SET sql = replace(sql, %s, %s)", + (search, replacement), + ) + cursor.execute("PRAGMA schema_version = %d" % (schema_version + 1)) + cursor.execute("PRAGMA writable_schema = 0") # The integrity check will raise an exception and rollback # the transaction if the sqlite_master updates corrupt the # database. - cursor.execute('PRAGMA integrity_check') + cursor.execute("PRAGMA integrity_check") # Perform a VACUUM to refresh the database representation from # the sqlite_master table. with self.connection.cursor() as cursor: - cursor.execute('VACUUM') + cursor.execute("VACUUM") else: super().alter_field(model, old_field, new_field, strict=strict) - def _remake_table(self, model, create_field=None, delete_field=None, alter_field=None): + def _remake_table( + self, model, create_field=None, delete_field=None, alter_field=None + ): """ Shortcut to transform a model from old_model into new_model @@ -163,6 +195,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # to an altered field. def is_self_referential(f): return f.is_relation and f.remote_field.model is model + # Work out the new fields dict / mapping body = { f.name: f.clone() if is_self_referential(f) else f @@ -170,14 +203,18 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): } # Since mapping might mix column names and default values, # its values must be already quoted. - mapping = {f.column: self.quote_name(f.column) for f in model._meta.local_concrete_fields} + mapping = { + f.column: self.quote_name(f.column) + for f in model._meta.local_concrete_fields + } # This maps field names (not columns) for things like unique_together rename_mapping = {} # If any of the new or altered fields is introducing a new PK, # remove the old one restore_pk_field = None - if getattr(create_field, 'primary_key', False) or ( - alter_field and getattr(alter_field[1], 'primary_key', False)): + if getattr(create_field, "primary_key", False) or ( + alter_field and getattr(alter_field[1], "primary_key", False) + ): for name, field in list(body.items()): if field.primary_key: field.primary_key = False @@ -201,8 +238,8 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): body[new_field.name] = new_field if old_field.null and not new_field.null: case_sql = "coalesce(%(col)s, %(default)s)" % { - 'col': self.quote_name(old_field.column), - 'default': self.prepare_default(self.effective_default(new_field)), + "col": self.quote_name(old_field.column), + "default": self.prepare_default(self.effective_default(new_field)), } mapping[new_field.column] = case_sql else: @@ -213,7 +250,10 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): del body[delete_field.name] del mapping[delete_field.column] # Remove any implicit M2M tables - if delete_field.many_to_many and delete_field.remote_field.through._meta.auto_created: + if ( + delete_field.many_to_many + and delete_field.remote_field.through._meta.auto_created + ): return self.delete_model(delete_field.remote_field.through) # Work inside a new app registry apps = Apps() @@ -235,8 +275,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): indexes = model._meta.indexes if delete_field: indexes = [ - index for index in indexes - if delete_field.name not in index.fields + index for index in indexes if delete_field.name not in index.fields ] constraints = list(model._meta.constraints) @@ -252,52 +291,57 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # This wouldn't be required if the schema editor was operating on model # states instead of rendered models. meta_contents = { - 'app_label': model._meta.app_label, - 'db_table': model._meta.db_table, - 'unique_together': unique_together, - 'index_together': index_together, - 'indexes': indexes, - 'constraints': constraints, - 'apps': apps, + "app_label": model._meta.app_label, + "db_table": model._meta.db_table, + "unique_together": unique_together, + "index_together": index_together, + "indexes": indexes, + "constraints": constraints, + "apps": apps, } meta = type("Meta", (), meta_contents) - body_copy['Meta'] = meta - body_copy['__module__'] = model.__module__ + body_copy["Meta"] = meta + body_copy["__module__"] = model.__module__ type(model._meta.object_name, model.__bases__, body_copy) # Construct a model with a renamed table name. body_copy = copy.deepcopy(body) meta_contents = { - 'app_label': model._meta.app_label, - 'db_table': 'new__%s' % strip_quotes(model._meta.db_table), - 'unique_together': unique_together, - 'index_together': index_together, - 'indexes': indexes, - 'constraints': constraints, - 'apps': apps, + "app_label": model._meta.app_label, + "db_table": "new__%s" % strip_quotes(model._meta.db_table), + "unique_together": unique_together, + "index_together": index_together, + "indexes": indexes, + "constraints": constraints, + "apps": apps, } meta = type("Meta", (), meta_contents) - body_copy['Meta'] = meta - body_copy['__module__'] = model.__module__ - new_model = type('New%s' % model._meta.object_name, model.__bases__, body_copy) + body_copy["Meta"] = meta + body_copy["__module__"] = model.__module__ + new_model = type("New%s" % model._meta.object_name, model.__bases__, body_copy) # Create a new table with the updated schema. self.create_model(new_model) # Copy data from the old table into the new table - self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % ( - self.quote_name(new_model._meta.db_table), - ', '.join(self.quote_name(x) for x in mapping), - ', '.join(mapping.values()), - self.quote_name(model._meta.db_table), - )) + self.execute( + "INSERT INTO %s (%s) SELECT %s FROM %s" + % ( + self.quote_name(new_model._meta.db_table), + ", ".join(self.quote_name(x) for x in mapping), + ", ".join(mapping.values()), + self.quote_name(model._meta.db_table), + ) + ) # Delete the old table to make way for the new self.delete_model(model, handle_autom2m=False) # Rename the new table to take way for the old self.alter_db_table( - new_model, new_model._meta.db_table, model._meta.db_table, + new_model, + new_model._meta.db_table, + model._meta.db_table, disable_constraints=False, ) @@ -314,12 +358,17 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): super().delete_model(model) else: # Delete the table (and only that) - self.execute(self.sql_delete_table % { - "table": self.quote_name(model._meta.db_table), - }) + self.execute( + self.sql_delete_table + % { + "table": self.quote_name(model._meta.db_table), + } + ) # Remove all deferred statements referencing the deleted table. for sql in list(self.deferred_sql): - if isinstance(sql, Statement) and sql.references_table(model._meta.db_table): + if isinstance(sql, Statement) and sql.references_table( + model._meta.db_table + ): self.deferred_sql.remove(sql) def add_field(self, model, field): @@ -327,11 +376,14 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): if ( # Primary keys and unique fields are not supported in ALTER TABLE # ADD COLUMN. - field.primary_key or field.unique or + field.primary_key + or field.unique + or # Fields with default values cannot by handled by ALTER TABLE ADD # COLUMN statement because DROP DEFAULT is not supported in # ALTER TABLE. - not field.null or self.effective_default(field) is not None + not field.null + or self.effective_default(field) is not None ): self._remake_table(model, create_field=field) else: @@ -351,21 +403,40 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # For everything else, remake. else: # It might not actually have a column behind it - if field.db_parameters(connection=self.connection)['type'] is None: + if field.db_parameters(connection=self.connection)["type"] is None: return self._remake_table(model, delete_field=field) - def _alter_field(self, model, old_field, new_field, old_type, new_type, - old_db_params, new_db_params, strict=False): + def _alter_field( + self, + model, + old_field, + new_field, + old_type, + new_type, + old_db_params, + new_db_params, + strict=False, + ): """Perform a "physical" (non-ManyToMany) field update.""" # Use "ALTER TABLE ... RENAME COLUMN" if only the column name # changed and there aren't any constraints. - if (self.connection.features.can_alter_table_rename_column and - old_field.column != new_field.column and - self.column_sql(model, old_field) == self.column_sql(model, new_field) and - not (old_field.remote_field and old_field.db_constraint or - new_field.remote_field and new_field.db_constraint)): - return self.execute(self._rename_field_sql(model._meta.db_table, old_field, new_field, new_type)) + if ( + self.connection.features.can_alter_table_rename_column + and old_field.column != new_field.column + and self.column_sql(model, old_field) == self.column_sql(model, new_field) + and not ( + old_field.remote_field + and old_field.db_constraint + or new_field.remote_field + and new_field.db_constraint + ) + ): + return self.execute( + self._rename_field_sql( + model._meta.db_table, old_field, new_field, new_type + ) + ) # Alter by remaking table self._remake_table(model, alter_field=(old_field, new_field)) # Rebuild tables with FKs pointing to this field. @@ -393,15 +464,22 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): def _alter_many_to_many(self, model, old_field, new_field, strict): """Alter M2Ms to repoint their to= endpoints.""" - if old_field.remote_field.through._meta.db_table == new_field.remote_field.through._meta.db_table: + if ( + old_field.remote_field.through._meta.db_table + == new_field.remote_field.through._meta.db_table + ): # The field name didn't change, but some options did; we have to propagate this altering. self._remake_table( old_field.remote_field.through, alter_field=( # We need the field that points to the target model, so we can tell alter_field to change it - # this is m2m_reverse_field_name() (as opposed to m2m_field_name, which points to our model) - old_field.remote_field.through._meta.get_field(old_field.m2m_reverse_field_name()), - new_field.remote_field.through._meta.get_field(new_field.m2m_reverse_field_name()), + old_field.remote_field.through._meta.get_field( + old_field.m2m_reverse_field_name() + ), + new_field.remote_field.through._meta.get_field( + new_field.m2m_reverse_field_name() + ), ), ) return @@ -409,29 +487,36 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): # Make a new through table self.create_model(new_field.remote_field.through) # Copy the data across - self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % ( - self.quote_name(new_field.remote_field.through._meta.db_table), - ', '.join([ - "id", - new_field.m2m_column_name(), - new_field.m2m_reverse_name(), - ]), - ', '.join([ - "id", - old_field.m2m_column_name(), - old_field.m2m_reverse_name(), - ]), - self.quote_name(old_field.remote_field.through._meta.db_table), - )) + self.execute( + "INSERT INTO %s (%s) SELECT %s FROM %s" + % ( + self.quote_name(new_field.remote_field.through._meta.db_table), + ", ".join( + [ + "id", + new_field.m2m_column_name(), + new_field.m2m_reverse_name(), + ] + ), + ", ".join( + [ + "id", + old_field.m2m_column_name(), + old_field.m2m_reverse_name(), + ] + ), + self.quote_name(old_field.remote_field.through._meta.db_table), + ) + ) # Delete the old through table self.delete_model(old_field.remote_field.through) def add_constraint(self, model, constraint): if isinstance(constraint, UniqueConstraint) and ( - constraint.condition or - constraint.contains_expressions or - constraint.include or - constraint.deferrable + constraint.condition + or constraint.contains_expressions + or constraint.include + or constraint.deferrable ): super().add_constraint(model, constraint) else: @@ -439,14 +524,14 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): def remove_constraint(self, model, constraint): if isinstance(constraint, UniqueConstraint) and ( - constraint.condition or - constraint.contains_expressions or - constraint.include or - constraint.deferrable + constraint.condition + or constraint.contains_expressions + or constraint.include + or constraint.deferrable ): super().remove_constraint(model, constraint) else: self._remake_table(model) def _collate_sql(self, collation): - return 'COLLATE ' + collation + return "COLLATE " + collation |
