diff options
| author | Dingning <49514587+HappyDingning@users.noreply.github.com> | 2023-12-12 22:09:18 +0800 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2024-03-07 09:59:33 +0100 |
| commit | 549320946dfdaac6ccb6c0d3150c46db92637187 (patch) | |
| tree | ea7451469fa80c5ed98092e3c6d176f815c135d6 /tests/auth_tests | |
| parent | 1fffa4af1297c01cb1d116a32173271f4889033c (diff) | |
Fixed #35030 -- Made django.contrib.auth decorators to work with async functions.
Diffstat (limited to 'tests/auth_tests')
| -rw-r--r-- | tests/auth_tests/test_decorators.py | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/tests/auth_tests/test_decorators.py b/tests/auth_tests/test_decorators.py index 6cc92302d6..48fa915c5c 100644 --- a/tests/auth_tests/test_decorators.py +++ b/tests/auth_tests/test_decorators.py @@ -1,3 +1,7 @@ +from asyncio import iscoroutinefunction + +from asgiref.sync import sync_to_async + from django.conf import settings from django.contrib.auth import models from django.contrib.auth.decorators import ( @@ -19,6 +23,22 @@ class LoginRequiredTestCase(AuthViewsTestCase): Tests the login_required decorators """ + factory = RequestFactory() + + def test_wrapped_sync_function_is_not_coroutine_function(self): + def sync_view(request): + return HttpResponse() + + wrapped_view = login_required(sync_view) + self.assertIs(iscoroutinefunction(wrapped_view), False) + + def test_wrapped_async_function_is_coroutine_function(self): + async def async_view(request): + return HttpResponse() + + wrapped_view = login_required(async_view) + self.assertIs(iscoroutinefunction(wrapped_view), True) + def test_callable(self): """ login_required is assignable to callable objects. @@ -63,6 +83,35 @@ class LoginRequiredTestCase(AuthViewsTestCase): view_url="/login_required_login_url/", login_url="/somewhere/" ) + async def test_login_required_async_view(self, login_url=None): + async def async_view(request): + return HttpResponse() + + async def auser_anonymous(): + return models.AnonymousUser() + + async def auser(): + return self.u1 + + if login_url is None: + async_view = login_required(async_view) + login_url = settings.LOGIN_URL + else: + async_view = login_required(async_view, login_url=login_url) + + request = self.factory.get("/rand") + request.auser = auser_anonymous + response = await async_view(request) + self.assertEqual(response.status_code, 302) + self.assertIn(login_url, response.url) + + request.auser = auser + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + async def test_login_required_next_url_async_view(self): + await self.test_login_required_async_view(login_url="/somewhere/") + class PermissionsRequiredDecoratorTest(TestCase): """ @@ -80,6 +129,24 @@ class PermissionsRequiredDecoratorTest(TestCase): ) cls.user.user_permissions.add(*perms) + @classmethod + async def auser(cls): + return cls.user + + def test_wrapped_sync_function_is_not_coroutine_function(self): + def sync_view(request): + return HttpResponse() + + wrapped_view = permission_required([])(sync_view) + self.assertIs(iscoroutinefunction(wrapped_view), False) + + def test_wrapped_async_function_is_coroutine_function(self): + async def async_view(request): + return HttpResponse() + + wrapped_view = permission_required([])(async_view) + self.assertIs(iscoroutinefunction(wrapped_view), True) + def test_many_permissions_pass(self): @permission_required( ["auth_tests.add_customuser", "auth_tests.change_customuser"] @@ -147,6 +214,73 @@ class PermissionsRequiredDecoratorTest(TestCase): with self.assertRaises(PermissionDenied): a_view(request) + async def test_many_permissions_pass_async_view(self): + @permission_required( + ["auth_tests.add_customuser", "auth_tests.change_customuser"] + ) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + async def test_many_permissions_in_set_pass_async_view(self): + @permission_required( + {"auth_tests.add_customuser", "auth_tests.change_customuser"} + ) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + async def test_single_permission_pass_async_view(self): + @permission_required("auth_tests.add_customuser") + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + async def test_permissioned_denied_redirect_async_view(self): + @permission_required( + [ + "auth_tests.add_customuser", + "auth_tests.change_customuser", + "nonexistent-permission", + ] + ) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser + response = await async_view(request) + self.assertEqual(response.status_code, 302) + + async def test_permissioned_denied_exception_raised_async_view(self): + @permission_required( + [ + "auth_tests.add_customuser", + "auth_tests.change_customuser", + "nonexistent-permission", + ], + raise_exception=True, + ) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser + with self.assertRaises(PermissionDenied): + await async_view(request) + class UserPassesTestDecoratorTest(TestCase): factory = RequestFactory() @@ -162,6 +296,28 @@ class UserPassesTestDecoratorTest(TestCase): ) cls.user_pass.user_permissions.add(*perms) + @classmethod + async def auser_pass(cls): + return cls.user_pass + + @classmethod + async def auser_deny(cls): + return cls.user_deny + + def test_wrapped_sync_function_is_not_coroutine_function(self): + def sync_view(request): + return HttpResponse() + + wrapped_view = user_passes_test(lambda user: True)(sync_view) + self.assertIs(iscoroutinefunction(wrapped_view), False) + + def test_wrapped_async_function_is_coroutine_function(self): + async def async_view(request): + return HttpResponse() + + wrapped_view = user_passes_test(lambda user: True)(async_view) + self.assertIs(iscoroutinefunction(wrapped_view), True) + def test_decorator(self): def sync_test_func(user): return bool( @@ -180,3 +336,56 @@ class UserPassesTestDecoratorTest(TestCase): request.user = self.user_deny response = sync_view(request) self.assertEqual(response.status_code, 302) + + def test_decorator_async_test_func(self): + async def async_test_func(user): + return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"]) + + @user_passes_test(async_test_func) + def sync_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.user = self.user_pass + response = sync_view(request) + self.assertEqual(response.status_code, 200) + + request.user = self.user_deny + response = sync_view(request) + self.assertEqual(response.status_code, 302) + + async def test_decorator_async_view(self): + def sync_test_func(user): + return bool( + models.Group.objects.filter(name__istartswith=user.username).exists() + ) + + @user_passes_test(sync_test_func) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser_pass + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + request.auser = self.auser_deny + response = await async_view(request) + self.assertEqual(response.status_code, 302) + + async def test_decorator_async_view_async_test_func(self): + async def async_test_func(user): + return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"]) + + @user_passes_test(async_test_func) + async def async_view(request): + return HttpResponse() + + request = self.factory.get("/rand") + request.auser = self.auser_pass + response = await async_view(request) + self.assertEqual(response.status_code, 200) + + request.auser = self.auser_deny + response = await async_view(request) + self.assertEqual(response.status_code, 302) |
