diff options
| author | Jon Janzen <jon@jonjanzen.com> | 2024-03-31 12:29:10 -0700 |
|---|---|---|
| committer | Sarah Boyce <42296566+sarahboyce@users.noreply.github.com> | 2024-10-07 14:19:41 +0200 |
| commit | 50f89ae850f6b4e35819fe725a08c7e579bfd099 (patch) | |
| tree | 856a0e954e0be928c55f6070f2ac8b766459b3e7 /tests/auth_tests/test_auth_backends.py | |
| parent | 4cad317ff1f9a79d54c1d5b12f1ccbd260ca009f (diff) | |
Fixed #35303 -- Implemented async auth backends and utils.
Diffstat (limited to 'tests/auth_tests/test_auth_backends.py')
| -rw-r--r-- | tests/auth_tests/test_auth_backends.py | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/tests/auth_tests/test_auth_backends.py b/tests/auth_tests/test_auth_backends.py index 3b4f40e6e0..b612d27ab0 100644 --- a/tests/auth_tests/test_auth_backends.py +++ b/tests/auth_tests/test_auth_backends.py @@ -2,6 +2,8 @@ import sys from datetime import date from unittest import mock +from asgiref.sync import sync_to_async + from django.contrib.auth import ( BACKEND_SESSION_KEY, SESSION_KEY, @@ -55,17 +57,33 @@ class BaseBackendTest(TestCase): def test_get_user_permissions(self): self.assertEqual(self.user.get_user_permissions(), {"user_perm"}) + async def test_aget_user_permissions(self): + self.assertEqual(await self.user.aget_user_permissions(), {"user_perm"}) + def test_get_group_permissions(self): self.assertEqual(self.user.get_group_permissions(), {"group_perm"}) + async def test_aget_group_permissions(self): + self.assertEqual(await self.user.aget_group_permissions(), {"group_perm"}) + def test_get_all_permissions(self): self.assertEqual(self.user.get_all_permissions(), {"user_perm", "group_perm"}) + async def test_aget_all_permissions(self): + self.assertEqual( + await self.user.aget_all_permissions(), {"user_perm", "group_perm"} + ) + def test_has_perm(self): self.assertIs(self.user.has_perm("user_perm"), True) self.assertIs(self.user.has_perm("group_perm"), True) self.assertIs(self.user.has_perm("other_perm", TestObj()), False) + async def test_ahas_perm(self): + self.assertIs(await self.user.ahas_perm("user_perm"), True) + self.assertIs(await self.user.ahas_perm("group_perm"), True) + self.assertIs(await self.user.ahas_perm("other_perm", TestObj()), False) + def test_has_perms_perm_list_invalid(self): msg = "perm_list must be an iterable of permissions." with self.assertRaisesMessage(ValueError, msg): @@ -73,6 +91,13 @@ class BaseBackendTest(TestCase): with self.assertRaisesMessage(ValueError, msg): self.user.has_perms(object()) + async def test_ahas_perms_perm_list_invalid(self): + msg = "perm_list must be an iterable of permissions." + with self.assertRaisesMessage(ValueError, msg): + await self.user.ahas_perms("user_perm") + with self.assertRaisesMessage(ValueError, msg): + await self.user.ahas_perms(object()) + class CountingMD5PasswordHasher(MD5PasswordHasher): """Hasher that counts how many times it computes a hash.""" @@ -125,6 +150,25 @@ class BaseModelBackendTest: user.save() self.assertIs(user.has_perm("auth.test"), False) + async def test_ahas_perm(self): + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + self.assertIs(await user.ahas_perm("auth.test"), False) + + user.is_staff = True + await user.asave() + self.assertIs(await user.ahas_perm("auth.test"), False) + + user.is_superuser = True + await user.asave() + self.assertIs(await user.ahas_perm("auth.test"), True) + self.assertIs(await user.ahas_module_perms("auth"), True) + + user.is_staff = True + user.is_superuser = True + user.is_active = False + await user.asave() + self.assertIs(await user.ahas_perm("auth.test"), False) + def test_custom_perms(self): user = self.UserModel._default_manager.get(pk=self.user.pk) content_type = ContentType.objects.get_for_model(Group) @@ -174,6 +218,55 @@ class BaseModelBackendTest: self.assertIs(user.has_perm("test"), False) self.assertIs(user.has_perms(["auth.test2", "auth.test3"]), False) + async def test_acustom_perms(self): + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test" + ) + await user.user_permissions.aadd(perm) + + # Reloading user to purge the _perm_cache. + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + self.assertEqual(await user.aget_all_permissions(), {"auth.test"}) + self.assertEqual(await user.aget_user_permissions(), {"auth.test"}) + self.assertEqual(await user.aget_group_permissions(), set()) + self.assertIs(await user.ahas_module_perms("Group"), False) + self.assertIs(await user.ahas_module_perms("auth"), True) + + perm = await Permission.objects.acreate( + name="test2", content_type=content_type, codename="test2" + ) + await user.user_permissions.aadd(perm) + perm = await Permission.objects.acreate( + name="test3", content_type=content_type, codename="test3" + ) + await user.user_permissions.aadd(perm) + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + expected_user_perms = {"auth.test2", "auth.test", "auth.test3"} + self.assertEqual(await user.aget_all_permissions(), expected_user_perms) + self.assertIs(await user.ahas_perm("test"), False) + self.assertIs(await user.ahas_perm("auth.test"), True) + self.assertIs(await user.ahas_perms(["auth.test2", "auth.test3"]), True) + + perm = await Permission.objects.acreate( + name="test_group", content_type=content_type, codename="test_group" + ) + group = await Group.objects.acreate(name="test_group") + await group.permissions.aadd(perm) + await user.groups.aadd(group) + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + self.assertEqual( + await user.aget_all_permissions(), {*expected_user_perms, "auth.test_group"} + ) + self.assertEqual(await user.aget_user_permissions(), expected_user_perms) + self.assertEqual(await user.aget_group_permissions(), {"auth.test_group"}) + self.assertIs(await user.ahas_perms(["auth.test3", "auth.test_group"]), True) + + user = AnonymousUser() + self.assertIs(await user.ahas_perm("test"), False) + self.assertIs(await user.ahas_perms(["auth.test2", "auth.test3"]), False) + def test_has_no_object_perm(self): """Regressiontest for #12462""" user = self.UserModel._default_manager.get(pk=self.user.pk) @@ -188,6 +281,20 @@ class BaseModelBackendTest: self.assertIs(user.has_perm("auth.test"), True) self.assertEqual(user.get_all_permissions(), {"auth.test"}) + async def test_ahas_no_object_perm(self): + """See test_has_no_object_perm()""" + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test" + ) + await user.user_permissions.aadd(perm) + + self.assertIs(await user.ahas_perm("auth.test", "object"), False) + self.assertEqual(await user.aget_all_permissions("object"), set()) + self.assertIs(await user.ahas_perm("auth.test"), True) + self.assertEqual(await user.aget_all_permissions(), {"auth.test"}) + def test_anonymous_has_no_permissions(self): """ #17903 -- Anonymous users shouldn't have permissions in @@ -220,6 +327,38 @@ class BaseModelBackendTest: self.assertEqual(backend.get_user_permissions(user), set()) self.assertEqual(backend.get_group_permissions(user), set()) + async def test_aanonymous_has_no_permissions(self): + """See test_anonymous_has_no_permissions()""" + backend = ModelBackend() + + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + user_perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test_user" + ) + group_perm = await Permission.objects.acreate( + name="test2", content_type=content_type, codename="test_group" + ) + await user.user_permissions.aadd(user_perm) + + group = await Group.objects.acreate(name="test_group") + await user.groups.aadd(group) + await group.permissions.aadd(group_perm) + + self.assertEqual( + await backend.aget_all_permissions(user), + {"auth.test_user", "auth.test_group"}, + ) + self.assertEqual(await backend.aget_user_permissions(user), {"auth.test_user"}) + self.assertEqual( + await backend.aget_group_permissions(user), {"auth.test_group"} + ) + + with mock.patch.object(self.UserModel, "is_anonymous", True): + self.assertEqual(await backend.aget_all_permissions(user), set()) + self.assertEqual(await backend.aget_user_permissions(user), set()) + self.assertEqual(await backend.aget_group_permissions(user), set()) + def test_inactive_has_no_permissions(self): """ #17903 -- Inactive users shouldn't have permissions in @@ -254,11 +393,52 @@ class BaseModelBackendTest: self.assertEqual(backend.get_user_permissions(user), set()) self.assertEqual(backend.get_group_permissions(user), set()) + async def test_ainactive_has_no_permissions(self): + """See test_inactive_has_no_permissions()""" + backend = ModelBackend() + + user = await self.UserModel._default_manager.aget(pk=self.user.pk) + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + user_perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test_user" + ) + group_perm = await Permission.objects.acreate( + name="test2", content_type=content_type, codename="test_group" + ) + await user.user_permissions.aadd(user_perm) + + group = await Group.objects.acreate(name="test_group") + await user.groups.aadd(group) + await group.permissions.aadd(group_perm) + + self.assertEqual( + await backend.aget_all_permissions(user), + {"auth.test_user", "auth.test_group"}, + ) + self.assertEqual(await backend.aget_user_permissions(user), {"auth.test_user"}) + self.assertEqual( + await backend.aget_group_permissions(user), {"auth.test_group"} + ) + + user.is_active = False + await user.asave() + + self.assertEqual(await backend.aget_all_permissions(user), set()) + self.assertEqual(await backend.aget_user_permissions(user), set()) + self.assertEqual(await backend.aget_group_permissions(user), set()) + def test_get_all_superuser_permissions(self): """A superuser has all permissions. Refs #14795.""" user = self.UserModel._default_manager.get(pk=self.superuser.pk) self.assertEqual(len(user.get_all_permissions()), len(Permission.objects.all())) + async def test_aget_all_superuser_permissions(self): + """See test_get_all_superuser_permissions()""" + user = await self.UserModel._default_manager.aget(pk=self.superuser.pk) + self.assertEqual( + len(await user.aget_all_permissions()), await Permission.objects.acount() + ) + @override_settings( PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"] ) @@ -280,6 +460,24 @@ class BaseModelBackendTest: @override_settings( PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"] ) + async def test_aauthentication_timing(self): + """See test_authentication_timing()""" + # Re-set the password, because this tests overrides PASSWORD_HASHERS. + self.user.set_password("test") + await self.user.asave() + + CountingMD5PasswordHasher.calls = 0 + username = getattr(self.user, self.UserModel.USERNAME_FIELD) + await aauthenticate(username=username, password="test") + self.assertEqual(CountingMD5PasswordHasher.calls, 1) + + CountingMD5PasswordHasher.calls = 0 + await aauthenticate(username="no_such_user", password="test") + self.assertEqual(CountingMD5PasswordHasher.calls, 1) + + @override_settings( + PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"] + ) def test_authentication_without_credentials(self): CountingMD5PasswordHasher.calls = 0 for credentials in ( @@ -320,6 +518,15 @@ class ModelBackendTest(BaseModelBackendTest, TestCase): self.user.save() self.assertIsNone(authenticate(**self.user_credentials)) + async def test_aauthenticate_inactive(self): + """ + An inactive user can't authenticate. + """ + self.assertEqual(await aauthenticate(**self.user_credentials), self.user) + self.user.is_active = False + await self.user.asave() + self.assertIsNone(await aauthenticate(**self.user_credentials)) + @override_settings(AUTH_USER_MODEL="auth_tests.CustomUserWithoutIsActiveField") def test_authenticate_user_without_is_active_field(self): """ @@ -332,6 +539,18 @@ class ModelBackendTest(BaseModelBackendTest, TestCase): ) self.assertEqual(authenticate(username="test", password="test"), user) + @override_settings(AUTH_USER_MODEL="auth_tests.CustomUserWithoutIsActiveField") + async def test_aauthenticate_user_without_is_active_field(self): + """ + A custom user without an `is_active` field is allowed to authenticate. + """ + user = await CustomUserWithoutIsActiveField.objects._acreate_user( + username="test", + email="test@example.com", + password="test", + ) + self.assertEqual(await aauthenticate(username="test", password="test"), user) + @override_settings(AUTH_USER_MODEL="auth_tests.ExtensionUser") class ExtensionUserModelBackendTest(BaseModelBackendTest, TestCase): @@ -403,6 +622,15 @@ class CustomUserModelBackendAuthenticateTest(TestCase): authenticated_user = authenticate(email="test@example.com", password="test") self.assertEqual(test_user, authenticated_user) + async def test_aauthenticate(self): + test_user = await CustomUser._default_manager.acreate_user( + email="test@example.com", password="test", date_of_birth=date(2006, 4, 25) + ) + authenticated_user = await aauthenticate( + email="test@example.com", password="test" + ) + self.assertEqual(test_user, authenticated_user) + @override_settings(AUTH_USER_MODEL="auth_tests.UUIDUser") class UUIDUserTests(TestCase): @@ -416,6 +644,13 @@ class UUIDUserTests(TestCase): UUIDUser.objects.get(pk=self.client.session[SESSION_KEY]), user ) + async def test_alogin(self): + """See test_login()""" + user = await UUIDUser.objects.acreate_user(username="uuid", password="test") + self.assertTrue(await self.client.alogin(username="uuid", password="test")) + session_key = await self.client.session.aget(SESSION_KEY) + self.assertEqual(await UUIDUser.objects.aget(pk=session_key), user) + class TestObj: pass @@ -435,9 +670,15 @@ class SimpleRowlevelBackend: return True return False + async def ahas_perm(self, user, perm, obj=None): + return self.has_perm(user, perm, obj) + def has_module_perms(self, user, app_label): return (user.is_anonymous or user.is_active) and app_label == "app1" + async def ahas_module_perms(self, user, app_label): + return self.has_module_perms(user, app_label) + def get_all_permissions(self, user, obj=None): if not obj: return [] # We only support row level perms @@ -452,6 +693,9 @@ class SimpleRowlevelBackend: else: return ["simple"] + async def aget_all_permissions(self, user, obj=None): + return self.get_all_permissions(user, obj) + def get_group_permissions(self, user, obj=None): if not obj: return # We only support row level perms @@ -524,10 +768,18 @@ class AnonymousUserBackendTest(SimpleTestCase): self.assertIs(self.user1.has_perm("perm", TestObj()), False) self.assertIs(self.user1.has_perm("anon", TestObj()), True) + async def test_ahas_perm(self): + self.assertIs(await self.user1.ahas_perm("perm", TestObj()), False) + self.assertIs(await self.user1.ahas_perm("anon", TestObj()), True) + def test_has_perms(self): self.assertIs(self.user1.has_perms(["anon"], TestObj()), True) self.assertIs(self.user1.has_perms(["anon", "perm"], TestObj()), False) + async def test_ahas_perms(self): + self.assertIs(await self.user1.ahas_perms(["anon"], TestObj()), True) + self.assertIs(await self.user1.ahas_perms(["anon", "perm"], TestObj()), False) + def test_has_perms_perm_list_invalid(self): msg = "perm_list must be an iterable of permissions." with self.assertRaisesMessage(ValueError, msg): @@ -535,13 +787,27 @@ class AnonymousUserBackendTest(SimpleTestCase): with self.assertRaisesMessage(ValueError, msg): self.user1.has_perms(object()) + async def test_ahas_perms_perm_list_invalid(self): + msg = "perm_list must be an iterable of permissions." + with self.assertRaisesMessage(ValueError, msg): + await self.user1.ahas_perms("perm") + with self.assertRaisesMessage(ValueError, msg): + await self.user1.ahas_perms(object()) + def test_has_module_perms(self): self.assertIs(self.user1.has_module_perms("app1"), True) self.assertIs(self.user1.has_module_perms("app2"), False) + async def test_ahas_module_perms(self): + self.assertIs(await self.user1.ahas_module_perms("app1"), True) + self.assertIs(await self.user1.ahas_module_perms("app2"), False) + def test_get_all_permissions(self): self.assertEqual(self.user1.get_all_permissions(TestObj()), {"anon"}) + async def test_aget_all_permissions(self): + self.assertEqual(await self.user1.aget_all_permissions(TestObj()), {"anon"}) + @override_settings(AUTHENTICATION_BACKENDS=[]) class NoBackendsTest(TestCase): @@ -561,6 +827,14 @@ class NoBackendsTest(TestCase): with self.assertRaisesMessage(ImproperlyConfigured, msg): self.user.has_perm(("perm", TestObj())) + async def test_araises_exception(self): + msg = ( + "No authentication backends have been defined. " + "Does AUTHENTICATION_BACKENDS contain anything?" + ) + with self.assertRaisesMessage(ImproperlyConfigured, msg): + await self.user.ahas_perm(("perm", TestObj())) + @override_settings( AUTHENTICATION_BACKENDS=["auth_tests.test_auth_backends.SimpleRowlevelBackend"] @@ -593,12 +867,21 @@ class PermissionDeniedBackend: def authenticate(self, request, username=None, password=None): raise PermissionDenied + async def aauthenticate(self, request, username=None, password=None): + raise PermissionDenied + def has_perm(self, user_obj, perm, obj=None): raise PermissionDenied + async def ahas_perm(self, user_obj, perm, obj=None): + raise PermissionDenied + def has_module_perms(self, user_obj, app_label): raise PermissionDenied + async def ahas_module_perms(self, user_obj, app_label): + raise PermissionDenied + class PermissionDeniedBackendTest(TestCase): """ @@ -631,10 +914,25 @@ class PermissionDeniedBackendTest(TestCase): [{"password": "********************", "username": "test"}], ) + @modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend}) + async def test_aauthenticate_permission_denied(self): + self.assertIsNone(await aauthenticate(username="test", password="test")) + # user_login_failed signal is sent. + self.assertEqual( + self.user_login_failed, + [{"password": "********************", "username": "test"}], + ) + @modify_settings(AUTHENTICATION_BACKENDS={"append": backend}) def test_authenticates(self): self.assertEqual(authenticate(username="test", password="test"), self.user1) + @modify_settings(AUTHENTICATION_BACKENDS={"append": backend}) + async def test_aauthenticate(self): + self.assertEqual( + await aauthenticate(username="test", password="test"), self.user1 + ) + @modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend}) def test_has_perm_denied(self): content_type = ContentType.objects.get_for_model(Group) @@ -646,6 +944,17 @@ class PermissionDeniedBackendTest(TestCase): self.assertIs(self.user1.has_perm("auth.test"), False) self.assertIs(self.user1.has_module_perms("auth"), False) + @modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend}) + async def test_ahas_perm_denied(self): + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test" + ) + await self.user1.user_permissions.aadd(perm) + + self.assertIs(await self.user1.ahas_perm("auth.test"), False) + self.assertIs(await self.user1.ahas_module_perms("auth"), False) + @modify_settings(AUTHENTICATION_BACKENDS={"append": backend}) def test_has_perm(self): content_type = ContentType.objects.get_for_model(Group) @@ -657,6 +966,17 @@ class PermissionDeniedBackendTest(TestCase): self.assertIs(self.user1.has_perm("auth.test"), True) self.assertIs(self.user1.has_module_perms("auth"), True) + @modify_settings(AUTHENTICATION_BACKENDS={"append": backend}) + async def test_ahas_perm(self): + content_type = await sync_to_async(ContentType.objects.get_for_model)(Group) + perm = await Permission.objects.acreate( + name="test", content_type=content_type, codename="test" + ) + await self.user1.user_permissions.aadd(perm) + + self.assertIs(await self.user1.ahas_perm("auth.test"), True) + self.assertIs(await self.user1.ahas_module_perms("auth"), True) + class NewModelBackend(ModelBackend): pass @@ -715,6 +1035,10 @@ class TypeErrorBackend: def authenticate(self, request, username=None, password=None): raise TypeError + @sensitive_variables("password") + async def aauthenticate(self, request, username=None, password=None): + raise TypeError + class SkippedBackend: def authenticate(self): |
