diff options
| author | Paveł Tyślacki <pavel.tyslacki@gmail.com> | 2019-02-28 00:47:29 +0300 |
|---|---|---|
| committer | Tim Graham <timograham@gmail.com> | 2019-03-15 20:01:36 -0400 |
| commit | 40b0a58f5ff949fba1072627e4ad11ef98aa7f36 (patch) | |
| tree | a9b91d02f68462798f156bab191c95ed63406896 /tests | |
| parent | d8252025bc85b23e8f9d774f46dc2773750deebf (diff) | |
[2.2.x] Fixed #30183 -- Added introspection of inline SQLite constraints.
Backport of 782d85b6dfa191e67c0f1d572641d8236c79174c from master.
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/backends/sqlite/test_introspection.py | 115 | ||||
| -rw-r--r-- | tests/introspection/models.py | 18 | ||||
| -rw-r--r-- | tests/introspection/tests.py | 59 | ||||
| -rw-r--r-- | tests/schema/tests.py | 12 |
4 files changed, 201 insertions, 3 deletions
diff --git a/tests/backends/sqlite/test_introspection.py b/tests/backends/sqlite/test_introspection.py index 1695ee549e..e378e0ee56 100644 --- a/tests/backends/sqlite/test_introspection.py +++ b/tests/backends/sqlite/test_introspection.py @@ -1,5 +1,7 @@ import unittest +import sqlparse + from django.db import connection from django.test import TestCase @@ -25,3 +27,116 @@ class IntrospectionTests(TestCase): self.assertEqual(field, expected_string) finally: cursor.execute('DROP TABLE test_primary') + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') +class ParsingTests(TestCase): + def parse_definition(self, sql, columns): + """Parse a column or constraint definition.""" + statement = sqlparse.parse(sql)[0] + tokens = (token for token in statement.flatten() if not token.is_whitespace) + with connection.cursor(): + return connection.introspection._parse_column_or_constraint_definition(tokens, set(columns)) + + def assertConstraint(self, constraint_details, cols, unique=False, check=False): + self.assertEqual(constraint_details, { + 'unique': unique, + 'columns': cols, + 'primary_key': False, + 'foreign_key': None, + 'check': check, + 'index': False, + }) + + def test_unique_column(self): + tests = ( + ('"ref" integer UNIQUE,', ['ref']), + ('ref integer UNIQUE,', ['ref']), + ('"customname" integer UNIQUE,', ['customname']), + ('customname integer UNIQUE,', ['customname']), + ) + for sql, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertIsNone(constraint) + self.assertConstraint(details, columns, unique=True) + self.assertIsNone(check) + + def test_unique_constraint(self): + tests = ( + ('CONSTRAINT "ref" UNIQUE ("ref"),', 'ref', ['ref']), + ('CONSTRAINT ref UNIQUE (ref),', 'ref', ['ref']), + ('CONSTRAINT "customname1" UNIQUE ("customname2"),', 'customname1', ['customname2']), + ('CONSTRAINT customname1 UNIQUE (customname2),', 'customname1', ['customname2']), + ) + for sql, constraint_name, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertEqual(constraint, constraint_name) + self.assertConstraint(details, columns, unique=True) + self.assertIsNone(check) + + def test_unique_constraint_multicolumn(self): + tests = ( + ('CONSTRAINT "ref" UNIQUE ("ref", "customname"),', 'ref', ['ref', 'customname']), + ('CONSTRAINT ref UNIQUE (ref, customname),', 'ref', ['ref', 'customname']), + ) + for sql, constraint_name, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertEqual(constraint, constraint_name) + self.assertConstraint(details, columns, unique=True) + self.assertIsNone(check) + + def test_check_column(self): + tests = ( + ('"ref" varchar(255) CHECK ("ref" != \'test\'),', ['ref']), + ('ref varchar(255) CHECK (ref != \'test\'),', ['ref']), + ('"customname1" varchar(255) CHECK ("customname2" != \'test\'),', ['customname2']), + ('customname1 varchar(255) CHECK (customname2 != \'test\'),', ['customname2']), + ) + for sql, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertIsNone(constraint) + self.assertIsNone(details) + self.assertConstraint(check, columns, check=True) + + def test_check_constraint(self): + tests = ( + ('CONSTRAINT "ref" CHECK ("ref" != \'test\'),', 'ref', ['ref']), + ('CONSTRAINT ref CHECK (ref != \'test\'),', 'ref', ['ref']), + ('CONSTRAINT "customname1" CHECK ("customname2" != \'test\'),', 'customname1', ['customname2']), + ('CONSTRAINT customname1 CHECK (customname2 != \'test\'),', 'customname1', ['customname2']), + ) + for sql, constraint_name, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertEqual(constraint, constraint_name) + self.assertIsNone(details) + self.assertConstraint(check, columns, check=True) + + def test_check_column_with_operators_and_functions(self): + tests = ( + ('"ref" integer CHECK ("ref" BETWEEN 1 AND 10),', ['ref']), + ('"ref" varchar(255) CHECK ("ref" LIKE \'test%\'),', ['ref']), + ('"ref" varchar(255) CHECK (LENGTH(ref) > "max_length"),', ['ref', 'max_length']), + ) + for sql, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertIsNone(constraint) + self.assertIsNone(details) + self.assertConstraint(check, columns, check=True) + + def test_check_and_unique_column(self): + tests = ( + ('"ref" varchar(255) CHECK ("ref" != \'test\') UNIQUE,', ['ref']), + ('ref varchar(255) UNIQUE CHECK (ref != \'test\'),', ['ref']), + ) + for sql, columns in tests: + with self.subTest(sql=sql): + constraint, details, check, _ = self.parse_definition(sql, columns) + self.assertIsNone(constraint) + self.assertConstraint(details, columns, unique=True) + self.assertConstraint(check, columns, check=True) diff --git a/tests/introspection/models.py b/tests/introspection/models.py index 32acc323bd..fa663de2fd 100644 --- a/tests/introspection/models.py +++ b/tests/introspection/models.py @@ -58,3 +58,21 @@ class ArticleReporter(models.Model): class Meta: managed = False + + +class Comment(models.Model): + ref = models.UUIDField(unique=True) + article = models.ForeignKey(Article, models.CASCADE, db_index=True) + email = models.EmailField() + pub_date = models.DateTimeField() + up_votes = models.PositiveIntegerField() + body = models.TextField() + + class Meta: + constraints = [ + models.CheckConstraint(name='up_votes_gte_0_check', check=models.Q(up_votes__gte=0)), + models.UniqueConstraint(fields=['article', 'email', 'pub_date'], name='article_email_pub_date_uniq'), + ] + indexes = [ + models.Index(fields=['email', 'pub_date'], name='email_pub_date_idx'), + ] diff --git a/tests/introspection/tests.py b/tests/introspection/tests.py index 4eb868e907..b72d0f5165 100644 --- a/tests/introspection/tests.py +++ b/tests/introspection/tests.py @@ -5,7 +5,7 @@ from django.db.models import Index from django.db.utils import DatabaseError from django.test import TransactionTestCase, skipUnlessDBFeature -from .models import Article, ArticleReporter, City, District, Reporter +from .models import Article, ArticleReporter, City, Comment, District, Reporter class IntrospectionTests(TransactionTestCase): @@ -209,6 +209,63 @@ class IntrospectionTests(TransactionTestCase): indexes_verified += 1 self.assertEqual(indexes_verified, 4) + def test_get_constraints(self): + def assertDetails(details, cols, primary_key=False, unique=False, index=False, check=False, foreign_key=None): + # Different backends have different values for same constraints: + # PRIMARY KEY UNIQUE CONSTRAINT UNIQUE INDEX + # MySQL pk=1 uniq=1 idx=1 pk=0 uniq=1 idx=1 pk=0 uniq=1 idx=1 + # PostgreSQL pk=1 uniq=1 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1 + # SQLite pk=1 uniq=0 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1 + if details['primary_key']: + details['unique'] = True + if details['unique']: + details['index'] = False + self.assertEqual(details['columns'], cols) + self.assertEqual(details['primary_key'], primary_key) + self.assertEqual(details['unique'], unique) + self.assertEqual(details['index'], index) + self.assertEqual(details['check'], check) + self.assertEqual(details['foreign_key'], foreign_key) + + with connection.cursor() as cursor: + constraints = connection.introspection.get_constraints(cursor, Comment._meta.db_table) + # Test custom constraints + custom_constraints = { + 'article_email_pub_date_uniq', + 'email_pub_date_idx', + } + if connection.features.supports_column_check_constraints: + custom_constraints.add('up_votes_gte_0_check') + assertDetails(constraints['up_votes_gte_0_check'], ['up_votes'], check=True) + assertDetails(constraints['article_email_pub_date_uniq'], ['article_id', 'email', 'pub_date'], unique=True) + assertDetails(constraints['email_pub_date_idx'], ['email', 'pub_date'], index=True) + # Test field constraints + field_constraints = set() + for name, details in constraints.items(): + if name in custom_constraints: + continue + elif details['columns'] == ['up_votes'] and details['check']: + assertDetails(details, ['up_votes'], check=True) + field_constraints.add(name) + elif details['columns'] == ['ref'] and details['unique']: + assertDetails(details, ['ref'], unique=True) + field_constraints.add(name) + elif details['columns'] == ['article_id'] and details['index']: + assertDetails(details, ['article_id'], index=True) + field_constraints.add(name) + elif details['columns'] == ['id'] and details['primary_key']: + assertDetails(details, ['id'], primary_key=True, unique=True) + field_constraints.add(name) + elif details['columns'] == ['article_id'] and details['foreign_key']: + assertDetails(details, ['article_id'], foreign_key=('introspection_article', 'id')) + field_constraints.add(name) + elif details['check']: + # Some databases (e.g. Oracle) include additional check + # constraints. + field_constraints.add(name) + # All constraints are accounted for. + self.assertEqual(constraints.keys() ^ (custom_constraints | field_constraints), set()) + def datatype(dbtype, description): """Helper to convert a data type into a string.""" diff --git a/tests/schema/tests.py b/tests/schema/tests.py index 410a52b646..37f39ff7ed 100644 --- a/tests/schema/tests.py +++ b/tests/schema/tests.py @@ -129,6 +129,14 @@ class SchemaTests(TransactionTestCase): if c['index'] and len(c['columns']) == 1 ] + def get_uniques(self, table): + with connection.cursor() as cursor: + return [ + c['columns'][0] + for c in connection.introspection.get_constraints(cursor, table).values() + if c['unique'] and len(c['columns']) == 1 + ] + def get_constraints(self, table): """ Get the constraints on a table using a new cursor. @@ -1950,7 +1958,7 @@ class SchemaTests(TransactionTestCase): editor.add_field(Book, new_field3) self.assertIn( "slug", - self.get_indexes(Book._meta.db_table), + self.get_uniques(Book._meta.db_table), ) # Remove the unique, check the index goes with it new_field4 = CharField(max_length=20, unique=False) @@ -1959,7 +1967,7 @@ class SchemaTests(TransactionTestCase): editor.alter_field(BookWithSlug, new_field3, new_field4, strict=True) self.assertNotIn( "slug", - self.get_indexes(Book._meta.db_table), + self.get_uniques(Book._meta.db_table), ) def test_text_field_with_db_index(self): |
