summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorPaveł Tyślacki <pavel.tyslacki@gmail.com>2019-02-28 00:47:29 +0300
committerTim Graham <timograham@gmail.com>2019-03-15 20:01:36 -0400
commit40b0a58f5ff949fba1072627e4ad11ef98aa7f36 (patch)
treea9b91d02f68462798f156bab191c95ed63406896 /tests
parentd8252025bc85b23e8f9d774f46dc2773750deebf (diff)
[2.2.x] Fixed #30183 -- Added introspection of inline SQLite constraints.
Backport of 782d85b6dfa191e67c0f1d572641d8236c79174c from master.
Diffstat (limited to 'tests')
-rw-r--r--tests/backends/sqlite/test_introspection.py115
-rw-r--r--tests/introspection/models.py18
-rw-r--r--tests/introspection/tests.py59
-rw-r--r--tests/schema/tests.py12
4 files changed, 201 insertions, 3 deletions
diff --git a/tests/backends/sqlite/test_introspection.py b/tests/backends/sqlite/test_introspection.py
index 1695ee549e..e378e0ee56 100644
--- a/tests/backends/sqlite/test_introspection.py
+++ b/tests/backends/sqlite/test_introspection.py
@@ -1,5 +1,7 @@
import unittest
+import sqlparse
+
from django.db import connection
from django.test import TestCase
@@ -25,3 +27,116 @@ class IntrospectionTests(TestCase):
self.assertEqual(field, expected_string)
finally:
cursor.execute('DROP TABLE test_primary')
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+class ParsingTests(TestCase):
+ def parse_definition(self, sql, columns):
+ """Parse a column or constraint definition."""
+ statement = sqlparse.parse(sql)[0]
+ tokens = (token for token in statement.flatten() if not token.is_whitespace)
+ with connection.cursor():
+ return connection.introspection._parse_column_or_constraint_definition(tokens, set(columns))
+
+ def assertConstraint(self, constraint_details, cols, unique=False, check=False):
+ self.assertEqual(constraint_details, {
+ 'unique': unique,
+ 'columns': cols,
+ 'primary_key': False,
+ 'foreign_key': None,
+ 'check': check,
+ 'index': False,
+ })
+
+ def test_unique_column(self):
+ tests = (
+ ('"ref" integer UNIQUE,', ['ref']),
+ ('ref integer UNIQUE,', ['ref']),
+ ('"customname" integer UNIQUE,', ['customname']),
+ ('customname integer UNIQUE,', ['customname']),
+ )
+ for sql, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertIsNone(constraint)
+ self.assertConstraint(details, columns, unique=True)
+ self.assertIsNone(check)
+
+ def test_unique_constraint(self):
+ tests = (
+ ('CONSTRAINT "ref" UNIQUE ("ref"),', 'ref', ['ref']),
+ ('CONSTRAINT ref UNIQUE (ref),', 'ref', ['ref']),
+ ('CONSTRAINT "customname1" UNIQUE ("customname2"),', 'customname1', ['customname2']),
+ ('CONSTRAINT customname1 UNIQUE (customname2),', 'customname1', ['customname2']),
+ )
+ for sql, constraint_name, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertEqual(constraint, constraint_name)
+ self.assertConstraint(details, columns, unique=True)
+ self.assertIsNone(check)
+
+ def test_unique_constraint_multicolumn(self):
+ tests = (
+ ('CONSTRAINT "ref" UNIQUE ("ref", "customname"),', 'ref', ['ref', 'customname']),
+ ('CONSTRAINT ref UNIQUE (ref, customname),', 'ref', ['ref', 'customname']),
+ )
+ for sql, constraint_name, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertEqual(constraint, constraint_name)
+ self.assertConstraint(details, columns, unique=True)
+ self.assertIsNone(check)
+
+ def test_check_column(self):
+ tests = (
+ ('"ref" varchar(255) CHECK ("ref" != \'test\'),', ['ref']),
+ ('ref varchar(255) CHECK (ref != \'test\'),', ['ref']),
+ ('"customname1" varchar(255) CHECK ("customname2" != \'test\'),', ['customname2']),
+ ('customname1 varchar(255) CHECK (customname2 != \'test\'),', ['customname2']),
+ )
+ for sql, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertIsNone(constraint)
+ self.assertIsNone(details)
+ self.assertConstraint(check, columns, check=True)
+
+ def test_check_constraint(self):
+ tests = (
+ ('CONSTRAINT "ref" CHECK ("ref" != \'test\'),', 'ref', ['ref']),
+ ('CONSTRAINT ref CHECK (ref != \'test\'),', 'ref', ['ref']),
+ ('CONSTRAINT "customname1" CHECK ("customname2" != \'test\'),', 'customname1', ['customname2']),
+ ('CONSTRAINT customname1 CHECK (customname2 != \'test\'),', 'customname1', ['customname2']),
+ )
+ for sql, constraint_name, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertEqual(constraint, constraint_name)
+ self.assertIsNone(details)
+ self.assertConstraint(check, columns, check=True)
+
+ def test_check_column_with_operators_and_functions(self):
+ tests = (
+ ('"ref" integer CHECK ("ref" BETWEEN 1 AND 10),', ['ref']),
+ ('"ref" varchar(255) CHECK ("ref" LIKE \'test%\'),', ['ref']),
+ ('"ref" varchar(255) CHECK (LENGTH(ref) > "max_length"),', ['ref', 'max_length']),
+ )
+ for sql, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertIsNone(constraint)
+ self.assertIsNone(details)
+ self.assertConstraint(check, columns, check=True)
+
+ def test_check_and_unique_column(self):
+ tests = (
+ ('"ref" varchar(255) CHECK ("ref" != \'test\') UNIQUE,', ['ref']),
+ ('ref varchar(255) UNIQUE CHECK (ref != \'test\'),', ['ref']),
+ )
+ for sql, columns in tests:
+ with self.subTest(sql=sql):
+ constraint, details, check, _ = self.parse_definition(sql, columns)
+ self.assertIsNone(constraint)
+ self.assertConstraint(details, columns, unique=True)
+ self.assertConstraint(check, columns, check=True)
diff --git a/tests/introspection/models.py b/tests/introspection/models.py
index 32acc323bd..fa663de2fd 100644
--- a/tests/introspection/models.py
+++ b/tests/introspection/models.py
@@ -58,3 +58,21 @@ class ArticleReporter(models.Model):
class Meta:
managed = False
+
+
+class Comment(models.Model):
+ ref = models.UUIDField(unique=True)
+ article = models.ForeignKey(Article, models.CASCADE, db_index=True)
+ email = models.EmailField()
+ pub_date = models.DateTimeField()
+ up_votes = models.PositiveIntegerField()
+ body = models.TextField()
+
+ class Meta:
+ constraints = [
+ models.CheckConstraint(name='up_votes_gte_0_check', check=models.Q(up_votes__gte=0)),
+ models.UniqueConstraint(fields=['article', 'email', 'pub_date'], name='article_email_pub_date_uniq'),
+ ]
+ indexes = [
+ models.Index(fields=['email', 'pub_date'], name='email_pub_date_idx'),
+ ]
diff --git a/tests/introspection/tests.py b/tests/introspection/tests.py
index 4eb868e907..b72d0f5165 100644
--- a/tests/introspection/tests.py
+++ b/tests/introspection/tests.py
@@ -5,7 +5,7 @@ from django.db.models import Index
from django.db.utils import DatabaseError
from django.test import TransactionTestCase, skipUnlessDBFeature
-from .models import Article, ArticleReporter, City, District, Reporter
+from .models import Article, ArticleReporter, City, Comment, District, Reporter
class IntrospectionTests(TransactionTestCase):
@@ -209,6 +209,63 @@ class IntrospectionTests(TransactionTestCase):
indexes_verified += 1
self.assertEqual(indexes_verified, 4)
+ def test_get_constraints(self):
+ def assertDetails(details, cols, primary_key=False, unique=False, index=False, check=False, foreign_key=None):
+ # Different backends have different values for same constraints:
+ # PRIMARY KEY UNIQUE CONSTRAINT UNIQUE INDEX
+ # MySQL pk=1 uniq=1 idx=1 pk=0 uniq=1 idx=1 pk=0 uniq=1 idx=1
+ # PostgreSQL pk=1 uniq=1 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
+ # SQLite pk=1 uniq=0 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
+ if details['primary_key']:
+ details['unique'] = True
+ if details['unique']:
+ details['index'] = False
+ self.assertEqual(details['columns'], cols)
+ self.assertEqual(details['primary_key'], primary_key)
+ self.assertEqual(details['unique'], unique)
+ self.assertEqual(details['index'], index)
+ self.assertEqual(details['check'], check)
+ self.assertEqual(details['foreign_key'], foreign_key)
+
+ with connection.cursor() as cursor:
+ constraints = connection.introspection.get_constraints(cursor, Comment._meta.db_table)
+ # Test custom constraints
+ custom_constraints = {
+ 'article_email_pub_date_uniq',
+ 'email_pub_date_idx',
+ }
+ if connection.features.supports_column_check_constraints:
+ custom_constraints.add('up_votes_gte_0_check')
+ assertDetails(constraints['up_votes_gte_0_check'], ['up_votes'], check=True)
+ assertDetails(constraints['article_email_pub_date_uniq'], ['article_id', 'email', 'pub_date'], unique=True)
+ assertDetails(constraints['email_pub_date_idx'], ['email', 'pub_date'], index=True)
+ # Test field constraints
+ field_constraints = set()
+ for name, details in constraints.items():
+ if name in custom_constraints:
+ continue
+ elif details['columns'] == ['up_votes'] and details['check']:
+ assertDetails(details, ['up_votes'], check=True)
+ field_constraints.add(name)
+ elif details['columns'] == ['ref'] and details['unique']:
+ assertDetails(details, ['ref'], unique=True)
+ field_constraints.add(name)
+ elif details['columns'] == ['article_id'] and details['index']:
+ assertDetails(details, ['article_id'], index=True)
+ field_constraints.add(name)
+ elif details['columns'] == ['id'] and details['primary_key']:
+ assertDetails(details, ['id'], primary_key=True, unique=True)
+ field_constraints.add(name)
+ elif details['columns'] == ['article_id'] and details['foreign_key']:
+ assertDetails(details, ['article_id'], foreign_key=('introspection_article', 'id'))
+ field_constraints.add(name)
+ elif details['check']:
+ # Some databases (e.g. Oracle) include additional check
+ # constraints.
+ field_constraints.add(name)
+ # All constraints are accounted for.
+ self.assertEqual(constraints.keys() ^ (custom_constraints | field_constraints), set())
+
def datatype(dbtype, description):
"""Helper to convert a data type into a string."""
diff --git a/tests/schema/tests.py b/tests/schema/tests.py
index 410a52b646..37f39ff7ed 100644
--- a/tests/schema/tests.py
+++ b/tests/schema/tests.py
@@ -129,6 +129,14 @@ class SchemaTests(TransactionTestCase):
if c['index'] and len(c['columns']) == 1
]
+ def get_uniques(self, table):
+ with connection.cursor() as cursor:
+ return [
+ c['columns'][0]
+ for c in connection.introspection.get_constraints(cursor, table).values()
+ if c['unique'] and len(c['columns']) == 1
+ ]
+
def get_constraints(self, table):
"""
Get the constraints on a table using a new cursor.
@@ -1950,7 +1958,7 @@ class SchemaTests(TransactionTestCase):
editor.add_field(Book, new_field3)
self.assertIn(
"slug",
- self.get_indexes(Book._meta.db_table),
+ self.get_uniques(Book._meta.db_table),
)
# Remove the unique, check the index goes with it
new_field4 = CharField(max_length=20, unique=False)
@@ -1959,7 +1967,7 @@ class SchemaTests(TransactionTestCase):
editor.alter_field(BookWithSlug, new_field3, new_field4, strict=True)
self.assertNotIn(
"slug",
- self.get_indexes(Book._meta.db_table),
+ self.get_uniques(Book._meta.db_table),
)
def test_text_field_with_db_index(self):