diff options
Diffstat (limited to 'tests/backends/postgresql/tests.py')
| -rw-r--r-- | tests/backends/postgresql/tests.py | 115 |
1 files changed, 67 insertions, 48 deletions
diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py index 1905147f6f..af08f6f286 100644 --- a/tests/backends/postgresql/tests.py +++ b/tests/backends/postgresql/tests.py @@ -9,9 +9,9 @@ from django.db.backends.base.base import BaseDatabaseWrapper from django.test import TestCase, override_settings -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests') +@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests") class Tests(TestCase): - databases = {'default', 'other'} + databases = {"default", "other"} def test_nodb_cursor(self): """ @@ -21,14 +21,14 @@ class Tests(TestCase): orig_connect = BaseDatabaseWrapper.connect def mocked_connect(self): - if self.settings_dict['NAME'] is None: + if self.settings_dict["NAME"] is None: raise DatabaseError() return orig_connect(self) with connection._nodb_cursor() as cursor: self.assertIs(cursor.closed, False) self.assertIsNotNone(cursor.db.connection) - self.assertIsNone(cursor.db.settings_dict['NAME']) + self.assertIsNone(cursor.db.settings_dict["NAME"]) self.assertIs(cursor.closed, True) self.assertIsNone(cursor.db.connection) @@ -41,24 +41,29 @@ class Tests(TestCase): "database and will use the first PostgreSQL database instead." ) with self.assertWarnsMessage(RuntimeWarning, msg): - with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect', - side_effect=mocked_connect, autospec=True): + with mock.patch( + "django.db.backends.base.base.BaseDatabaseWrapper.connect", + side_effect=mocked_connect, + autospec=True, + ): with mock.patch.object( connection, - 'settings_dict', - {**connection.settings_dict, 'NAME': 'postgres'}, + "settings_dict", + {**connection.settings_dict, "NAME": "postgres"}, ): with connection._nodb_cursor() as cursor: self.assertIs(cursor.closed, False) self.assertIsNotNone(cursor.db.connection) self.assertIs(cursor.closed, True) self.assertIsNone(cursor.db.connection) - self.assertIsNotNone(cursor.db.settings_dict['NAME']) - self.assertEqual(cursor.db.settings_dict['NAME'], connections['other'].settings_dict['NAME']) + self.assertIsNotNone(cursor.db.settings_dict["NAME"]) + self.assertEqual( + cursor.db.settings_dict["NAME"], connections["other"].settings_dict["NAME"] + ) # Cursor is yielded only for the first PostgreSQL database. with self.assertWarnsMessage(RuntimeWarning, msg): with mock.patch( - 'django.db.backends.base.base.BaseDatabaseWrapper.connect', + "django.db.backends.base.base.BaseDatabaseWrapper.connect", side_effect=mocked_connect, autospec=True, ): @@ -71,13 +76,14 @@ class Tests(TestCase): _nodb_cursor() re-raises authentication failure to the 'postgres' db when other connection to the PostgreSQL database isn't available. """ + def mocked_connect(self): raise DatabaseError() def mocked_all(self): test_connection = copy.copy(connections[DEFAULT_DB_ALIAS]) test_connection.settings_dict = copy.deepcopy(connection.settings_dict) - test_connection.settings_dict['NAME'] = 'postgres' + test_connection.settings_dict["NAME"] = "postgres" return [test_connection] msg = ( @@ -89,12 +95,12 @@ class Tests(TestCase): ) with self.assertWarnsMessage(RuntimeWarning, msg): mocker_connections_all = mock.patch( - 'django.utils.connection.BaseConnectionHandler.all', + "django.utils.connection.BaseConnectionHandler.all", side_effect=mocked_all, autospec=True, ) mocker_connect = mock.patch( - 'django.db.backends.base.base.BaseDatabaseWrapper.connect', + "django.db.backends.base.base.BaseDatabaseWrapper.connect", side_effect=mocked_connect, autospec=True, ) @@ -104,27 +110,29 @@ class Tests(TestCase): pass def test_nodb_cursor_reraise_exceptions(self): - with self.assertRaisesMessage(DatabaseError, 'exception'): + with self.assertRaisesMessage(DatabaseError, "exception"): with connection._nodb_cursor(): - raise DatabaseError('exception') + raise DatabaseError("exception") def test_database_name_too_long(self): from django.db.backends.postgresql.base import DatabaseWrapper + settings = connection.settings_dict.copy() max_name_length = connection.ops.max_name_length() - settings['NAME'] = 'a' + (max_name_length * 'a') + settings["NAME"] = "a" + (max_name_length * "a") msg = ( "The database name '%s' (%d characters) is longer than " "PostgreSQL's limit of %s characters. Supply a shorter NAME in " "settings.DATABASES." - ) % (settings['NAME'], max_name_length + 1, max_name_length) + ) % (settings["NAME"], max_name_length + 1, max_name_length) with self.assertRaisesMessage(ImproperlyConfigured, msg): DatabaseWrapper(settings).get_connection_params() def test_database_name_empty(self): from django.db.backends.postgresql.base import DatabaseWrapper + settings = connection.settings_dict.copy() - settings['NAME'] = '' + settings["NAME"] = "" msg = ( "settings.DATABASES is improperly configured. Please supply the " "NAME or OPTIONS['service'] value." @@ -134,22 +142,24 @@ class Tests(TestCase): def test_service_name(self): from django.db.backends.postgresql.base import DatabaseWrapper + settings = connection.settings_dict.copy() - settings['OPTIONS'] = {'service': 'my_service'} - settings['NAME'] = '' + settings["OPTIONS"] = {"service": "my_service"} + settings["NAME"] = "" params = DatabaseWrapper(settings).get_connection_params() - self.assertEqual(params['service'], 'my_service') - self.assertNotIn('database', params) + self.assertEqual(params["service"], "my_service") + self.assertNotIn("database", params) def test_service_name_default_db(self): # None is used to connect to the default 'postgres' db. from django.db.backends.postgresql.base import DatabaseWrapper + settings = connection.settings_dict.copy() - settings['NAME'] = None - settings['OPTIONS'] = {'service': 'django_test'} + settings["NAME"] = None + settings["OPTIONS"] = {"service": "django_test"} params = DatabaseWrapper(settings).get_connection_params() - self.assertEqual(params['database'], 'postgres') - self.assertNotIn('service', params) + self.assertEqual(params["database"], "postgres") + self.assertNotIn("service", params) def test_connect_and_rollback(self): """ @@ -165,7 +175,7 @@ class Tests(TestCase): cursor.execute("RESET TIMEZONE") cursor.execute("SHOW TIMEZONE") db_default_tz = cursor.fetchone()[0] - new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC' + new_tz = "Europe/Paris" if db_default_tz == "UTC" else "UTC" new_connection.close() # Invalidate timezone name cache, because the setting_changed @@ -193,7 +203,7 @@ class Tests(TestCase): after setting the time zone when AUTOCOMMIT is False (#21452). """ new_connection = connection.copy() - new_connection.settings_dict['AUTOCOMMIT'] = False + new_connection.settings_dict["AUTOCOMMIT"] = False try: # Open a database connection. @@ -207,9 +217,7 @@ class Tests(TestCase): The transaction level can be configured with DATABASES ['OPTIONS']['isolation_level']. """ - from psycopg2.extensions import ( - ISOLATION_LEVEL_SERIALIZABLE as serializable, - ) + from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE as serializable # Since this is a django.test.TestCase, a transaction is in progress # and the isolation level isn't reported as 0. This test assumes that @@ -218,7 +226,7 @@ class Tests(TestCase): self.assertIsNone(connection.connection.isolation_level) new_connection = connection.copy() - new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable + new_connection.settings_dict["OPTIONS"]["isolation_level"] = serializable try: # Start a transaction so the isolation level isn't reported as 0. new_connection.set_autocommit(False) @@ -230,7 +238,7 @@ class Tests(TestCase): def test_connect_no_is_usable_checks(self): new_connection = connection.copy() try: - with mock.patch.object(new_connection, 'is_usable') as is_usable: + with mock.patch.object(new_connection, "is_usable") as is_usable: new_connection.connect() is_usable.assert_not_called() finally: @@ -238,49 +246,60 @@ class Tests(TestCase): def _select(self, val): with connection.cursor() as cursor: - cursor.execute('SELECT %s', (val,)) + cursor.execute("SELECT %s", (val,)) return cursor.fetchone()[0] def test_select_ascii_array(self): - a = ['awef'] + a = ["awef"] b = self._select(a) self.assertEqual(a[0], b[0]) def test_select_unicode_array(self): - a = ['ᄲawef'] + a = ["ᄲawef"] b = self._select(a) self.assertEqual(a[0], b[0]) def test_lookup_cast(self): from django.db.backends.postgresql.operations import DatabaseOperations + do = DatabaseOperations(connection=None) lookups = ( - 'iexact', 'contains', 'icontains', 'startswith', 'istartswith', - 'endswith', 'iendswith', 'regex', 'iregex', + "iexact", + "contains", + "icontains", + "startswith", + "istartswith", + "endswith", + "iendswith", + "regex", + "iregex", ) for lookup in lookups: with self.subTest(lookup=lookup): - self.assertIn('::text', do.lookup_cast(lookup)) + self.assertIn("::text", do.lookup_cast(lookup)) for lookup in lookups: - for field_type in ('CICharField', 'CIEmailField', 'CITextField'): + for field_type in ("CICharField", "CIEmailField", "CITextField"): with self.subTest(lookup=lookup, field_type=field_type): - self.assertIn('::citext', do.lookup_cast(lookup, internal_type=field_type)) + self.assertIn( + "::citext", do.lookup_cast(lookup, internal_type=field_type) + ) def test_correct_extraction_psycopg2_version(self): from django.db.backends.postgresql.base import psycopg2_version - with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'): + + with mock.patch("psycopg2.__version__", "4.2.1 (dt dec pq3 ext lo64)"): self.assertEqual(psycopg2_version(), (4, 2, 1)) - with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'): + with mock.patch("psycopg2.__version__", "4.2b0.dev1 (dt dec pq3 ext lo64)"): self.assertEqual(psycopg2_version(), (4, 2)) @override_settings(DEBUG=True) def test_copy_cursors(self): out = StringIO() - copy_expert_sql = 'COPY django_session TO STDOUT (FORMAT CSV, HEADER)' + copy_expert_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)" with connection.cursor() as cursor: cursor.copy_expert(copy_expert_sql, out) - cursor.copy_to(out, 'django_session') + cursor.copy_to(out, "django_session") self.assertEqual( - [q['sql'] for q in connection.queries], - [copy_expert_sql, 'COPY django_session TO STDOUT'], + [q["sql"] for q in connection.queries], + [copy_expert_sql, "COPY django_session TO STDOUT"], ) |
