summaryrefslogtreecommitdiff
path: root/tests/backends/postgresql/tests.py
diff options
context:
space:
mode:
authorMariusz Felisiak <felisiak.mariusz@gmail.com>2017-02-11 21:37:49 +0100
committerTim Graham <timograham@gmail.com>2017-06-21 12:00:47 -0400
commit8cb1b1fd8e529d1896daeb089ea726109e0ba4f7 (patch)
tree3337abead0afac6a46b0ccfd48bc530aec3717de /tests/backends/postgresql/tests.py
parent0f91ba1adc037345474749faa64d36a4077b3fb8 (diff)
Reorganized backends tests.
Diffstat (limited to 'tests/backends/postgresql/tests.py')
-rw-r--r--tests/backends/postgresql/tests.py147
1 files changed, 147 insertions, 0 deletions
diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py
new file mode 100644
index 0000000000..140fbbc444
--- /dev/null
+++ b/tests/backends/postgresql/tests.py
@@ -0,0 +1,147 @@
+import unittest
+import warnings
+from unittest import mock
+
+from django.db import DatabaseError, connection
+from django.test import TestCase
+
+
+@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
+class Tests(TestCase):
+
+ def test_nodb_connection(self):
+ """
+ The _nodb_connection property fallbacks to the default connection
+ database when access to the 'postgres' database is not granted.
+ """
+ def mocked_connect(self):
+ if self.settings_dict['NAME'] is None:
+ raise DatabaseError()
+ return ''
+
+ nodb_conn = connection._nodb_connection
+ self.assertIsNone(nodb_conn.settings_dict['NAME'])
+
+ # Now assume the 'postgres' db isn't available
+ with warnings.catch_warnings(record=True) as w:
+ with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
+ side_effect=mocked_connect, autospec=True):
+ warnings.simplefilter('always', RuntimeWarning)
+ nodb_conn = connection._nodb_connection
+ self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
+ self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
+ # Check a RuntimeWarning has been emitted
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.__class__, RuntimeWarning)
+
+ def test_connect_and_rollback(self):
+ """
+ PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
+ transaction is rolled back (#17062).
+ """
+ new_connection = connection.copy()
+ try:
+ # Ensure the database default time zone is different than
+ # the time zone in new_connection.settings_dict. We can
+ # get the default time zone by reset & show.
+ cursor = new_connection.cursor()
+ cursor.execute("RESET TIMEZONE")
+ cursor.execute("SHOW TIMEZONE")
+ db_default_tz = cursor.fetchone()[0]
+ new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
+ new_connection.close()
+
+ # Invalidate timezone name cache, because the setting_changed
+ # handler cannot know about new_connection.
+ del new_connection.timezone_name
+
+ # Fetch a new connection with the new_tz as default
+ # time zone, run a query and rollback.
+ with self.settings(TIME_ZONE=new_tz):
+ new_connection.set_autocommit(False)
+ cursor = new_connection.cursor()
+ new_connection.rollback()
+
+ # Now let's see if the rollback rolled back the SET TIME ZONE.
+ cursor.execute("SHOW TIMEZONE")
+ tz = cursor.fetchone()[0]
+ self.assertEqual(new_tz, tz)
+
+ finally:
+ new_connection.close()
+
+ def test_connect_non_autocommit(self):
+ """
+ The connection wrapper shouldn't believe that autocommit is enabled
+ after setting the time zone when AUTOCOMMIT is False (#21452).
+ """
+ new_connection = connection.copy()
+ new_connection.settings_dict['AUTOCOMMIT'] = False
+
+ try:
+ # Open a database connection.
+ new_connection.cursor()
+ self.assertFalse(new_connection.get_autocommit())
+ finally:
+ new_connection.close()
+
+ def test_connect_isolation_level(self):
+ """
+ The transaction level can be configured with
+ DATABASES ['OPTIONS']['isolation_level'].
+ """
+ import psycopg2
+ from psycopg2.extensions import (
+ ISOLATION_LEVEL_READ_COMMITTED as read_committed,
+ ISOLATION_LEVEL_SERIALIZABLE as serializable,
+ )
+ # Since this is a django.test.TestCase, a transaction is in progress
+ # and the isolation level isn't reported as 0. This test assumes that
+ # PostgreSQL is configured with the default isolation level.
+
+ # Check the level on the psycopg2 connection, not the Django wrapper.
+ default_level = read_committed if psycopg2.__version__ < '2.7' else None
+ self.assertEqual(connection.connection.isolation_level, default_level)
+
+ new_connection = connection.copy()
+ new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
+ try:
+ # Start a transaction so the isolation level isn't reported as 0.
+ new_connection.set_autocommit(False)
+ # Check the level on the psycopg2 connection, not the Django wrapper.
+ self.assertEqual(new_connection.connection.isolation_level, serializable)
+ finally:
+ new_connection.close()
+
+ def _select(self, val):
+ with connection.cursor() as cursor:
+ cursor.execute('SELECT %s', (val,))
+ return cursor.fetchone()[0]
+
+ def test_select_ascii_array(self):
+ a = ['awef']
+ b = self._select(a)
+ self.assertEqual(a[0], b[0])
+
+ def test_select_unicode_array(self):
+ a = ['ᄲawef']
+ b = self._select(a)
+ self.assertEqual(a[0], b[0])
+
+ def test_lookup_cast(self):
+ from django.db.backends.postgresql.operations import DatabaseOperations
+ do = DatabaseOperations(connection=None)
+ lookups = (
+ 'iexact', 'contains', 'icontains', 'startswith', 'istartswith',
+ 'endswith', 'iendswith', 'regex', 'iregex',
+ )
+ for lookup in lookups:
+ with self.subTest(lookup=lookup):
+ self.assertIn('::text', do.lookup_cast(lookup))
+
+ def test_correct_extraction_psycopg2_version(self):
+ from django.db.backends.postgresql.base import psycopg2_version
+ with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
+ self.assertEqual(psycopg2_version(), (4, 2, 1))
+ with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
+ self.assertEqual(psycopg2_version(), (4, 2))