diff options
| author | Hannes Ljungberg <hannes@5monkeys.se> | 2019-10-10 20:04:17 +0200 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2021-01-13 11:47:50 +0100 |
| commit | 83fcfc9ec8610540948815e127101f1206562ead (patch) | |
| tree | aca51a61ef2d7397ee170fc74c564d242c441147 /tests | |
| parent | e3ece0144a988bc522c4bd551baecaf2139ce4ed (diff) | |
Fixed #26167 -- Added support for functional indexes.
Thanks Simon Charette, Mads Jensen, and Mariusz Felisiak for reviews.
Co-authored-by: Markus Holtermann <info@markusholtermann.eu>
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/backends/test_ddl_references.py | 73 | ||||
| -rw-r--r-- | tests/indexes/tests.py | 60 | ||||
| -rw-r--r-- | tests/invalid_models_tests/test_models.py | 30 | ||||
| -rw-r--r-- | tests/migrations/test_base.py | 18 | ||||
| -rw-r--r-- | tests/migrations/test_operations.py | 71 | ||||
| -rw-r--r-- | tests/model_indexes/tests.py | 76 | ||||
| -rw-r--r-- | tests/postgres_tests/fields.py | 3 | ||||
| -rw-r--r-- | tests/postgres_tests/test_indexes.py | 162 | ||||
| -rw-r--r-- | tests/schema/tests.py | 375 |
9 files changed, 854 insertions, 14 deletions
diff --git a/tests/backends/test_ddl_references.py b/tests/backends/test_ddl_references.py index d96ebcb57f..bd4036ee33 100644 --- a/tests/backends/test_ddl_references.py +++ b/tests/backends/test_ddl_references.py @@ -1,7 +1,13 @@ +from django.db import connection from django.db.backends.ddl_references import ( - Columns, ForeignKeyName, IndexName, Statement, Table, + Columns, Expressions, ForeignKeyName, IndexName, Statement, Table, ) -from django.test import SimpleTestCase +from django.db.models import ExpressionList, F +from django.db.models.functions import Upper +from django.db.models.indexes import IndexExpression +from django.test import SimpleTestCase, TransactionTestCase + +from .models import Person class TableTests(SimpleTestCase): @@ -181,3 +187,66 @@ class StatementTests(SimpleTestCase): reference = MockReference('reference', {}, {}) statement = Statement("%(reference)s - %(non_reference)s", reference=reference, non_reference='non_reference') self.assertEqual(str(statement), 'reference - non_reference') + + +class ExpressionsTests(TransactionTestCase): + available_apps = [] + + def setUp(self): + compiler = Person.objects.all().query.get_compiler(connection.alias) + self.editor = connection.schema_editor() + self.expressions = Expressions( + table=Person._meta.db_table, + expressions=ExpressionList( + IndexExpression(F('first_name')), + IndexExpression(F('last_name').desc()), + IndexExpression(Upper('last_name')), + ).resolve_expression(compiler.query), + compiler=compiler, + quote_value=self.editor.quote_value, + ) + + def test_references_table(self): + self.assertIs(self.expressions.references_table(Person._meta.db_table), True) + self.assertIs(self.expressions.references_table('other'), False) + + def test_references_column(self): + table = Person._meta.db_table + self.assertIs(self.expressions.references_column(table, 'first_name'), True) + self.assertIs(self.expressions.references_column(table, 'last_name'), True) + self.assertIs(self.expressions.references_column(table, 'other'), False) + + def test_rename_table_references(self): + table = Person._meta.db_table + self.expressions.rename_table_references(table, 'other') + self.assertIs(self.expressions.references_table(table), False) + self.assertIs(self.expressions.references_table('other'), True) + self.assertIn( + '%s.%s' % ( + self.editor.quote_name('other'), + self.editor.quote_name('first_name'), + ), + str(self.expressions), + ) + + def test_rename_column_references(self): + table = Person._meta.db_table + self.expressions.rename_column_references(table, 'first_name', 'other') + self.assertIs(self.expressions.references_column(table, 'other'), True) + self.assertIs(self.expressions.references_column(table, 'first_name'), False) + self.assertIn( + '%s.%s' % (self.editor.quote_name(table), self.editor.quote_name('other')), + str(self.expressions), + ) + + def test_str(self): + table_name = self.editor.quote_name(Person._meta.db_table) + expected_str = '%s.%s, %s.%s DESC, (UPPER(%s.%s))' % ( + table_name, + self.editor.quote_name('first_name'), + table_name, + self.editor.quote_name('last_name'), + table_name, + self.editor.quote_name('last_name'), + ) + self.assertEqual(str(self.expressions), expected_str) diff --git a/tests/indexes/tests.py b/tests/indexes/tests.py index 6d01e3b52f..ae68113c75 100644 --- a/tests/indexes/tests.py +++ b/tests/indexes/tests.py @@ -3,6 +3,7 @@ from unittest import skipUnless from django.db import connection from django.db.models import CASCADE, ForeignKey, Index, Q +from django.db.models.functions import Lower from django.test import ( TestCase, TransactionTestCase, skipIfDBFeature, skipUnlessDBFeature, ) @@ -452,6 +453,40 @@ class PartialIndexTests(TransactionTestCase): )) editor.remove_index(index=index, model=Article) + @skipUnlessDBFeature('supports_expression_indexes') + def test_partial_func_index(self): + index_name = 'partial_func_idx' + index = Index( + Lower('headline').desc(), + name=index_name, + condition=Q(pub_date__isnull=False), + ) + with connection.schema_editor() as editor: + editor.add_index(index=index, model=Article) + sql = index.create_sql(Article, schema_editor=editor) + table = Article._meta.db_table + self.assertIs(sql.references_column(table, 'headline'), True) + sql = str(sql) + self.assertIn('LOWER(%s)' % editor.quote_name('headline'), sql) + self.assertIn( + 'WHERE %s IS NOT NULL' % editor.quote_name('pub_date'), + sql, + ) + self.assertGreater(sql.find('WHERE'), sql.find('LOWER')) + with connection.cursor() as cursor: + constraints = connection.introspection.get_constraints( + cursor=cursor, table_name=table, + ) + self.assertIn(index_name, constraints) + if connection.features.supports_index_column_ordering: + self.assertEqual(constraints[index_name]['orders'], ['DESC']) + with connection.schema_editor() as editor: + editor.remove_index(Article, index) + with connection.cursor() as cursor: + self.assertNotIn(index_name, connection.introspection.get_constraints( + cursor=cursor, table_name=table, + )) + @skipUnlessDBFeature('supports_covering_indexes') class CoveringIndexTests(TransactionTestCase): @@ -520,6 +555,31 @@ class CoveringIndexTests(TransactionTestCase): cursor=cursor, table_name=Article._meta.db_table, )) + def test_covering_func_index(self): + index_name = 'covering_func_headline_idx' + index = Index(Lower('headline'), name=index_name, include=['pub_date']) + with connection.schema_editor() as editor: + editor.add_index(index=index, model=Article) + sql = index.create_sql(Article, schema_editor=editor) + table = Article._meta.db_table + self.assertIs(sql.references_column(table, 'headline'), True) + sql = str(sql) + self.assertIn('LOWER(%s)' % editor.quote_name('headline'), sql) + self.assertIn('INCLUDE (%s)' % editor.quote_name('pub_date'), sql) + self.assertGreater(sql.find('INCLUDE'), sql.find('LOWER')) + with connection.cursor() as cursor: + constraints = connection.introspection.get_constraints( + cursor=cursor, table_name=table, + ) + self.assertIn(index_name, constraints) + self.assertIn('pub_date', constraints[index_name]['columns']) + with connection.schema_editor() as editor: + editor.remove_index(Article, index) + with connection.cursor() as cursor: + self.assertNotIn(index_name, connection.introspection.get_constraints( + cursor=cursor, table_name=table, + )) + @skipIfDBFeature('supports_covering_indexes') class CoveringIndexIgnoredTests(TransactionTestCase): diff --git a/tests/invalid_models_tests/test_models.py b/tests/invalid_models_tests/test_models.py index d9993c00cd..3203c26a2e 100644 --- a/tests/invalid_models_tests/test_models.py +++ b/tests/invalid_models_tests/test_models.py @@ -495,6 +495,36 @@ class IndexesTests(TestCase): self.assertEqual(Model.check(databases=self.databases), []) + def test_func_index(self): + class Model(models.Model): + name = models.CharField(max_length=10) + + class Meta: + indexes = [models.Index(Lower('name'), name='index_lower_name')] + + warn = Warning( + '%s does not support indexes on expressions.' + % connection.display_name, + hint=( + "An index won't be created. Silence this warning if you don't " + "care about it." + ), + obj=Model, + id='models.W043', + ) + expected = [] if connection.features.supports_expression_indexes else [warn] + self.assertEqual(Model.check(databases=self.databases), expected) + + def test_func_index_required_db_features(self): + class Model(models.Model): + name = models.CharField(max_length=10) + + class Meta: + indexes = [models.Index(Lower('name'), name='index_lower_name')] + required_db_features = {'supports_expression_indexes'} + + self.assertEqual(Model.check(databases=self.databases), []) + @isolate_apps('invalid_models_tests') class FieldNamesTests(TestCase): diff --git a/tests/migrations/test_base.py b/tests/migrations/test_base.py index 9adc0d5264..6f8081a462 100644 --- a/tests/migrations/test_base.py +++ b/tests/migrations/test_base.py @@ -73,6 +73,20 @@ class MigrationTestBase(TransactionTestCase): def assertIndexNotExists(self, table, columns): return self.assertIndexExists(table, columns, False) + def assertIndexNameExists(self, table, index, using='default'): + with connections[using].cursor() as cursor: + self.assertIn( + index, + connection.introspection.get_constraints(cursor, table), + ) + + def assertIndexNameNotExists(self, table, index, using='default'): + with connections[using].cursor() as cursor: + self.assertNotIn( + index, + connection.introspection.get_constraints(cursor, table), + ) + def assertConstraintExists(self, table, name, value=True, using='default'): with connections[using].cursor() as cursor: constraints = connections[using].introspection.get_constraints(cursor, table).items() @@ -194,6 +208,7 @@ class OperationTestBase(MigrationTestBase): multicol_index=False, related_model=False, mti_model=False, proxy_model=False, manager_model=False, unique_together=False, options=False, db_table=None, index_together=False, constraints=None, + indexes=None, ): """Creates a test model state and database table.""" # Make the "current" state. @@ -225,6 +240,9 @@ class OperationTestBase(MigrationTestBase): 'Pony', models.Index(fields=['pink', 'weight'], name='pony_test_idx'), )) + if indexes: + for index in indexes: + operations.append(migrations.AddIndex('Pony', index)) if constraints: for constraint in constraints: operations.append(migrations.AddConstraint('Pony', constraint)) diff --git a/tests/migrations/test_operations.py b/tests/migrations/test_operations.py index 38ad6a2d18..897808f75b 100644 --- a/tests/migrations/test_operations.py +++ b/tests/migrations/test_operations.py @@ -5,6 +5,7 @@ from django.db import ( from django.db.migrations.migration import Migration from django.db.migrations.operations.fields import FieldOperation from django.db.migrations.state import ModelState, ProjectState +from django.db.models.functions import Abs from django.db.transaction import atomic from django.test import SimpleTestCase, override_settings, skipUnlessDBFeature @@ -1939,6 +1940,76 @@ class OperationTests(OperationTestBase): new_model = new_state.apps.get_model('test_rminsf', 'Pony') self.assertIsNot(old_model, new_model) + @skipUnlessDBFeature('supports_expression_indexes') + def test_add_func_index(self): + app_label = 'test_addfuncin' + index_name = f'{app_label}_pony_abs_idx' + table_name = f'{app_label}_pony' + project_state = self.set_up_test_model(app_label) + index = models.Index(Abs('weight'), name=index_name) + operation = migrations.AddIndex('Pony', index) + self.assertEqual( + operation.describe(), + 'Create index test_addfuncin_pony_abs_idx on Abs(F(weight)) on model Pony', + ) + self.assertEqual( + operation.migration_name_fragment, + 'pony_test_addfuncin_pony_abs_idx', + ) + new_state = project_state.clone() + operation.state_forwards(app_label, new_state) + self.assertEqual(len(new_state.models[app_label, 'pony'].options['indexes']), 1) + self.assertIndexNameNotExists(table_name, index_name) + # Add index. + with connection.schema_editor() as editor: + operation.database_forwards(app_label, editor, project_state, new_state) + self.assertIndexNameExists(table_name, index_name) + # Reversal. + with connection.schema_editor() as editor: + operation.database_backwards(app_label, editor, new_state, project_state) + self.assertIndexNameNotExists(table_name, index_name) + # Deconstruction. + definition = operation.deconstruct() + self.assertEqual(definition[0], 'AddIndex') + self.assertEqual(definition[1], []) + self.assertEqual(definition[2], {'model_name': 'Pony', 'index': index}) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_remove_func_index(self): + app_label = 'test_rmfuncin' + index_name = f'{app_label}_pony_abs_idx' + table_name = f'{app_label}_pony' + project_state = self.set_up_test_model(app_label, indexes=[ + models.Index(Abs('weight'), name=index_name), + ]) + self.assertTableExists(table_name) + self.assertIndexNameExists(table_name, index_name) + operation = migrations.RemoveIndex('Pony', index_name) + self.assertEqual( + operation.describe(), + 'Remove index test_rmfuncin_pony_abs_idx from Pony', + ) + self.assertEqual( + operation.migration_name_fragment, + 'remove_pony_test_rmfuncin_pony_abs_idx', + ) + new_state = project_state.clone() + operation.state_forwards(app_label, new_state) + self.assertEqual(len(new_state.models[app_label, 'pony'].options['indexes']), 0) + # Remove index. + with connection.schema_editor() as editor: + operation.database_forwards(app_label, editor, project_state, new_state) + self.assertIndexNameNotExists(table_name, index_name) + # Reversal. + with connection.schema_editor() as editor: + operation.database_backwards(app_label, editor, new_state, project_state) + self.assertIndexNameExists(table_name, index_name) + # Deconstruction. + definition = operation.deconstruct() + self.assertEqual(definition[0], 'RemoveIndex') + self.assertEqual(definition[1], []) + self.assertEqual(definition[2], {'model_name': 'Pony', 'name': index_name}) + def test_alter_field_with_index(self): """ Test AlterField operation with an index to ensure indexes created via diff --git a/tests/model_indexes/tests.py b/tests/model_indexes/tests.py index 1fe283340e..ab231edd5e 100644 --- a/tests/model_indexes/tests.py +++ b/tests/model_indexes/tests.py @@ -2,6 +2,7 @@ from unittest import mock from django.conf import settings from django.db import connection, models +from django.db.models.functions import Lower, Upper from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature from django.test.utils import isolate_apps @@ -27,6 +28,7 @@ class SimpleIndexesTests(SimpleTestCase): name='opclasses_idx', opclasses=['varchar_pattern_ops', 'text_pattern_ops'], ) + func_index = models.Index(Lower('title'), name='book_func_idx') self.assertEqual(repr(index), "<Index: fields='title'>") self.assertEqual(repr(multi_col_index), "<Index: fields='title, author'>") self.assertEqual(repr(partial_index), "<Index: fields='title' condition=(AND: ('pages__gt', 400))>") @@ -39,6 +41,7 @@ class SimpleIndexesTests(SimpleTestCase): "<Index: fields='headline, body' " "opclasses='varchar_pattern_ops, text_pattern_ops'>", ) + self.assertEqual(repr(func_index), "<Index: expressions='Lower(F(title))'>") def test_eq(self): index = models.Index(fields=['title']) @@ -51,6 +54,14 @@ class SimpleIndexesTests(SimpleTestCase): self.assertEqual(index, mock.ANY) self.assertNotEqual(index, another_index) + def test_eq_func(self): + index = models.Index(Lower('title'), models.F('author'), name='book_func_idx') + same_index = models.Index(Lower('title'), 'author', name='book_func_idx') + another_index = models.Index(Lower('title'), name='book_func_idx') + self.assertEqual(index, same_index) + self.assertEqual(index, mock.ANY) + self.assertNotEqual(index, another_index) + def test_index_fields_type(self): with self.assertRaisesMessage(ValueError, 'Index.fields must be a list or tuple.'): models.Index(fields='title') @@ -63,11 +74,16 @@ class SimpleIndexesTests(SimpleTestCase): def test_fields_tuple(self): self.assertEqual(models.Index(fields=('title',)).fields, ['title']) - def test_raises_error_without_field(self): - msg = 'At least one field is required to define an index.' + def test_requires_field_or_expression(self): + msg = 'At least one field or expression is required to define an index.' with self.assertRaisesMessage(ValueError, msg): models.Index() + def test_expressions_and_fields_mutually_exclusive(self): + msg = "Index.fields and expressions are mutually exclusive." + with self.assertRaisesMessage(ValueError, msg): + models.Index(Upper('foo'), fields=['field']) + def test_opclasses_requires_index_name(self): with self.assertRaisesMessage(ValueError, 'An index must be named to use opclasses.'): models.Index(opclasses=['jsonb_path_ops']) @@ -85,6 +101,23 @@ class SimpleIndexesTests(SimpleTestCase): with self.assertRaisesMessage(ValueError, 'An index must be named to use condition.'): models.Index(condition=models.Q(pages__gt=400)) + def test_expressions_requires_index_name(self): + msg = 'An index must be named to use expressions.' + with self.assertRaisesMessage(ValueError, msg): + models.Index(Lower('field')) + + def test_expressions_with_opclasses(self): + msg = ( + 'Index.opclasses cannot be used with expressions. Use ' + 'django.contrib.postgres.indexes.OpClass() instead.' + ) + with self.assertRaisesMessage(ValueError, msg): + models.Index( + Lower('field'), + name='test_func_opclass', + opclasses=['jsonb_path_ops'], + ) + def test_condition_must_be_q(self): with self.assertRaisesMessage(ValueError, 'Index.condition must be a Q instance.'): models.Index(condition='invalid', name='long_book_idx') @@ -181,12 +214,25 @@ class SimpleIndexesTests(SimpleTestCase): }, ) + def test_deconstruct_with_expressions(self): + index = models.Index(Upper('title'), name='book_func_idx') + path, args, kwargs = index.deconstruct() + self.assertEqual(path, 'django.db.models.Index') + self.assertEqual(args, (Upper('title'),)) + self.assertEqual(kwargs, {'name': 'book_func_idx'}) + def test_clone(self): index = models.Index(fields=['title']) new_index = index.clone() self.assertIsNot(index, new_index) self.assertEqual(index.fields, new_index.fields) + def test_clone_with_expressions(self): + index = models.Index(Upper('title'), name='book_func_idx') + new_index = index.clone() + self.assertIsNot(index, new_index) + self.assertEqual(index.expressions, new_index.expressions) + def test_name_set(self): index_names = [index.name for index in Book._meta.indexes] self.assertCountEqual( @@ -248,3 +294,29 @@ class IndexesTests(TestCase): # db_tablespace. index = models.Index(fields=['shortcut']) self.assertIn('"idx_tbls"', str(index.create_sql(Book, editor)).lower()) + + @skipUnlessDBFeature('supports_tablespaces') + def test_func_with_tablespace(self): + # Functional index with db_tablespace attribute. + index = models.Index( + Lower('shortcut').desc(), + name='functional_tbls', + db_tablespace='idx_tbls2', + ) + with connection.schema_editor() as editor: + sql = str(index.create_sql(Book, editor)) + self.assertIn(editor.quote_name('idx_tbls2'), sql) + # Functional index without db_tablespace attribute. + index = models.Index(Lower('shortcut').desc(), name='functional_no_tbls') + with connection.schema_editor() as editor: + sql = str(index.create_sql(Book, editor)) + # The DEFAULT_INDEX_TABLESPACE setting can't be tested because it's + # evaluated when the model class is defined. As a consequence, + # @override_settings doesn't work. + if settings.DEFAULT_INDEX_TABLESPACE: + self.assertIn( + editor.quote_name(settings.DEFAULT_INDEX_TABLESPACE), + sql, + ) + else: + self.assertNotIn('TABLESPACE', sql) diff --git a/tests/postgres_tests/fields.py b/tests/postgres_tests/fields.py index a36c10c750..b1bb6668d6 100644 --- a/tests/postgres_tests/fields.py +++ b/tests/postgres_tests/fields.py @@ -12,7 +12,7 @@ try: CITextField, DateRangeField, DateTimeRangeField, DecimalRangeField, HStoreField, IntegerRangeField, ) - from django.contrib.postgres.search import SearchVectorField + from django.contrib.postgres.search import SearchVector, SearchVectorField except ImportError: class DummyArrayField(models.Field): def __init__(self, base_field, size=None, **kwargs): @@ -36,6 +36,7 @@ except ImportError: DecimalRangeField = models.Field HStoreField = models.Field IntegerRangeField = models.Field + SearchVector = models.Expression SearchVectorField = models.Field diff --git a/tests/postgres_tests/test_indexes.py b/tests/postgres_tests/test_indexes.py index b9888f4843..49646feb97 100644 --- a/tests/postgres_tests/test_indexes.py +++ b/tests/postgres_tests/test_indexes.py @@ -1,17 +1,18 @@ from unittest import mock from django.contrib.postgres.indexes import ( - BloomIndex, BrinIndex, BTreeIndex, GinIndex, GistIndex, HashIndex, + BloomIndex, BrinIndex, BTreeIndex, GinIndex, GistIndex, HashIndex, OpClass, SpGistIndex, ) from django.db import NotSupportedError, connection -from django.db.models import CharField, Q -from django.db.models.functions import Length +from django.db.models import CharField, F, Index, Q +from django.db.models.functions import Cast, Collate, Length, Lower from django.test import skipUnlessDBFeature -from django.test.utils import register_lookup +from django.test.utils import modify_settings, register_lookup from . import PostgreSQLSimpleTestCase, PostgreSQLTestCase -from .models import CharFieldModel, IntegerArrayModel, Scene +from .fields import SearchVector, SearchVectorField +from .models import CharFieldModel, IntegerArrayModel, Scene, TextFieldModel class IndexTestMixin: @@ -28,6 +29,17 @@ class IndexTestMixin: self.assertEqual(args, ()) self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_%s' % self.index_class.suffix}) + def test_deconstruction_with_expressions_no_customization(self): + name = f'test_title_{self.index_class.suffix}' + index = self.index_class(Lower('title'), name=name) + path, args, kwargs = index.deconstruct() + self.assertEqual( + path, + f'django.contrib.postgres.indexes.{self.index_class.__name__}', + ) + self.assertEqual(args, (Lower('title'),)) + self.assertEqual(kwargs, {'name': name}) + class BloomIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = BloomIndex @@ -181,7 +193,14 @@ class SpGistIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_spgist', 'fillfactor': 80}) +@modify_settings(INSTALLED_APPS={'append': 'django.contrib.postgres'}) class SchemaTests(PostgreSQLTestCase): + get_opclass_query = ''' + SELECT opcname, c.relname FROM pg_opclass AS oc + JOIN pg_index as i on oc.oid = ANY(i.indclass) + JOIN pg_class as c on c.oid = i.indexrelid + WHERE c.relname = %s + ''' def get_constraints(self, table): """ @@ -260,6 +279,37 @@ class SchemaTests(PostgreSQLTestCase): editor.remove_index(IntegerArrayModel, index) self.assertNotIn(index_name, self.get_constraints(IntegerArrayModel._meta.db_table)) + def test_trigram_op_class_gin_index(self): + index_name = 'trigram_op_class_gin' + index = GinIndex(OpClass(F('scene'), name='gin_trgm_ops'), name=index_name) + with connection.schema_editor() as editor: + editor.add_index(Scene, index) + with editor.connection.cursor() as cursor: + cursor.execute(self.get_opclass_query, [index_name]) + self.assertCountEqual(cursor.fetchall(), [('gin_trgm_ops', index_name)]) + constraints = self.get_constraints(Scene._meta.db_table) + self.assertIn(index_name, constraints) + self.assertIn(constraints[index_name]['type'], GinIndex.suffix) + with connection.schema_editor() as editor: + editor.remove_index(Scene, index) + self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table)) + + def test_cast_search_vector_gin_index(self): + index_name = 'cast_search_vector_gin' + index = GinIndex(Cast('field', SearchVectorField()), name=index_name) + with connection.schema_editor() as editor: + editor.add_index(TextFieldModel, index) + sql = index.create_sql(TextFieldModel, editor) + table = TextFieldModel._meta.db_table + constraints = self.get_constraints(table) + self.assertIn(index_name, constraints) + self.assertIn(constraints[index_name]['type'], GinIndex.suffix) + self.assertIs(sql.references_column(table, 'field'), True) + self.assertIn('::tsvector', str(sql)) + with connection.schema_editor() as editor: + editor.remove_index(TextFieldModel, index) + self.assertNotIn(index_name, self.get_constraints(table)) + def test_bloom_index(self): index_name = 'char_field_model_field_bloom' index = BloomIndex(fields=['field'], name=index_name) @@ -400,6 +450,28 @@ class SchemaTests(PostgreSQLTestCase): editor.add_index(Scene, index) self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table)) + def test_tsvector_op_class_gist_index(self): + index_name = 'tsvector_op_class_gist' + index = GistIndex( + OpClass( + SearchVector('scene', 'setting', config='english'), + name='tsvector_ops', + ), + name=index_name, + ) + with connection.schema_editor() as editor: + editor.add_index(Scene, index) + sql = index.create_sql(Scene, editor) + table = Scene._meta.db_table + constraints = self.get_constraints(table) + self.assertIn(index_name, constraints) + self.assertIn(constraints[index_name]['type'], GistIndex.suffix) + self.assertIs(sql.references_column(table, 'scene'), True) + self.assertIs(sql.references_column(table, 'setting'), True) + with connection.schema_editor() as editor: + editor.remove_index(Scene, index) + self.assertNotIn(index_name, self.get_constraints(table)) + def test_hash_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table)) @@ -455,3 +527,83 @@ class SchemaTests(PostgreSQLTestCase): with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) + + def test_op_class(self): + index_name = 'test_op_class' + index = Index( + OpClass(Lower('field'), name='text_pattern_ops'), + name=index_name, + ) + with connection.schema_editor() as editor: + editor.add_index(TextFieldModel, index) + with editor.connection.cursor() as cursor: + cursor.execute(self.get_opclass_query, [index_name]) + self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) + + def test_op_class_descending_collation(self): + collation = connection.features.test_collations.get('non_default') + if not collation: + self.skipTest( + 'This backend does not support case-insensitive collations.' + ) + index_name = 'test_op_class_descending_collation' + index = Index( + Collate( + OpClass(Lower('field'), name='text_pattern_ops').desc(nulls_last=True), + collation=collation, + ), + name=index_name, + ) + with connection.schema_editor() as editor: + editor.add_index(TextFieldModel, index) + self.assertIn( + 'COLLATE %s' % editor.quote_name(collation), + str(index.create_sql(TextFieldModel, editor)), + ) + with editor.connection.cursor() as cursor: + cursor.execute(self.get_opclass_query, [index_name]) + self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) + table = TextFieldModel._meta.db_table + constraints = self.get_constraints(table) + self.assertIn(index_name, constraints) + self.assertEqual(constraints[index_name]['orders'], ['DESC']) + with connection.schema_editor() as editor: + editor.remove_index(TextFieldModel, index) + self.assertNotIn(index_name, self.get_constraints(table)) + + def test_op_class_descending_partial(self): + index_name = 'test_op_class_descending_partial' + index = Index( + OpClass(Lower('field'), name='text_pattern_ops').desc(), + name=index_name, + condition=Q(field__contains='China'), + ) + with connection.schema_editor() as editor: + editor.add_index(TextFieldModel, index) + with editor.connection.cursor() as cursor: + cursor.execute(self.get_opclass_query, [index_name]) + self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) + constraints = self.get_constraints(TextFieldModel._meta.db_table) + self.assertIn(index_name, constraints) + self.assertEqual(constraints[index_name]['orders'], ['DESC']) + + def test_op_class_descending_partial_tablespace(self): + index_name = 'test_op_class_descending_partial_tablespace' + index = Index( + OpClass(Lower('field').desc(), name='text_pattern_ops'), + name=index_name, + condition=Q(field__contains='China'), + db_tablespace='pg_default', + ) + with connection.schema_editor() as editor: + editor.add_index(TextFieldModel, index) + self.assertIn( + 'TABLESPACE "pg_default" ', + str(index.create_sql(TextFieldModel, editor)) + ) + with editor.connection.cursor() as cursor: + cursor.execute(self.get_opclass_query, [index_name]) + self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) + constraints = self.get_constraints(TextFieldModel._meta.db_table) + self.assertIn(index_name, constraints) + self.assertEqual(constraints[index_name]['orders'], ['DESC']) diff --git a/tests/schema/tests.py b/tests/schema/tests.py index 7740065d31..b994c252d9 100644 --- a/tests/schema/tests.py +++ b/tests/schema/tests.py @@ -4,6 +4,7 @@ import unittest from copy import copy from unittest import mock +from django.core.exceptions import FieldError from django.core.management.color import no_style from django.db import ( DatabaseError, DataError, IntegrityError, OperationalError, connection, @@ -11,15 +12,21 @@ from django.db import ( from django.db.models import ( CASCADE, PROTECT, AutoField, BigAutoField, BigIntegerField, BinaryField, BooleanField, CharField, CheckConstraint, DateField, DateTimeField, - ForeignKey, ForeignObject, Index, IntegerField, ManyToManyField, Model, - OneToOneField, PositiveIntegerField, Q, SlugField, SmallAutoField, - SmallIntegerField, TextField, TimeField, UniqueConstraint, UUIDField, + DecimalField, F, FloatField, ForeignKey, ForeignObject, Index, + IntegerField, JSONField, ManyToManyField, Model, OneToOneField, OrderBy, + PositiveIntegerField, Q, SlugField, SmallAutoField, SmallIntegerField, + TextField, TimeField, UniqueConstraint, UUIDField, Value, ) +from django.db.models.fields.json import KeyTextTransform +from django.db.models.functions import Abs, Cast, Collate, Lower, Random, Upper +from django.db.models.indexes import IndexExpression from django.db.transaction import TransactionManagementError, atomic from django.test import ( TransactionTestCase, skipIfDBFeature, skipUnlessDBFeature, ) -from django.test.utils import CaptureQueriesContext, isolate_apps +from django.test.utils import ( + CaptureQueriesContext, isolate_apps, register_lookup, +) from django.utils import timezone from .fields import ( @@ -2481,6 +2488,366 @@ class SchemaTests(TransactionTestCase): assertion = self.assertIn if connection.features.supports_index_on_text_field else self.assertNotIn assertion('text_field', self.get_indexes(AuthorTextFieldWithIndex._meta.db_table)) + def _index_expressions_wrappers(self): + index_expression = IndexExpression() + index_expression.set_wrapper_classes(connection) + return ', '.join([ + wrapper_cls.__qualname__ for wrapper_cls in index_expression.wrapper_classes + ]) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_multiple_wrapper_references(self): + index = Index(OrderBy(F('name').desc(), descending=True), name='name') + msg = ( + "Multiple references to %s can't be used in an indexed expression." + % self._index_expressions_wrappers() + ) + with connection.schema_editor() as editor: + with self.assertRaisesMessage(ValueError, msg): + editor.add_index(Author, index) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_invalid_topmost_expressions(self): + index = Index(Upper(F('name').desc()), name='name') + msg = ( + '%s must be topmost expressions in an indexed expression.' + % self._index_expressions_wrappers() + ) + with connection.schema_editor() as editor: + with self.assertRaisesMessage(ValueError, msg): + editor.add_index(Author, index) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(Lower('name').desc(), name='func_lower_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + if connection.features.supports_index_column_ordering: + self.assertIndexOrder(table, index.name, ['DESC']) + # SQL contains a database function. + self.assertIs(sql.references_column(table, 'name'), True) + self.assertIn('LOWER(%s)' % editor.quote_name('name'), str(sql)) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_f(self): + with connection.schema_editor() as editor: + editor.create_model(Tag) + index = Index('slug', F('title').desc(), name='func_f_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Tag, index) + sql = index.create_sql(Tag, editor) + table = Tag._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + if connection.features.supports_index_column_ordering: + self.assertIndexOrder(Tag._meta.db_table, index.name, ['ASC', 'DESC']) + # SQL contains columns. + self.assertIs(sql.references_column(table, 'slug'), True) + self.assertIs(sql.references_column(table, 'title'), True) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Tag, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_lookups(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + with register_lookup(CharField, Lower), register_lookup(IntegerField, Abs): + index = Index( + F('name__lower'), + F('weight__abs'), + name='func_lower_abs_lookup_idx', + ) + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + # SQL contains columns. + self.assertIs(sql.references_column(table, 'name'), True) + self.assertIs(sql.references_column(table, 'weight'), True) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_composite_func_index(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(Lower('name'), Upper('name'), name='func_lower_upper_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + # SQL contains database functions. + self.assertIs(sql.references_column(table, 'name'), True) + sql = str(sql) + self.assertIn('LOWER(%s)' % editor.quote_name('name'), sql) + self.assertIn('UPPER(%s)' % editor.quote_name('name'), sql) + self.assertLess(sql.index('LOWER'), sql.index('UPPER')) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_composite_func_index_field_and_expression(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + editor.create_model(Book) + index = Index( + F('author').desc(), + Lower('title').asc(), + 'pub_date', + name='func_f_lower_field_idx', + ) + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Book, index) + sql = index.create_sql(Book, editor) + table = Book._meta.db_table + constraints = self.get_constraints(table) + if connection.features.supports_index_column_ordering: + self.assertIndexOrder(table, index.name, ['DESC', 'ASC', 'ASC']) + self.assertEqual(len(constraints[index.name]['columns']), 3) + self.assertEqual(constraints[index.name]['columns'][2], 'pub_date') + # SQL contains database functions and columns. + self.assertIs(sql.references_column(table, 'author_id'), True) + self.assertIs(sql.references_column(table, 'title'), True) + self.assertIs(sql.references_column(table, 'pub_date'), True) + self.assertIn('LOWER(%s)' % editor.quote_name('title'), str(sql)) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Book, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + @isolate_apps('schema') + def test_func_index_f_decimalfield(self): + class Node(Model): + value = DecimalField(max_digits=5, decimal_places=2) + + class Meta: + app_label = 'schema' + + with connection.schema_editor() as editor: + editor.create_model(Node) + index = Index(F('value'), name='func_f_decimalfield_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Node, index) + sql = index.create_sql(Node, editor) + table = Node._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + self.assertIs(sql.references_column(table, 'value'), True) + # SQL doesn't contain casting. + self.assertNotIn('CAST', str(sql)) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Node, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_cast(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(Cast('weight', FloatField()), name='func_cast_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + self.assertIs(sql.references_column(table, 'weight'), True) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_collate(self): + collation = connection.features.test_collations.get('non_default') + if not collation: + self.skipTest( + 'This backend does not support case-insensitive collations.' + ) + with connection.schema_editor() as editor: + editor.create_model(Author) + editor.create_model(BookWithSlug) + index = Index( + Collate(F('title'), collation=collation).desc(), + Collate('slug', collation=collation), + name='func_collate_idx', + ) + # Add index. + with connection.schema_editor() as editor: + editor.add_index(BookWithSlug, index) + sql = index.create_sql(BookWithSlug, editor) + table = Book._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + if connection.features.supports_index_column_ordering: + self.assertIndexOrder(table, index.name, ['DESC', 'ASC']) + # SQL contains columns and a collation. + self.assertIs(sql.references_column(table, 'title'), True) + self.assertIs(sql.references_column(table, 'slug'), True) + self.assertIn('COLLATE %s' % editor.quote_name(collation), str(sql)) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Book, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + @skipIfDBFeature('collate_as_index_expression') + def test_func_index_collate_f_ordered(self): + collation = connection.features.test_collations.get('non_default') + if not collation: + self.skipTest( + 'This backend does not support case-insensitive collations.' + ) + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index( + Collate(F('name').desc(), collation=collation), + name='func_collate_f_desc_idx', + ) + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + if connection.features.supports_index_column_ordering: + self.assertIndexOrder(table, index.name, ['DESC']) + # SQL contains columns and a collation. + self.assertIs(sql.references_column(table, 'name'), True) + self.assertIn('COLLATE %s' % editor.quote_name(collation), str(sql)) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_calc(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(F('height') / (F('weight') + Value(5)), name='func_calc_idx') + # Add index. + with connection.schema_editor() as editor: + editor.add_index(Author, index) + sql = index.create_sql(Author, editor) + table = Author._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + # SQL contains columns and expressions. + self.assertIs(sql.references_column(table, 'height'), True) + self.assertIs(sql.references_column(table, 'weight'), True) + sql = str(sql) + self.assertIs( + sql.index(editor.quote_name('height')) < + sql.index('/') < + sql.index(editor.quote_name('weight')) < + sql.index('+') < + sql.index('5'), + True, + ) + # Remove index. + with connection.schema_editor() as editor: + editor.remove_index(Author, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes', 'supports_json_field') + @isolate_apps('schema') + def test_func_index_json_key_transform(self): + class JSONModel(Model): + field = JSONField() + + class Meta: + app_label = 'schema' + + with connection.schema_editor() as editor: + editor.create_model(JSONModel) + self.isolated_local_models = [JSONModel] + index = Index('field__some_key', name='func_json_key_idx') + with connection.schema_editor() as editor: + editor.add_index(JSONModel, index) + sql = index.create_sql(JSONModel, editor) + table = JSONModel._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + self.assertIs(sql.references_column(table, 'field'), True) + with connection.schema_editor() as editor: + editor.remove_index(JSONModel, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipUnlessDBFeature('supports_expression_indexes', 'supports_json_field') + @isolate_apps('schema') + def test_func_index_json_key_transform_cast(self): + class JSONModel(Model): + field = JSONField() + + class Meta: + app_label = 'schema' + + with connection.schema_editor() as editor: + editor.create_model(JSONModel) + self.isolated_local_models = [JSONModel] + index = Index( + Cast(KeyTextTransform('some_key', 'field'), IntegerField()), + name='func_json_key_cast_idx', + ) + with connection.schema_editor() as editor: + editor.add_index(JSONModel, index) + sql = index.create_sql(JSONModel, editor) + table = JSONModel._meta.db_table + self.assertIn(index.name, self.get_constraints(table)) + self.assertIs(sql.references_column(table, 'field'), True) + with connection.schema_editor() as editor: + editor.remove_index(JSONModel, index) + self.assertNotIn(index.name, self.get_constraints(table)) + + @skipIfDBFeature('supports_expression_indexes') + def test_func_index_unsupported(self): + # Index is ignored on databases that don't support indexes on + # expressions. + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(F('name'), name='random_idx') + with connection.schema_editor() as editor, self.assertNumQueries(0): + self.assertIsNone(editor.add_index(Author, index)) + self.assertIsNone(editor.remove_index(Author, index)) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_nonexistent_field(self): + index = Index(Lower('nonexistent'), name='func_nonexistent_idx') + msg = ( + "Cannot resolve keyword 'nonexistent' into field. Choices are: " + "height, id, name, uuid, weight" + ) + with self.assertRaisesMessage(FieldError, msg): + with connection.schema_editor() as editor: + editor.add_index(Author, index) + + @skipUnlessDBFeature('supports_expression_indexes') + def test_func_index_nondeterministic(self): + with connection.schema_editor() as editor: + editor.create_model(Author) + index = Index(Random(), name='func_random_idx') + with connection.schema_editor() as editor: + with self.assertRaises(DatabaseError): + editor.add_index(Author, index) + def test_primary_key(self): """ Tests altering of the primary key |
