diff options
| author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
| commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
| tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/backends/base/test_base.py | |
| parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/backends/base/test_base.py')
| -rw-r--r-- | tests/backends/base/test_base.py | 94 |
1 files changed, 52 insertions, 42 deletions
diff --git a/tests/backends/base/test_base.py b/tests/backends/base/test_base.py index b2ebea5829..00ef766d5d 100644 --- a/tests/backends/base/test_base.py +++ b/tests/backends/base/test_base.py @@ -8,7 +8,6 @@ from ..models import Square class DatabaseWrapperTests(SimpleTestCase): - def test_repr(self): conn = connections[DEFAULT_DB_ALIAS] self.assertEqual( @@ -25,12 +24,12 @@ class DatabaseWrapperTests(SimpleTestCase): conn = connections[DEFAULT_DB_ALIAS] conn_class = type(conn) attr_names = [ - ('client_class', 'client'), - ('creation_class', 'creation'), - ('features_class', 'features'), - ('introspection_class', 'introspection'), - ('ops_class', 'ops'), - ('validation_class', 'validation'), + ("client_class", "client"), + ("creation_class", "creation"), + ("features_class", "features"), + ("introspection_class", "introspection"), + ("ops_class", "ops"), + ("validation_class", "validation"), ] for class_attr_name, instance_attr_name in attr_names: class_attr_value = getattr(conn_class, class_attr_name) @@ -39,23 +38,22 @@ class DatabaseWrapperTests(SimpleTestCase): self.assertIsInstance(instance_attr_value, class_attr_value) def test_initialization_display_name(self): - self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown') - self.assertNotEqual(connection.display_name, 'unknown') + self.assertEqual(BaseDatabaseWrapper.display_name, "unknown") + self.assertNotEqual(connection.display_name, "unknown") class ExecuteWrapperTests(TestCase): - @staticmethod def call_execute(connection, params=None): - ret_val = '1' if params is None else '%s' - sql = 'SELECT ' + ret_val + connection.features.bare_select_suffix + ret_val = "1" if params is None else "%s" + sql = "SELECT " + ret_val + connection.features.bare_select_suffix with connection.cursor() as cursor: cursor.execute(sql, params) def call_executemany(self, connection, params=None): # executemany() must use an update query. Make sure it does nothing # by putting a false condition in the WHERE clause. - sql = 'DELETE FROM {} WHERE 0=1 AND 0=%s'.format(Square._meta.db_table) + sql = "DELETE FROM {} WHERE 0=1 AND 0=%s".format(Square._meta.db_table) if params is None: params = [(i,) for i in range(3)] with connection.cursor() as cursor: @@ -71,10 +69,10 @@ class ExecuteWrapperTests(TestCase): self.call_execute(connection) self.assertTrue(wrapper.called) (_, sql, params, many, context), _ = wrapper.call_args - self.assertIn('SELECT', sql) + self.assertIn("SELECT", sql) self.assertIsNone(params) self.assertIs(many, False) - self.assertEqual(context['connection'], connection) + self.assertEqual(context["connection"], connection) def test_wrapper_invoked_many(self): wrapper = self.mock_wrapper() @@ -82,16 +80,16 @@ class ExecuteWrapperTests(TestCase): self.call_executemany(connection) self.assertTrue(wrapper.called) (_, sql, param_list, many, context), _ = wrapper.call_args - self.assertIn('DELETE', sql) + self.assertIn("DELETE", sql) self.assertIsInstance(param_list, (list, tuple)) self.assertIs(many, True) - self.assertEqual(context['connection'], connection) + self.assertEqual(context["connection"], connection) def test_database_queried(self): wrapper = self.mock_wrapper() with connection.execute_wrapper(wrapper): with connection.cursor() as cursor: - sql = 'SELECT 17' + connection.features.bare_select_suffix + sql = "SELECT 17" + connection.features.bare_select_suffix cursor.execute(sql) seventeen = cursor.fetchall() self.assertEqual(list(seventeen), [(17,)]) @@ -100,7 +98,9 @@ class ExecuteWrapperTests(TestCase): def test_nested_wrapper_invoked(self): outer_wrapper = self.mock_wrapper() inner_wrapper = self.mock_wrapper() - with connection.execute_wrapper(outer_wrapper), connection.execute_wrapper(inner_wrapper): + with connection.execute_wrapper(outer_wrapper), connection.execute_wrapper( + inner_wrapper + ): self.call_execute(connection) self.assertEqual(inner_wrapper.call_count, 1) self.call_executemany(connection) @@ -109,9 +109,12 @@ class ExecuteWrapperTests(TestCase): def test_outer_wrapper_blocks(self): def blocker(*args): pass + wrapper = self.mock_wrapper() c = connection # This alias shortens the next line. - with c.execute_wrapper(wrapper), c.execute_wrapper(blocker), c.execute_wrapper(wrapper): + with c.execute_wrapper(wrapper), c.execute_wrapper(blocker), c.execute_wrapper( + wrapper + ): with c.cursor() as cursor: cursor.execute("The database never sees this") self.assertEqual(wrapper.call_count, 1) @@ -128,16 +131,16 @@ class ExecuteWrapperTests(TestCase): def test_wrapper_connection_specific(self): wrapper = self.mock_wrapper() - with connections['other'].execute_wrapper(wrapper): - self.assertEqual(connections['other'].execute_wrappers, [wrapper]) + with connections["other"].execute_wrapper(wrapper): + self.assertEqual(connections["other"].execute_wrappers, [wrapper]) self.call_execute(connection) self.assertFalse(wrapper.called) self.assertEqual(connection.execute_wrappers, []) - self.assertEqual(connections['other'].execute_wrappers, []) + self.assertEqual(connections["other"].execute_wrappers, []) class ConnectionHealthChecksTests(SimpleTestCase): - databases = {'default'} + databases = {"default"} def setUp(self): # All test cases here need newly configured and created connections. @@ -146,25 +149,28 @@ class ConnectionHealthChecksTests(SimpleTestCase): self.addCleanup(connection.close) def patch_settings_dict(self, conn_health_checks): - self.settings_dict_patcher = patch.dict(connection.settings_dict, { - **connection.settings_dict, - 'CONN_MAX_AGE': None, - 'CONN_HEALTH_CHECKS': conn_health_checks, - }) + self.settings_dict_patcher = patch.dict( + connection.settings_dict, + { + **connection.settings_dict, + "CONN_MAX_AGE": None, + "CONN_HEALTH_CHECKS": conn_health_checks, + }, + ) self.settings_dict_patcher.start() self.addCleanup(self.settings_dict_patcher.stop) def run_query(self): with connection.cursor() as cursor: - cursor.execute('SELECT 42' + connection.features.bare_select_suffix) + cursor.execute("SELECT 42" + connection.features.bare_select_suffix) - @skipUnlessDBFeature('test_db_allows_multiple_connections') + @skipUnlessDBFeature("test_db_allows_multiple_connections") def test_health_checks_enabled(self): self.patch_settings_dict(conn_health_checks=True) self.assertIsNone(connection.connection) # Newly created connections are considered healthy without performing # the health check. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): self.run_query() old_connection = connection.connection @@ -173,7 +179,9 @@ class ConnectionHealthChecksTests(SimpleTestCase): self.assertIs(old_connection, connection.connection) # Simulate connection health check failing. - with patch.object(connection, 'is_usable', return_value=False) as mocked_is_usable: + with patch.object( + connection, "is_usable", return_value=False + ) as mocked_is_usable: self.run_query() new_connection = connection.connection # A new connection is established. @@ -194,13 +202,13 @@ class ConnectionHealthChecksTests(SimpleTestCase): self.run_query() self.assertIs(new_connection, connection.connection) - @skipUnlessDBFeature('test_db_allows_multiple_connections') + @skipUnlessDBFeature("test_db_allows_multiple_connections") def test_health_checks_enabled_errors_occurred(self): self.patch_settings_dict(conn_health_checks=True) self.assertIsNone(connection.connection) # Newly created connections are considered healthy without performing # the health check. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): self.run_query() old_connection = connection.connection @@ -213,16 +221,16 @@ class ConnectionHealthChecksTests(SimpleTestCase): # No additional health checks after the one in # close_if_unusable_or_obsolete() are executed during this "request" # when running queries. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): self.run_query() - @skipUnlessDBFeature('test_db_allows_multiple_connections') + @skipUnlessDBFeature("test_db_allows_multiple_connections") def test_health_checks_disabled(self): self.patch_settings_dict(conn_health_checks=False) self.assertIsNone(connection.connection) # Newly created connections are considered healthy without performing # the health check. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): self.run_query() old_connection = connection.connection @@ -231,7 +239,7 @@ class ConnectionHealthChecksTests(SimpleTestCase): # Persistent connections are enabled (connection is not). self.assertIs(old_connection, connection.connection) # Health checks are not performed. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): self.run_query() # Health check wasn't performed and the connection is unchanged. self.assertIs(old_connection, connection.connection) @@ -240,13 +248,13 @@ class ConnectionHealthChecksTests(SimpleTestCase): # the current "request". self.assertIs(old_connection, connection.connection) - @skipUnlessDBFeature('test_db_allows_multiple_connections') + @skipUnlessDBFeature("test_db_allows_multiple_connections") def test_set_autocommit_health_checks_enabled(self): self.patch_settings_dict(conn_health_checks=True) self.assertIsNone(connection.connection) # Newly created connections are considered healthy without performing # the health check. - with patch.object(connection, 'is_usable', side_effect=AssertionError): + with patch.object(connection, "is_usable", side_effect=AssertionError): # Simulate outermost atomic block: changing autocommit for # a connection. connection.set_autocommit(False) @@ -261,7 +269,9 @@ class ConnectionHealthChecksTests(SimpleTestCase): self.assertIs(old_connection, connection.connection) # Simulate connection health check failing. - with patch.object(connection, 'is_usable', return_value=False) as mocked_is_usable: + with patch.object( + connection, "is_usable", return_value=False + ) as mocked_is_usable: # Simulate outermost atomic block: changing autocommit for # a connection. connection.set_autocommit(False) |
