summaryrefslogtreecommitdiff
path: root/tests/auth_tests/test_auth_backends.py
diff options
context:
space:
mode:
authorJon Janzen <jon@jonjanzen.com>2024-03-31 12:29:10 -0700
committerSarah Boyce <42296566+sarahboyce@users.noreply.github.com>2024-10-07 14:19:41 +0200
commit50f89ae850f6b4e35819fe725a08c7e579bfd099 (patch)
tree856a0e954e0be928c55f6070f2ac8b766459b3e7 /tests/auth_tests/test_auth_backends.py
parent4cad317ff1f9a79d54c1d5b12f1ccbd260ca009f (diff)
Fixed #35303 -- Implemented async auth backends and utils.
Diffstat (limited to 'tests/auth_tests/test_auth_backends.py')
-rw-r--r--tests/auth_tests/test_auth_backends.py324
1 files changed, 324 insertions, 0 deletions
diff --git a/tests/auth_tests/test_auth_backends.py b/tests/auth_tests/test_auth_backends.py
index 3b4f40e6e0..b612d27ab0 100644
--- a/tests/auth_tests/test_auth_backends.py
+++ b/tests/auth_tests/test_auth_backends.py
@@ -2,6 +2,8 @@ import sys
from datetime import date
from unittest import mock
+from asgiref.sync import sync_to_async
+
from django.contrib.auth import (
BACKEND_SESSION_KEY,
SESSION_KEY,
@@ -55,17 +57,33 @@ class BaseBackendTest(TestCase):
def test_get_user_permissions(self):
self.assertEqual(self.user.get_user_permissions(), {"user_perm"})
+ async def test_aget_user_permissions(self):
+ self.assertEqual(await self.user.aget_user_permissions(), {"user_perm"})
+
def test_get_group_permissions(self):
self.assertEqual(self.user.get_group_permissions(), {"group_perm"})
+ async def test_aget_group_permissions(self):
+ self.assertEqual(await self.user.aget_group_permissions(), {"group_perm"})
+
def test_get_all_permissions(self):
self.assertEqual(self.user.get_all_permissions(), {"user_perm", "group_perm"})
+ async def test_aget_all_permissions(self):
+ self.assertEqual(
+ await self.user.aget_all_permissions(), {"user_perm", "group_perm"}
+ )
+
def test_has_perm(self):
self.assertIs(self.user.has_perm("user_perm"), True)
self.assertIs(self.user.has_perm("group_perm"), True)
self.assertIs(self.user.has_perm("other_perm", TestObj()), False)
+ async def test_ahas_perm(self):
+ self.assertIs(await self.user.ahas_perm("user_perm"), True)
+ self.assertIs(await self.user.ahas_perm("group_perm"), True)
+ self.assertIs(await self.user.ahas_perm("other_perm", TestObj()), False)
+
def test_has_perms_perm_list_invalid(self):
msg = "perm_list must be an iterable of permissions."
with self.assertRaisesMessage(ValueError, msg):
@@ -73,6 +91,13 @@ class BaseBackendTest(TestCase):
with self.assertRaisesMessage(ValueError, msg):
self.user.has_perms(object())
+ async def test_ahas_perms_perm_list_invalid(self):
+ msg = "perm_list must be an iterable of permissions."
+ with self.assertRaisesMessage(ValueError, msg):
+ await self.user.ahas_perms("user_perm")
+ with self.assertRaisesMessage(ValueError, msg):
+ await self.user.ahas_perms(object())
+
class CountingMD5PasswordHasher(MD5PasswordHasher):
"""Hasher that counts how many times it computes a hash."""
@@ -125,6 +150,25 @@ class BaseModelBackendTest:
user.save()
self.assertIs(user.has_perm("auth.test"), False)
+ async def test_ahas_perm(self):
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ self.assertIs(await user.ahas_perm("auth.test"), False)
+
+ user.is_staff = True
+ await user.asave()
+ self.assertIs(await user.ahas_perm("auth.test"), False)
+
+ user.is_superuser = True
+ await user.asave()
+ self.assertIs(await user.ahas_perm("auth.test"), True)
+ self.assertIs(await user.ahas_module_perms("auth"), True)
+
+ user.is_staff = True
+ user.is_superuser = True
+ user.is_active = False
+ await user.asave()
+ self.assertIs(await user.ahas_perm("auth.test"), False)
+
def test_custom_perms(self):
user = self.UserModel._default_manager.get(pk=self.user.pk)
content_type = ContentType.objects.get_for_model(Group)
@@ -174,6 +218,55 @@ class BaseModelBackendTest:
self.assertIs(user.has_perm("test"), False)
self.assertIs(user.has_perms(["auth.test2", "auth.test3"]), False)
+ async def test_acustom_perms(self):
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test"
+ )
+ await user.user_permissions.aadd(perm)
+
+ # Reloading user to purge the _perm_cache.
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ self.assertEqual(await user.aget_all_permissions(), {"auth.test"})
+ self.assertEqual(await user.aget_user_permissions(), {"auth.test"})
+ self.assertEqual(await user.aget_group_permissions(), set())
+ self.assertIs(await user.ahas_module_perms("Group"), False)
+ self.assertIs(await user.ahas_module_perms("auth"), True)
+
+ perm = await Permission.objects.acreate(
+ name="test2", content_type=content_type, codename="test2"
+ )
+ await user.user_permissions.aadd(perm)
+ perm = await Permission.objects.acreate(
+ name="test3", content_type=content_type, codename="test3"
+ )
+ await user.user_permissions.aadd(perm)
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ expected_user_perms = {"auth.test2", "auth.test", "auth.test3"}
+ self.assertEqual(await user.aget_all_permissions(), expected_user_perms)
+ self.assertIs(await user.ahas_perm("test"), False)
+ self.assertIs(await user.ahas_perm("auth.test"), True)
+ self.assertIs(await user.ahas_perms(["auth.test2", "auth.test3"]), True)
+
+ perm = await Permission.objects.acreate(
+ name="test_group", content_type=content_type, codename="test_group"
+ )
+ group = await Group.objects.acreate(name="test_group")
+ await group.permissions.aadd(perm)
+ await user.groups.aadd(group)
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ self.assertEqual(
+ await user.aget_all_permissions(), {*expected_user_perms, "auth.test_group"}
+ )
+ self.assertEqual(await user.aget_user_permissions(), expected_user_perms)
+ self.assertEqual(await user.aget_group_permissions(), {"auth.test_group"})
+ self.assertIs(await user.ahas_perms(["auth.test3", "auth.test_group"]), True)
+
+ user = AnonymousUser()
+ self.assertIs(await user.ahas_perm("test"), False)
+ self.assertIs(await user.ahas_perms(["auth.test2", "auth.test3"]), False)
+
def test_has_no_object_perm(self):
"""Regressiontest for #12462"""
user = self.UserModel._default_manager.get(pk=self.user.pk)
@@ -188,6 +281,20 @@ class BaseModelBackendTest:
self.assertIs(user.has_perm("auth.test"), True)
self.assertEqual(user.get_all_permissions(), {"auth.test"})
+ async def test_ahas_no_object_perm(self):
+ """See test_has_no_object_perm()"""
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test"
+ )
+ await user.user_permissions.aadd(perm)
+
+ self.assertIs(await user.ahas_perm("auth.test", "object"), False)
+ self.assertEqual(await user.aget_all_permissions("object"), set())
+ self.assertIs(await user.ahas_perm("auth.test"), True)
+ self.assertEqual(await user.aget_all_permissions(), {"auth.test"})
+
def test_anonymous_has_no_permissions(self):
"""
#17903 -- Anonymous users shouldn't have permissions in
@@ -220,6 +327,38 @@ class BaseModelBackendTest:
self.assertEqual(backend.get_user_permissions(user), set())
self.assertEqual(backend.get_group_permissions(user), set())
+ async def test_aanonymous_has_no_permissions(self):
+ """See test_anonymous_has_no_permissions()"""
+ backend = ModelBackend()
+
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ user_perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test_user"
+ )
+ group_perm = await Permission.objects.acreate(
+ name="test2", content_type=content_type, codename="test_group"
+ )
+ await user.user_permissions.aadd(user_perm)
+
+ group = await Group.objects.acreate(name="test_group")
+ await user.groups.aadd(group)
+ await group.permissions.aadd(group_perm)
+
+ self.assertEqual(
+ await backend.aget_all_permissions(user),
+ {"auth.test_user", "auth.test_group"},
+ )
+ self.assertEqual(await backend.aget_user_permissions(user), {"auth.test_user"})
+ self.assertEqual(
+ await backend.aget_group_permissions(user), {"auth.test_group"}
+ )
+
+ with mock.patch.object(self.UserModel, "is_anonymous", True):
+ self.assertEqual(await backend.aget_all_permissions(user), set())
+ self.assertEqual(await backend.aget_user_permissions(user), set())
+ self.assertEqual(await backend.aget_group_permissions(user), set())
+
def test_inactive_has_no_permissions(self):
"""
#17903 -- Inactive users shouldn't have permissions in
@@ -254,11 +393,52 @@ class BaseModelBackendTest:
self.assertEqual(backend.get_user_permissions(user), set())
self.assertEqual(backend.get_group_permissions(user), set())
+ async def test_ainactive_has_no_permissions(self):
+ """See test_inactive_has_no_permissions()"""
+ backend = ModelBackend()
+
+ user = await self.UserModel._default_manager.aget(pk=self.user.pk)
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ user_perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test_user"
+ )
+ group_perm = await Permission.objects.acreate(
+ name="test2", content_type=content_type, codename="test_group"
+ )
+ await user.user_permissions.aadd(user_perm)
+
+ group = await Group.objects.acreate(name="test_group")
+ await user.groups.aadd(group)
+ await group.permissions.aadd(group_perm)
+
+ self.assertEqual(
+ await backend.aget_all_permissions(user),
+ {"auth.test_user", "auth.test_group"},
+ )
+ self.assertEqual(await backend.aget_user_permissions(user), {"auth.test_user"})
+ self.assertEqual(
+ await backend.aget_group_permissions(user), {"auth.test_group"}
+ )
+
+ user.is_active = False
+ await user.asave()
+
+ self.assertEqual(await backend.aget_all_permissions(user), set())
+ self.assertEqual(await backend.aget_user_permissions(user), set())
+ self.assertEqual(await backend.aget_group_permissions(user), set())
+
def test_get_all_superuser_permissions(self):
"""A superuser has all permissions. Refs #14795."""
user = self.UserModel._default_manager.get(pk=self.superuser.pk)
self.assertEqual(len(user.get_all_permissions()), len(Permission.objects.all()))
+ async def test_aget_all_superuser_permissions(self):
+ """See test_get_all_superuser_permissions()"""
+ user = await self.UserModel._default_manager.aget(pk=self.superuser.pk)
+ self.assertEqual(
+ len(await user.aget_all_permissions()), await Permission.objects.acount()
+ )
+
@override_settings(
PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"]
)
@@ -280,6 +460,24 @@ class BaseModelBackendTest:
@override_settings(
PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"]
)
+ async def test_aauthentication_timing(self):
+ """See test_authentication_timing()"""
+ # Re-set the password, because this tests overrides PASSWORD_HASHERS.
+ self.user.set_password("test")
+ await self.user.asave()
+
+ CountingMD5PasswordHasher.calls = 0
+ username = getattr(self.user, self.UserModel.USERNAME_FIELD)
+ await aauthenticate(username=username, password="test")
+ self.assertEqual(CountingMD5PasswordHasher.calls, 1)
+
+ CountingMD5PasswordHasher.calls = 0
+ await aauthenticate(username="no_such_user", password="test")
+ self.assertEqual(CountingMD5PasswordHasher.calls, 1)
+
+ @override_settings(
+ PASSWORD_HASHERS=["auth_tests.test_auth_backends.CountingMD5PasswordHasher"]
+ )
def test_authentication_without_credentials(self):
CountingMD5PasswordHasher.calls = 0
for credentials in (
@@ -320,6 +518,15 @@ class ModelBackendTest(BaseModelBackendTest, TestCase):
self.user.save()
self.assertIsNone(authenticate(**self.user_credentials))
+ async def test_aauthenticate_inactive(self):
+ """
+ An inactive user can't authenticate.
+ """
+ self.assertEqual(await aauthenticate(**self.user_credentials), self.user)
+ self.user.is_active = False
+ await self.user.asave()
+ self.assertIsNone(await aauthenticate(**self.user_credentials))
+
@override_settings(AUTH_USER_MODEL="auth_tests.CustomUserWithoutIsActiveField")
def test_authenticate_user_without_is_active_field(self):
"""
@@ -332,6 +539,18 @@ class ModelBackendTest(BaseModelBackendTest, TestCase):
)
self.assertEqual(authenticate(username="test", password="test"), user)
+ @override_settings(AUTH_USER_MODEL="auth_tests.CustomUserWithoutIsActiveField")
+ async def test_aauthenticate_user_without_is_active_field(self):
+ """
+ A custom user without an `is_active` field is allowed to authenticate.
+ """
+ user = await CustomUserWithoutIsActiveField.objects._acreate_user(
+ username="test",
+ email="test@example.com",
+ password="test",
+ )
+ self.assertEqual(await aauthenticate(username="test", password="test"), user)
+
@override_settings(AUTH_USER_MODEL="auth_tests.ExtensionUser")
class ExtensionUserModelBackendTest(BaseModelBackendTest, TestCase):
@@ -403,6 +622,15 @@ class CustomUserModelBackendAuthenticateTest(TestCase):
authenticated_user = authenticate(email="test@example.com", password="test")
self.assertEqual(test_user, authenticated_user)
+ async def test_aauthenticate(self):
+ test_user = await CustomUser._default_manager.acreate_user(
+ email="test@example.com", password="test", date_of_birth=date(2006, 4, 25)
+ )
+ authenticated_user = await aauthenticate(
+ email="test@example.com", password="test"
+ )
+ self.assertEqual(test_user, authenticated_user)
+
@override_settings(AUTH_USER_MODEL="auth_tests.UUIDUser")
class UUIDUserTests(TestCase):
@@ -416,6 +644,13 @@ class UUIDUserTests(TestCase):
UUIDUser.objects.get(pk=self.client.session[SESSION_KEY]), user
)
+ async def test_alogin(self):
+ """See test_login()"""
+ user = await UUIDUser.objects.acreate_user(username="uuid", password="test")
+ self.assertTrue(await self.client.alogin(username="uuid", password="test"))
+ session_key = await self.client.session.aget(SESSION_KEY)
+ self.assertEqual(await UUIDUser.objects.aget(pk=session_key), user)
+
class TestObj:
pass
@@ -435,9 +670,15 @@ class SimpleRowlevelBackend:
return True
return False
+ async def ahas_perm(self, user, perm, obj=None):
+ return self.has_perm(user, perm, obj)
+
def has_module_perms(self, user, app_label):
return (user.is_anonymous or user.is_active) and app_label == "app1"
+ async def ahas_module_perms(self, user, app_label):
+ return self.has_module_perms(user, app_label)
+
def get_all_permissions(self, user, obj=None):
if not obj:
return [] # We only support row level perms
@@ -452,6 +693,9 @@ class SimpleRowlevelBackend:
else:
return ["simple"]
+ async def aget_all_permissions(self, user, obj=None):
+ return self.get_all_permissions(user, obj)
+
def get_group_permissions(self, user, obj=None):
if not obj:
return # We only support row level perms
@@ -524,10 +768,18 @@ class AnonymousUserBackendTest(SimpleTestCase):
self.assertIs(self.user1.has_perm("perm", TestObj()), False)
self.assertIs(self.user1.has_perm("anon", TestObj()), True)
+ async def test_ahas_perm(self):
+ self.assertIs(await self.user1.ahas_perm("perm", TestObj()), False)
+ self.assertIs(await self.user1.ahas_perm("anon", TestObj()), True)
+
def test_has_perms(self):
self.assertIs(self.user1.has_perms(["anon"], TestObj()), True)
self.assertIs(self.user1.has_perms(["anon", "perm"], TestObj()), False)
+ async def test_ahas_perms(self):
+ self.assertIs(await self.user1.ahas_perms(["anon"], TestObj()), True)
+ self.assertIs(await self.user1.ahas_perms(["anon", "perm"], TestObj()), False)
+
def test_has_perms_perm_list_invalid(self):
msg = "perm_list must be an iterable of permissions."
with self.assertRaisesMessage(ValueError, msg):
@@ -535,13 +787,27 @@ class AnonymousUserBackendTest(SimpleTestCase):
with self.assertRaisesMessage(ValueError, msg):
self.user1.has_perms(object())
+ async def test_ahas_perms_perm_list_invalid(self):
+ msg = "perm_list must be an iterable of permissions."
+ with self.assertRaisesMessage(ValueError, msg):
+ await self.user1.ahas_perms("perm")
+ with self.assertRaisesMessage(ValueError, msg):
+ await self.user1.ahas_perms(object())
+
def test_has_module_perms(self):
self.assertIs(self.user1.has_module_perms("app1"), True)
self.assertIs(self.user1.has_module_perms("app2"), False)
+ async def test_ahas_module_perms(self):
+ self.assertIs(await self.user1.ahas_module_perms("app1"), True)
+ self.assertIs(await self.user1.ahas_module_perms("app2"), False)
+
def test_get_all_permissions(self):
self.assertEqual(self.user1.get_all_permissions(TestObj()), {"anon"})
+ async def test_aget_all_permissions(self):
+ self.assertEqual(await self.user1.aget_all_permissions(TestObj()), {"anon"})
+
@override_settings(AUTHENTICATION_BACKENDS=[])
class NoBackendsTest(TestCase):
@@ -561,6 +827,14 @@ class NoBackendsTest(TestCase):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
self.user.has_perm(("perm", TestObj()))
+ async def test_araises_exception(self):
+ msg = (
+ "No authentication backends have been defined. "
+ "Does AUTHENTICATION_BACKENDS contain anything?"
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
+ await self.user.ahas_perm(("perm", TestObj()))
+
@override_settings(
AUTHENTICATION_BACKENDS=["auth_tests.test_auth_backends.SimpleRowlevelBackend"]
@@ -593,12 +867,21 @@ class PermissionDeniedBackend:
def authenticate(self, request, username=None, password=None):
raise PermissionDenied
+ async def aauthenticate(self, request, username=None, password=None):
+ raise PermissionDenied
+
def has_perm(self, user_obj, perm, obj=None):
raise PermissionDenied
+ async def ahas_perm(self, user_obj, perm, obj=None):
+ raise PermissionDenied
+
def has_module_perms(self, user_obj, app_label):
raise PermissionDenied
+ async def ahas_module_perms(self, user_obj, app_label):
+ raise PermissionDenied
+
class PermissionDeniedBackendTest(TestCase):
"""
@@ -631,10 +914,25 @@ class PermissionDeniedBackendTest(TestCase):
[{"password": "********************", "username": "test"}],
)
+ @modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend})
+ async def test_aauthenticate_permission_denied(self):
+ self.assertIsNone(await aauthenticate(username="test", password="test"))
+ # user_login_failed signal is sent.
+ self.assertEqual(
+ self.user_login_failed,
+ [{"password": "********************", "username": "test"}],
+ )
+
@modify_settings(AUTHENTICATION_BACKENDS={"append": backend})
def test_authenticates(self):
self.assertEqual(authenticate(username="test", password="test"), self.user1)
+ @modify_settings(AUTHENTICATION_BACKENDS={"append": backend})
+ async def test_aauthenticate(self):
+ self.assertEqual(
+ await aauthenticate(username="test", password="test"), self.user1
+ )
+
@modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend})
def test_has_perm_denied(self):
content_type = ContentType.objects.get_for_model(Group)
@@ -646,6 +944,17 @@ class PermissionDeniedBackendTest(TestCase):
self.assertIs(self.user1.has_perm("auth.test"), False)
self.assertIs(self.user1.has_module_perms("auth"), False)
+ @modify_settings(AUTHENTICATION_BACKENDS={"prepend": backend})
+ async def test_ahas_perm_denied(self):
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test"
+ )
+ await self.user1.user_permissions.aadd(perm)
+
+ self.assertIs(await self.user1.ahas_perm("auth.test"), False)
+ self.assertIs(await self.user1.ahas_module_perms("auth"), False)
+
@modify_settings(AUTHENTICATION_BACKENDS={"append": backend})
def test_has_perm(self):
content_type = ContentType.objects.get_for_model(Group)
@@ -657,6 +966,17 @@ class PermissionDeniedBackendTest(TestCase):
self.assertIs(self.user1.has_perm("auth.test"), True)
self.assertIs(self.user1.has_module_perms("auth"), True)
+ @modify_settings(AUTHENTICATION_BACKENDS={"append": backend})
+ async def test_ahas_perm(self):
+ content_type = await sync_to_async(ContentType.objects.get_for_model)(Group)
+ perm = await Permission.objects.acreate(
+ name="test", content_type=content_type, codename="test"
+ )
+ await self.user1.user_permissions.aadd(perm)
+
+ self.assertIs(await self.user1.ahas_perm("auth.test"), True)
+ self.assertIs(await self.user1.ahas_module_perms("auth"), True)
+
class NewModelBackend(ModelBackend):
pass
@@ -715,6 +1035,10 @@ class TypeErrorBackend:
def authenticate(self, request, username=None, password=None):
raise TypeError
+ @sensitive_variables("password")
+ async def aauthenticate(self, request, username=None, password=None):
+ raise TypeError
+
class SkippedBackend:
def authenticate(self):