diff options
Diffstat (limited to 'tests/backends/postgresql/tests.py')
| -rw-r--r-- | tests/backends/postgresql/tests.py | 152 |
1 files changed, 141 insertions, 11 deletions
diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py index a045195991..d28c5be253 100644 --- a/tests/backends/postgresql/tests.py +++ b/tests/backends/postgresql/tests.py @@ -8,6 +8,7 @@ from django.db import ( DEFAULT_DB_ALIAS, DatabaseError, NotSupportedError, + ProgrammingError, connection, connections, ) @@ -20,6 +21,15 @@ except ImportError: is_psycopg3 = False +def no_pool_connection(alias=None): + new_connection = connection.copy(alias) + new_connection.settings_dict = copy.deepcopy(connection.settings_dict) + # Ensure that the second connection circumvents the pool, this is kind + # of a hack, but we cannot easily change the pool connections. + new_connection.settings_dict["OPTIONS"]["pool"] = False + return new_connection + + @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests") class Tests(TestCase): databases = {"default", "other"} @@ -177,7 +187,7 @@ class Tests(TestCase): PostgreSQL shouldn't roll back SET TIME ZONE, even if the first transaction is rolled back (#17062). """ - new_connection = connection.copy() + new_connection = no_pool_connection() try: # Ensure the database default time zone is different than # the time zone in new_connection.settings_dict. We can @@ -213,7 +223,7 @@ class Tests(TestCase): The connection wrapper shouldn't believe that autocommit is enabled after setting the time zone when AUTOCOMMIT is False (#21452). """ - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["AUTOCOMMIT"] = False try: @@ -223,6 +233,126 @@ class Tests(TestCase): finally: new_connection.close() + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_connect_pool(self): + from psycopg_pool import PoolTimeout + + new_connection = no_pool_connection(alias="default_pool") + new_connection.settings_dict["OPTIONS"]["pool"] = { + "min_size": 0, + "max_size": 2, + "timeout": 0.1, + } + self.assertIsNotNone(new_connection.pool) + + connections = [] + + def get_connection(): + # copy() reuses the existing alias and as such the same pool. + conn = new_connection.copy() + conn.connect() + connections.append(conn) + return conn + + try: + connection_1 = get_connection() # First connection. + connection_1_backend_pid = connection_1.connection.info.backend_pid + get_connection() # Get the second connection. + with self.assertRaises(PoolTimeout): + # The pool has a maximum of 2 connections. + get_connection() + + connection_1.close() # Release back to the pool. + connection_3 = get_connection() + # Reuses the first connection as it is available. + self.assertEqual( + connection_3.connection.info.backend_pid, connection_1_backend_pid + ) + finally: + # Release all connections back to the pool. + for conn in connections: + conn.close() + new_connection.close_pool() + + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_connect_pool_set_to_true(self): + new_connection = no_pool_connection(alias="default_pool") + new_connection.settings_dict["OPTIONS"]["pool"] = True + try: + self.assertIsNotNone(new_connection.pool) + finally: + new_connection.close_pool() + + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_connect_pool_with_timezone(self): + new_time_zone = "Africa/Nairobi" + new_connection = no_pool_connection(alias="default_pool") + + try: + with new_connection.cursor() as cursor: + cursor.execute("SHOW TIMEZONE") + tz = cursor.fetchone()[0] + self.assertNotEqual(new_time_zone, tz) + finally: + new_connection.close() + + del new_connection.timezone_name + new_connection.settings_dict["OPTIONS"]["pool"] = True + try: + with self.settings(TIME_ZONE=new_time_zone): + with new_connection.cursor() as cursor: + cursor.execute("SHOW TIMEZONE") + tz = cursor.fetchone()[0] + self.assertEqual(new_time_zone, tz) + finally: + new_connection.close() + new_connection.close_pool() + + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_pooling_health_checks(self): + new_connection = no_pool_connection(alias="default_pool") + new_connection.settings_dict["OPTIONS"]["pool"] = True + new_connection.settings_dict["CONN_HEALTH_CHECKS"] = False + + try: + self.assertIsNone(new_connection.pool._check) + finally: + new_connection.close_pool() + + new_connection.settings_dict["CONN_HEALTH_CHECKS"] = True + try: + self.assertIsNotNone(new_connection.pool._check) + finally: + new_connection.close_pool() + + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_cannot_open_new_connection_in_atomic_block(self): + new_connection = no_pool_connection(alias="default_pool") + new_connection.settings_dict["OPTIONS"]["pool"] = True + + msg = "Cannot open a new connection in an atomic block." + new_connection.in_atomic_block = True + new_connection.closed_in_transaction = True + with self.assertRaisesMessage(ProgrammingError, msg): + new_connection.ensure_connection() + + @unittest.skipUnless(is_psycopg3, "psycopg3 specific test") + def test_pooling_not_support_persistent_connections(self): + new_connection = no_pool_connection(alias="default_pool") + new_connection.settings_dict["OPTIONS"]["pool"] = True + new_connection.settings_dict["CONN_MAX_AGE"] = 10 + msg = "Pooling doesn't support persistent connections." + with self.assertRaisesMessage(ImproperlyConfigured, msg): + new_connection.pool + + @unittest.skipIf(is_psycopg3, "psycopg2 specific test") + def test_connect_pool_setting_ignored_for_psycopg2(self): + new_connection = no_pool_connection() + new_connection.settings_dict["OPTIONS"]["pool"] = True + msg = "Database pooling requires psycopg >= 3" + with self.assertRaisesMessage(ImproperlyConfigured, msg): + new_connection.connect() + def test_connect_isolation_level(self): """ The transaction level can be configured with @@ -236,7 +366,7 @@ class Tests(TestCase): # Check the level on the psycopg connection, not the Django wrapper. self.assertIsNone(connection.connection.isolation_level) - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"][ "isolation_level" ] = IsolationLevel.SERIALIZABLE @@ -253,7 +383,7 @@ class Tests(TestCase): def test_connect_invalid_isolation_level(self): self.assertIsNone(connection.connection.isolation_level) - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"]["isolation_level"] = -1 msg = ( "Invalid transaction isolation level -1 specified. Use one of the " @@ -269,7 +399,7 @@ class Tests(TestCase): """ try: custom_role = "django_nonexistent_role" - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"]["assume_role"] = custom_role msg = f'role "{custom_role}" does not exist' with self.assertRaisesMessage(errors.InvalidParameterValue, msg): @@ -285,7 +415,7 @@ class Tests(TestCase): """ from django.db.backends.postgresql.base import ServerBindingCursor - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"]["server_side_binding"] = True try: new_connection.connect() @@ -306,7 +436,7 @@ class Tests(TestCase): class MyCursor(Cursor): pass - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"]["cursor_factory"] = MyCursor try: new_connection.connect() @@ -315,7 +445,7 @@ class Tests(TestCase): new_connection.close() def test_connect_no_is_usable_checks(self): - new_connection = connection.copy() + new_connection = no_pool_connection() try: with mock.patch.object(new_connection, "is_usable") as is_usable: new_connection.connect() @@ -324,7 +454,7 @@ class Tests(TestCase): new_connection.close() def test_client_encoding_utf8_enforce(self): - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.settings_dict["OPTIONS"]["client_encoding"] = "iso-8859-2" try: new_connection.connect() @@ -417,7 +547,7 @@ class Tests(TestCase): self.assertEqual([q["sql"] for q in connection.queries], [copy_sql]) def test_get_database_version(self): - new_connection = connection.copy() + new_connection = no_pool_connection() new_connection.pg_version = 130009 self.assertEqual(new_connection.get_database_version(), (13, 9)) @@ -429,7 +559,7 @@ class Tests(TestCase): self.assertTrue(mocked_get_database_version.called) def test_compose_sql_when_no_connection(self): - new_connection = connection.copy() + new_connection = no_pool_connection() try: self.assertEqual( new_connection.ops.compose_sql("SELECT %s", ["test"]), |
