summaryrefslogtreecommitdiff
path: root/tests/backends/postgresql/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/backends/postgresql/tests.py')
-rw-r--r--tests/backends/postgresql/tests.py22
1 files changed, 19 insertions, 3 deletions
diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py
index d375bd2b8c..41d445e6c7 100644
--- a/tests/backends/postgresql/tests.py
+++ b/tests/backends/postgresql/tests.py
@@ -223,7 +223,7 @@ class Tests(TestCase):
The transaction level can be configured with
DATABASES ['OPTIONS']['isolation_level'].
"""
- from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE as serializable
+ from django.db.backends.postgresql.psycopg_any import IsolationLevel
# Since this is a django.test.TestCase, a transaction is in progress
# and the isolation level isn't reported as 0. This test assumes that
@@ -232,15 +232,31 @@ class Tests(TestCase):
self.assertIsNone(connection.connection.isolation_level)
new_connection = connection.copy()
- new_connection.settings_dict["OPTIONS"]["isolation_level"] = serializable
+ new_connection.settings_dict["OPTIONS"][
+ "isolation_level"
+ ] = IsolationLevel.SERIALIZABLE
try:
# Start a transaction so the isolation level isn't reported as 0.
new_connection.set_autocommit(False)
# Check the level on the psycopg2 connection, not the Django wrapper.
- self.assertEqual(new_connection.connection.isolation_level, serializable)
+ self.assertEqual(
+ new_connection.connection.isolation_level,
+ IsolationLevel.SERIALIZABLE,
+ )
finally:
new_connection.close()
+ def test_connect_invalid_isolation_level(self):
+ self.assertIsNone(connection.connection.isolation_level)
+ new_connection = connection.copy()
+ new_connection.settings_dict["OPTIONS"]["isolation_level"] = -1
+ msg = (
+ "Invalid transaction isolation level -1 specified. Use one of the "
+ "IsolationLevel values."
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
+ new_connection.ensure_connection()
+
def test_connect_no_is_usable_checks(self):
new_connection = connection.copy()
try: