diff options
| author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
| commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
| tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/select_for_update | |
| parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/select_for_update')
| -rw-r--r-- | tests/select_for_update/models.py | 6 | ||||
| -rw-r--r-- | tests/select_for_update/tests.py | 375 |
2 files changed, 221 insertions, 160 deletions
diff --git a/tests/select_for_update/models.py b/tests/select_for_update/models.py index 0a6396bc70..c1b42f026d 100644 --- a/tests/select_for_update/models.py +++ b/tests/select_for_update/models.py @@ -39,9 +39,9 @@ class CityCountryProxy(models.Model): class Person(models.Model): name = models.CharField(max_length=30) - born = models.ForeignKey(City, models.CASCADE, related_name='+') - died = models.ForeignKey(City, models.CASCADE, related_name='+') + born = models.ForeignKey(City, models.CASCADE, related_name="+") + died = models.ForeignKey(City, models.CASCADE, related_name="+") class PersonProfile(models.Model): - person = models.OneToOneField(Person, models.CASCADE, related_name='profile') + person = models.OneToOneField(Person, models.CASCADE, related_name="profile") diff --git a/tests/select_for_update/tests.py b/tests/select_for_update/tests.py index 5bae8a355a..9d73c59af9 100644 --- a/tests/select_for_update/tests.py +++ b/tests/select_for_update/tests.py @@ -6,32 +6,46 @@ from multiple_database.routers import TestRouter from django.core.exceptions import FieldError from django.db import ( - DatabaseError, NotSupportedError, connection, connections, router, + DatabaseError, + NotSupportedError, + connection, + connections, + router, transaction, ) from django.test import ( - TransactionTestCase, override_settings, skipIfDBFeature, + TransactionTestCase, + override_settings, + skipIfDBFeature, skipUnlessDBFeature, ) from django.test.utils import CaptureQueriesContext from .models import ( - City, CityCountryProxy, Country, EUCity, EUCountry, Person, PersonProfile, + City, + CityCountryProxy, + Country, + EUCity, + EUCountry, + Person, + PersonProfile, ) class SelectForUpdateTests(TransactionTestCase): - available_apps = ['select_for_update'] + available_apps = ["select_for_update"] def setUp(self): # This is executed in autocommit mode so that code in # run_select_for_update can see this data. - self.country1 = Country.objects.create(name='Belgium') - self.country2 = Country.objects.create(name='France') - self.city1 = City.objects.create(name='Liberchies', country=self.country1) - self.city2 = City.objects.create(name='Samois-sur-Seine', country=self.country2) - self.person = Person.objects.create(name='Reinhardt', born=self.city1, died=self.city2) + self.country1 = Country.objects.create(name="Belgium") + self.country2 = Country.objects.create(name="France") + self.city1 = City.objects.create(name="Liberchies", country=self.country1) + self.city2 = City.objects.create(name="Samois-sur-Seine", country=self.country2) + self.person = Person.objects.create( + name="Reinhardt", born=self.city1, died=self.city2 + ) self.person_profile = PersonProfile.objects.create(person=self.person) # We need another database connection in transaction to test that one @@ -50,9 +64,9 @@ class SelectForUpdateTests(TransactionTestCase): # Start a blocking transaction. At some point, # end_blocking_transaction() should be called. self.cursor = self.new_connection.cursor() - sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { - 'db_table': Person._meta.db_table, - 'for_update': self.new_connection.ops.for_update_sql(), + sql = "SELECT * FROM %(db_table)s %(for_update)s;" % { + "db_table": Person._meta.db_table, + "for_update": self.new_connection.ops.for_update_sql(), } self.cursor.execute(sql, ()) self.cursor.fetchone() @@ -67,9 +81,9 @@ class SelectForUpdateTests(TransactionTestCase): # Examine the SQL that was executed to determine whether it # contains the 'SELECT..FOR UPDATE' stanza. for_update_sql = connection.ops.for_update_sql(**kwargs) - return any(for_update_sql in query['sql'] for query in queries) + return any(for_update_sql in query["sql"] for query in queries) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_for_update_sql_generated(self): """ The backend's FOR UPDATE variant appears in @@ -79,7 +93,7 @@ class SelectForUpdateTests(TransactionTestCase): list(Person.objects.all().select_for_update()) self.assertTrue(self.has_for_update_sql(ctx.captured_queries)) - @skipUnlessDBFeature('has_select_for_update_nowait') + @skipUnlessDBFeature("has_select_for_update_nowait") def test_for_update_sql_generated_nowait(self): """ The backend's FOR UPDATE NOWAIT variant appears in @@ -89,7 +103,7 @@ class SelectForUpdateTests(TransactionTestCase): list(Person.objects.all().select_for_update(nowait=True)) self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True)) - @skipUnlessDBFeature('has_select_for_update_skip_locked') + @skipUnlessDBFeature("has_select_for_update_skip_locked") def test_for_update_sql_generated_skip_locked(self): """ The backend's FOR UPDATE SKIP LOCKED variant appears in @@ -99,7 +113,7 @@ class SelectForUpdateTests(TransactionTestCase): list(Person.objects.all().select_for_update(skip_locked=True)) self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True)) - @skipUnlessDBFeature('has_select_for_no_key_update') + @skipUnlessDBFeature("has_select_for_no_key_update") def test_update_sql_generated_no_key(self): """ The backend's FOR NO KEY UPDATE variant appears in generated SQL when @@ -109,150 +123,175 @@ class SelectForUpdateTests(TransactionTestCase): list(Person.objects.all().select_for_update(no_key=True)) self.assertIs(self.has_for_update_sql(ctx.captured_queries, no_key=True), True) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_generated_of(self): """ The backend's FOR UPDATE OF variant appears in the generated SQL when select_for_update() is invoked. """ with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(Person.objects.select_related( - 'born__country', - ).select_for_update( - of=('born__country',), - ).select_for_update( - of=('self', 'born__country') - )) - features = connections['default'].features + list( + Person.objects.select_related( + "born__country", + ) + .select_for_update( + of=("born__country",), + ) + .select_for_update(of=("self", "born__country")) + ) + features = connections["default"].features if features.select_for_update_of_column: expected = [ 'select_for_update_person"."id', 'select_for_update_country"."entity_ptr_id', ] else: - expected = ['select_for_update_person', 'select_for_update_country'] + expected = ["select_for_update_person", "select_for_update_country"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_model_inheritance_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(EUCountry.objects.select_for_update(of=('self',))) + list(EUCountry.objects.select_for_update(of=("self",))) if connection.features.select_for_update_of_column: expected = ['select_for_update_eucountry"."country_ptr_id'] else: - expected = ['select_for_update_eucountry'] + expected = ["select_for_update_eucountry"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_model_inheritance_ptr_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(EUCountry.objects.select_for_update(of=('self', 'country_ptr',))) + list( + EUCountry.objects.select_for_update( + of=( + "self", + "country_ptr", + ) + ) + ) if connection.features.select_for_update_of_column: expected = [ 'select_for_update_eucountry"."country_ptr_id', 'select_for_update_country"."entity_ptr_id', ] else: - expected = ['select_for_update_eucountry', 'select_for_update_country'] + expected = ["select_for_update_eucountry", "select_for_update_country"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_related_model_inheritance_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(EUCity.objects.select_related('country').select_for_update( - of=('self', 'country'), - )) + list( + EUCity.objects.select_related("country").select_for_update( + of=("self", "country"), + ) + ) if connection.features.select_for_update_of_column: expected = [ 'select_for_update_eucity"."id', 'select_for_update_eucountry"."country_ptr_id', ] else: - expected = ['select_for_update_eucity', 'select_for_update_eucountry'] + expected = ["select_for_update_eucity", "select_for_update_eucountry"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_model_inheritance_nested_ptr_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(EUCity.objects.select_related('country').select_for_update( - of=('self', 'country__country_ptr',), - )) + list( + EUCity.objects.select_related("country").select_for_update( + of=( + "self", + "country__country_ptr", + ), + ) + ) if connection.features.select_for_update_of_column: expected = [ 'select_for_update_eucity"."id', 'select_for_update_country"."entity_ptr_id', ] else: - expected = ['select_for_update_eucity', 'select_for_update_country'] + expected = ["select_for_update_eucity", "select_for_update_country"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_multilevel_model_inheritance_ptr_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(EUCountry.objects.select_for_update( - of=('country_ptr', 'country_ptr__entity_ptr'), - )) + list( + EUCountry.objects.select_for_update( + of=("country_ptr", "country_ptr__entity_ptr"), + ) + ) if connection.features.select_for_update_of_column: expected = [ 'select_for_update_country"."entity_ptr_id', 'select_for_update_entity"."id', ] else: - expected = ['select_for_update_country', 'select_for_update_entity'] + expected = ["select_for_update_country", "select_for_update_entity"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_sql_model_proxy_generated_of(self): with transaction.atomic(), CaptureQueriesContext(connection) as ctx: - list(CityCountryProxy.objects.select_related( - 'country', - ).select_for_update( - of=('country',), - )) + list( + CityCountryProxy.objects.select_related("country",).select_for_update( + of=("country",), + ) + ) if connection.features.select_for_update_of_column: expected = ['select_for_update_country"."entity_ptr_id'] else: - expected = ['select_for_update_country'] + expected = ["select_for_update_country"] expected = [connection.ops.quote_name(value) for value in expected] self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected)) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_of_followed_by_values(self): with transaction.atomic(): - values = list(Person.objects.select_for_update(of=('self',)).values('pk')) - self.assertEqual(values, [{'pk': self.person.pk}]) + values = list(Person.objects.select_for_update(of=("self",)).values("pk")) + self.assertEqual(values, [{"pk": self.person.pk}]) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_of_followed_by_values_list(self): with transaction.atomic(): - values = list(Person.objects.select_for_update(of=('self',)).values_list('pk')) + values = list( + Person.objects.select_for_update(of=("self",)).values_list("pk") + ) self.assertEqual(values, [(self.person.pk,)]) - @skipUnlessDBFeature('has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update_of") def test_for_update_of_self_when_self_is_not_selected(self): """ select_for_update(of=['self']) when the only columns selected are from related tables. """ with transaction.atomic(): - values = list(Person.objects.select_related('born').select_for_update(of=('self',)).values('born__name')) - self.assertEqual(values, [{'born__name': self.city1.name}]) + values = list( + Person.objects.select_related("born") + .select_for_update(of=("self",)) + .values("born__name") + ) + self.assertEqual(values, [{"born__name": self.city1.name}]) @skipUnlessDBFeature( - 'has_select_for_update_of', 'supports_select_for_update_with_limit', + "has_select_for_update_of", + "supports_select_for_update_with_limit", ) def test_for_update_of_with_exists(self): with transaction.atomic(): - qs = Person.objects.select_for_update(of=('self', 'born')) + qs = Person.objects.select_for_update(of=("self", "born")) self.assertIs(qs.exists(), True) - @skipUnlessDBFeature('has_select_for_update_nowait') + @skipUnlessDBFeature("has_select_for_update_nowait") def test_nowait_raises_error_on_block(self): """ If nowait is specified, we expect an error to be raised rather @@ -264,7 +303,7 @@ class SelectForUpdateTests(TransactionTestCase): thread = threading.Thread( target=self.run_select_for_update, args=(status,), - kwargs={'nowait': True}, + kwargs={"nowait": True}, ) thread.start() @@ -273,7 +312,7 @@ class SelectForUpdateTests(TransactionTestCase): self.end_blocking_transaction() self.assertIsInstance(status[-1], DatabaseError) - @skipUnlessDBFeature('has_select_for_update_skip_locked') + @skipUnlessDBFeature("has_select_for_update_skip_locked") def test_skip_locked_skips_locked_rows(self): """ If skip_locked is specified, the locked row is skipped resulting in @@ -284,7 +323,7 @@ class SelectForUpdateTests(TransactionTestCase): thread = threading.Thread( target=self.run_select_for_update, args=(status,), - kwargs={'skip_locked': True}, + kwargs={"skip_locked": True}, ) thread.start() time.sleep(1) @@ -292,158 +331,177 @@ class SelectForUpdateTests(TransactionTestCase): self.end_blocking_transaction() self.assertIsInstance(status[-1], Person.DoesNotExist) - @skipIfDBFeature('has_select_for_update_nowait') - @skipUnlessDBFeature('has_select_for_update') + @skipIfDBFeature("has_select_for_update_nowait") + @skipUnlessDBFeature("has_select_for_update") def test_unsupported_nowait_raises_error(self): """ NotSupportedError is raised if a SELECT...FOR UPDATE NOWAIT is run on a database backend that supports FOR UPDATE but not NOWAIT. """ - with self.assertRaisesMessage(NotSupportedError, 'NOWAIT is not supported on this database backend.'): + with self.assertRaisesMessage( + NotSupportedError, "NOWAIT is not supported on this database backend." + ): with transaction.atomic(): Person.objects.select_for_update(nowait=True).get() - @skipIfDBFeature('has_select_for_update_skip_locked') - @skipUnlessDBFeature('has_select_for_update') + @skipIfDBFeature("has_select_for_update_skip_locked") + @skipUnlessDBFeature("has_select_for_update") def test_unsupported_skip_locked_raises_error(self): """ NotSupportedError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run on a database backend that supports FOR UPDATE but not SKIP LOCKED. """ - with self.assertRaisesMessage(NotSupportedError, 'SKIP LOCKED is not supported on this database backend.'): + with self.assertRaisesMessage( + NotSupportedError, "SKIP LOCKED is not supported on this database backend." + ): with transaction.atomic(): Person.objects.select_for_update(skip_locked=True).get() - @skipIfDBFeature('has_select_for_update_of') - @skipUnlessDBFeature('has_select_for_update') + @skipIfDBFeature("has_select_for_update_of") + @skipUnlessDBFeature("has_select_for_update") def test_unsupported_of_raises_error(self): """ NotSupportedError is raised if a SELECT...FOR UPDATE OF... is run on a database backend that supports FOR UPDATE but not OF. """ - msg = 'FOR UPDATE OF is not supported on this database backend.' + msg = "FOR UPDATE OF is not supported on this database backend." with self.assertRaisesMessage(NotSupportedError, msg): with transaction.atomic(): - Person.objects.select_for_update(of=('self',)).get() + Person.objects.select_for_update(of=("self",)).get() - @skipIfDBFeature('has_select_for_no_key_update') - @skipUnlessDBFeature('has_select_for_update') + @skipIfDBFeature("has_select_for_no_key_update") + @skipUnlessDBFeature("has_select_for_update") def test_unsuported_no_key_raises_error(self): """ NotSupportedError is raised if a SELECT...FOR NO KEY UPDATE... is run on a database backend that supports FOR UPDATE but not NO KEY. """ - msg = 'FOR NO KEY UPDATE is not supported on this database backend.' + msg = "FOR NO KEY UPDATE is not supported on this database backend." with self.assertRaisesMessage(NotSupportedError, msg): with transaction.atomic(): Person.objects.select_for_update(no_key=True).get() - @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of") def test_unrelated_of_argument_raises_error(self): """ FieldError is raised if a non-relation field is specified in of=(...). """ msg = ( - 'Invalid field name(s) given in select_for_update(of=(...)): %s. ' - 'Only relational fields followed in the query are allowed. ' - 'Choices are: self, born, born__country, ' - 'born__country__entity_ptr.' + "Invalid field name(s) given in select_for_update(of=(...)): %s. " + "Only relational fields followed in the query are allowed. " + "Choices are: self, born, born__country, " + "born__country__entity_ptr." ) invalid_of = [ - ('nonexistent',), - ('name',), - ('born__nonexistent',), - ('born__name',), - ('born__nonexistent', 'born__name'), + ("nonexistent",), + ("name",), + ("born__nonexistent",), + ("born__name",), + ("born__nonexistent", "born__name"), ] for of in invalid_of: with self.subTest(of=of): - with self.assertRaisesMessage(FieldError, msg % ', '.join(of)): + with self.assertRaisesMessage(FieldError, msg % ", ".join(of)): with transaction.atomic(): - Person.objects.select_related('born__country').select_for_update(of=of).get() + Person.objects.select_related( + "born__country" + ).select_for_update(of=of).get() - @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of") def test_related_but_unselected_of_argument_raises_error(self): """ FieldError is raised if a relation field that is not followed in the query is specified in of=(...). """ msg = ( - 'Invalid field name(s) given in select_for_update(of=(...)): %s. ' - 'Only relational fields followed in the query are allowed. ' - 'Choices are: self, born, profile.' + "Invalid field name(s) given in select_for_update(of=(...)): %s. " + "Only relational fields followed in the query are allowed. " + "Choices are: self, born, profile." ) - for name in ['born__country', 'died', 'died__country']: + for name in ["born__country", "died", "died__country"]: with self.subTest(name=name): with self.assertRaisesMessage(FieldError, msg % name): with transaction.atomic(): - Person.objects.select_related( - 'born', 'profile', - ).exclude(profile=None).select_for_update(of=(name,)).get() + Person.objects.select_related("born", "profile",).exclude( + profile=None + ).select_for_update(of=(name,)).get() - @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of") def test_model_inheritance_of_argument_raises_error_ptr_in_choices(self): msg = ( - 'Invalid field name(s) given in select_for_update(of=(...)): ' - 'name. Only relational fields followed in the query are allowed. ' - 'Choices are: self, %s.' + "Invalid field name(s) given in select_for_update(of=(...)): " + "name. Only relational fields followed in the query are allowed. " + "Choices are: self, %s." ) with self.assertRaisesMessage( FieldError, - msg % 'country, country__country_ptr, country__country_ptr__entity_ptr', + msg % "country, country__country_ptr, country__country_ptr__entity_ptr", ): with transaction.atomic(): EUCity.objects.select_related( - 'country', - ).select_for_update(of=('name',)).get() - with self.assertRaisesMessage(FieldError, msg % 'country_ptr, country_ptr__entity_ptr'): + "country", + ).select_for_update(of=("name",)).get() + with self.assertRaisesMessage( + FieldError, msg % "country_ptr, country_ptr__entity_ptr" + ): with transaction.atomic(): - EUCountry.objects.select_for_update(of=('name',)).get() + EUCountry.objects.select_for_update(of=("name",)).get() - @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of") def test_model_proxy_of_argument_raises_error_proxy_field_in_choices(self): msg = ( - 'Invalid field name(s) given in select_for_update(of=(...)): ' - 'name. Only relational fields followed in the query are allowed. ' - 'Choices are: self, country, country__entity_ptr.' + "Invalid field name(s) given in select_for_update(of=(...)): " + "name. Only relational fields followed in the query are allowed. " + "Choices are: self, country, country__entity_ptr." ) with self.assertRaisesMessage(FieldError, msg): with transaction.atomic(): CityCountryProxy.objects.select_related( - 'country', - ).select_for_update(of=('name',)).get() + "country", + ).select_for_update(of=("name",)).get() - @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of') + @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of") def test_reverse_one_to_one_of_arguments(self): """ Reverse OneToOneFields may be included in of=(...) as long as NULLs are excluded because LEFT JOIN isn't allowed in SELECT FOR UPDATE. """ with transaction.atomic(): - person = Person.objects.select_related( - 'profile', - ).exclude(profile=None).select_for_update(of=('profile',)).get() + person = ( + Person.objects.select_related( + "profile", + ) + .exclude(profile=None) + .select_for_update(of=("profile",)) + .get() + ) self.assertEqual(person.profile, self.person_profile) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_for_update_after_from(self): - features_class = connections['default'].features.__class__ - attribute_to_patch = "%s.%s.for_update_after_from" % (features_class.__module__, features_class.__name__) + features_class = connections["default"].features.__class__ + attribute_to_patch = "%s.%s.for_update_after_from" % ( + features_class.__module__, + features_class.__name__, + ) with mock.patch(attribute_to_patch, return_value=True): with transaction.atomic(): - self.assertIn('FOR UPDATE WHERE', str(Person.objects.filter(name='foo').select_for_update().query)) + self.assertIn( + "FOR UPDATE WHERE", + str(Person.objects.filter(name="foo").select_for_update().query), + ) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_for_update_requires_transaction(self): """ A TransactionManagementError is raised when a select_for_update query is executed outside of a transaction. """ - msg = 'select_for_update cannot be used outside of a transaction.' + msg = "select_for_update cannot be used outside of a transaction." with self.assertRaisesMessage(transaction.TransactionManagementError, msg): list(Person.objects.all().select_for_update()) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_for_update_requires_transaction_only_in_execution(self): """ No TransactionManagementError is raised @@ -451,23 +509,23 @@ class SelectForUpdateTests(TransactionTestCase): only when the query is executed. """ people = Person.objects.all().select_for_update() - msg = 'select_for_update cannot be used outside of a transaction.' + msg = "select_for_update cannot be used outside of a transaction." with self.assertRaisesMessage(transaction.TransactionManagementError, msg): list(people) - @skipUnlessDBFeature('supports_select_for_update_with_limit') + @skipUnlessDBFeature("supports_select_for_update_with_limit") def test_select_for_update_with_limit(self): - other = Person.objects.create(name='Grappeli', born=self.city1, died=self.city2) + other = Person.objects.create(name="Grappeli", born=self.city1, died=self.city2) with transaction.atomic(): - qs = list(Person.objects.all().order_by('pk').select_for_update()[1:2]) + qs = list(Person.objects.all().order_by("pk").select_for_update()[1:2]) self.assertEqual(qs[0], other) - @skipIfDBFeature('supports_select_for_update_with_limit') + @skipIfDBFeature("supports_select_for_update_with_limit") def test_unsupported_select_for_update_with_limit(self): - msg = 'LIMIT/OFFSET is not supported with select_for_update on this database backend.' + msg = "LIMIT/OFFSET is not supported with select_for_update on this database backend." with self.assertRaisesMessage(NotSupportedError, msg): with transaction.atomic(): - list(Person.objects.all().order_by('pk').select_for_update()[1:2]) + list(Person.objects.all().order_by("pk").select_for_update()[1:2]) def run_select_for_update(self, status, **kwargs): """ @@ -477,13 +535,13 @@ class SelectForUpdateTests(TransactionTestCase): This function expects to run in a separate thread. """ - status.append('started') + status.append("started") try: # We need to enter transaction management again, as this is done on # per-thread basis with transaction.atomic(): person = Person.objects.select_for_update(**kwargs).get() - person.name = 'Fred' + person.name = "Fred" person.save() except (DatabaseError, Person.DoesNotExist) as e: status.append(e) @@ -492,8 +550,8 @@ class SelectForUpdateTests(TransactionTestCase): # database connection. Close it without waiting for the GC. connection.close() - @skipUnlessDBFeature('has_select_for_update') - @skipUnlessDBFeature('supports_transactions') + @skipUnlessDBFeature("has_select_for_update") + @skipUnlessDBFeature("supports_transactions") def test_block(self): """ A thread running a select_for_update that accesses rows being touched @@ -505,9 +563,7 @@ class SelectForUpdateTests(TransactionTestCase): # Now, try it again using the ORM's select_for_update # facility. Do this in a separate thread. status = [] - thread = threading.Thread( - target=self.run_select_for_update, args=(status,) - ) + thread = threading.Thread(target=self.run_select_for_update, args=(status,)) # The thread should immediately block, but we'll sleep # for a bit to make sure. @@ -517,12 +573,12 @@ class SelectForUpdateTests(TransactionTestCase): sanity_count += 1 time.sleep(1) if sanity_count >= 10: - raise ValueError('Thread did not run and block') + raise ValueError("Thread did not run and block") # Check the person hasn't been updated. Since this isn't # using FOR UPDATE, it won't block. p = Person.objects.get(pk=self.person.pk) - self.assertEqual('Reinhardt', p.name) + self.assertEqual("Reinhardt", p.name) # When we end our blocking transaction, our thread should # be able to continue. @@ -538,9 +594,9 @@ class SelectForUpdateTests(TransactionTestCase): transaction.commit() p = Person.objects.get(pk=self.person.pk) - self.assertEqual('Fred', p.name) + self.assertEqual("Fred", p.name) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_raw_lock_not_available(self): """ Running a raw query which can't obtain a FOR UPDATE lock raises @@ -552,9 +608,10 @@ class SelectForUpdateTests(TransactionTestCase): try: list( Person.objects.raw( - 'SELECT * FROM %s %s' % ( + "SELECT * FROM %s %s" + % ( Person._meta.db_table, - connection.ops.for_update_sql(nowait=True) + connection.ops.for_update_sql(nowait=True), ) ) ) @@ -565,31 +622,33 @@ class SelectForUpdateTests(TransactionTestCase): # database connection. Close it without waiting for the GC. # Connection cannot be closed on Oracle because cursor is still # open. - if connection.vendor != 'oracle': + if connection.vendor != "oracle": connection.close() status = [] - thread = threading.Thread(target=raw, kwargs={'status': status}) + thread = threading.Thread(target=raw, kwargs={"status": status}) thread.start() time.sleep(1) thread.join() self.end_blocking_transaction() self.assertIsInstance(status[-1], DatabaseError) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") @override_settings(DATABASE_ROUTERS=[TestRouter()]) def test_select_for_update_on_multidb(self): query = Person.objects.select_for_update() self.assertEqual(router.db_for_write(Person), query.db) - @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature("has_select_for_update") def test_select_for_update_with_get(self): with transaction.atomic(): - person = Person.objects.select_for_update().get(name='Reinhardt') - self.assertEqual(person.name, 'Reinhardt') + person = Person.objects.select_for_update().get(name="Reinhardt") + self.assertEqual(person.name, "Reinhardt") def test_nowait_and_skip_locked(self): - with self.assertRaisesMessage(ValueError, 'The nowait option cannot be used with skip_locked.'): + with self.assertRaisesMessage( + ValueError, "The nowait option cannot be used with skip_locked." + ): Person.objects.select_for_update(nowait=True, skip_locked=True) def test_ordered_select_for_update(self): @@ -598,5 +657,7 @@ class SelectForUpdateTests(TransactionTestCase): to specify a row locking order to prevent deadlocks (#27193). """ with transaction.atomic(): - qs = Person.objects.filter(id__in=Person.objects.order_by('-id').select_for_update()) - self.assertIn('ORDER BY', str(qs.query)) + qs = Person.objects.filter( + id__in=Person.objects.order_by("-id").select_for_update() + ) + self.assertIn("ORDER BY", str(qs.query)) |
