diff options
Diffstat (limited to 'tests/auth_tests/test_remote_user.py')
| -rw-r--r-- | tests/auth_tests/test_remote_user.py | 180 |
1 files changed, 178 insertions, 2 deletions
diff --git a/tests/auth_tests/test_remote_user.py b/tests/auth_tests/test_remote_user.py index d3cf4b9da5..85de931c1a 100644 --- a/tests/auth_tests/test_remote_user.py +++ b/tests/auth_tests/test_remote_user.py @@ -1,12 +1,18 @@ from datetime import datetime, timezone from django.conf import settings -from django.contrib.auth import authenticate +from django.contrib.auth import aauthenticate, authenticate from django.contrib.auth.backends import RemoteUserBackend from django.contrib.auth.middleware import RemoteUserMiddleware from django.contrib.auth.models import User from django.middleware.csrf import _get_new_csrf_string, _mask_cipher_secret -from django.test import Client, TestCase, modify_settings, override_settings +from django.test import ( + AsyncClient, + Client, + TestCase, + modify_settings, + override_settings, +) @override_settings(ROOT_URLCONF="auth_tests.urls") @@ -30,6 +36,11 @@ class RemoteUserTest(TestCase): ) super().setUpClass() + def test_passing_explicit_none(self): + msg = "get_response must be provided." + with self.assertRaisesMessage(ValueError, msg): + RemoteUserMiddleware(None) + def test_no_remote_user(self): """Users are not created when remote user is not specified.""" num_users = User.objects.count() @@ -46,6 +57,18 @@ class RemoteUserTest(TestCase): self.assertTrue(response.context["user"].is_anonymous) self.assertEqual(User.objects.count(), num_users) + async def test_no_remote_user_async(self): + """See test_no_remote_user.""" + num_users = await User.objects.acount() + + response = await self.async_client.get("/remote_user/") + self.assertTrue(response.context["user"].is_anonymous) + self.assertEqual(await User.objects.acount(), num_users) + + response = await self.async_client.get("/remote_user/", **{self.header: ""}) + self.assertTrue(response.context["user"].is_anonymous) + self.assertEqual(await User.objects.acount(), num_users) + def test_csrf_validation_passes_after_process_request_login(self): """ CSRF check must access the CSRF token from the session or cookie, @@ -75,6 +98,31 @@ class RemoteUserTest(TestCase): response = csrf_client.post("/remote_user/", data, **headers) self.assertEqual(response.status_code, 200) + async def test_csrf_validation_passes_after_process_request_login_async(self): + """See test_csrf_validation_passes_after_process_request_login.""" + csrf_client = AsyncClient(enforce_csrf_checks=True) + csrf_secret = _get_new_csrf_string() + csrf_token = _mask_cipher_secret(csrf_secret) + csrf_token_form = _mask_cipher_secret(csrf_secret) + headers = {self.header: "fakeuser"} + data = {"csrfmiddlewaretoken": csrf_token_form} + + # Verify that CSRF is configured for the view + csrf_client.cookies.load({settings.CSRF_COOKIE_NAME: csrf_token}) + response = await csrf_client.post("/remote_user/", **headers) + self.assertEqual(response.status_code, 403) + self.assertIn(b"CSRF verification failed.", response.content) + + # This request will call django.contrib.auth.alogin() which will call + # django.middleware.csrf.rotate_token() thus changing the value of + # request.META['CSRF_COOKIE'] from the user submitted value set by + # CsrfViewMiddleware.process_request() to the new csrftoken value set + # by rotate_token(). Csrf validation should still pass when the view is + # later processed by CsrfViewMiddleware.process_view() + csrf_client.cookies.load({settings.CSRF_COOKIE_NAME: csrf_token}) + response = await csrf_client.post("/remote_user/", data, **headers) + self.assertEqual(response.status_code, 200) + def test_unknown_user(self): """ Tests the case where the username passed in the header does not exist @@ -90,6 +138,22 @@ class RemoteUserTest(TestCase): response = self.client.get("/remote_user/", **{self.header: "newuser"}) self.assertEqual(User.objects.count(), num_users + 1) + async def test_unknown_user_async(self): + """See test_unknown_user.""" + num_users = await User.objects.acount() + response = await self.async_client.get( + "/remote_user/", **{self.header: "newuser"} + ) + self.assertEqual(response.context["user"].username, "newuser") + self.assertEqual(await User.objects.acount(), num_users + 1) + await User.objects.aget(username="newuser") + + # Another request with same user should not create any new users. + response = await self.async_client.get( + "/remote_user/", **{self.header: "newuser"} + ) + self.assertEqual(await User.objects.acount(), num_users + 1) + def test_known_user(self): """ Tests the case where the username passed in the header is a valid User. @@ -106,6 +170,24 @@ class RemoteUserTest(TestCase): self.assertEqual(response.context["user"].username, "knownuser2") self.assertEqual(User.objects.count(), num_users) + async def test_known_user_async(self): + """See test_known_user.""" + await User.objects.acreate(username="knownuser") + await User.objects.acreate(username="knownuser2") + num_users = await User.objects.acount() + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(response.context["user"].username, "knownuser") + self.assertEqual(await User.objects.acount(), num_users) + # A different user passed in the headers causes the new user + # to be logged in. + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user2} + ) + self.assertEqual(response.context["user"].username, "knownuser2") + self.assertEqual(await User.objects.acount(), num_users) + def test_last_login(self): """ A user's last_login is set the first time they make a @@ -128,6 +210,29 @@ class RemoteUserTest(TestCase): response = self.client.get("/remote_user/", **{self.header: self.known_user}) self.assertEqual(default_login, response.context["user"].last_login) + async def test_last_login_async(self): + """See test_last_login.""" + user = await User.objects.acreate(username="knownuser") + # Set last_login to something so we can determine if it changes. + default_login = datetime(2000, 1, 1) + if settings.USE_TZ: + default_login = default_login.replace(tzinfo=timezone.utc) + user.last_login = default_login + await user.asave() + + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertNotEqual(default_login, response.context["user"].last_login) + + user = await User.objects.aget(username="knownuser") + user.last_login = default_login + await user.asave() + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(default_login, response.context["user"].last_login) + def test_header_disappears(self): """ A logged in user is logged out automatically when @@ -148,6 +253,25 @@ class RemoteUserTest(TestCase): response = self.client.get("/remote_user/") self.assertEqual(response.context["user"].username, "modeluser") + async def test_header_disappears_async(self): + """See test_header_disappears.""" + await User.objects.acreate(username="knownuser") + # Known user authenticates + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(response.context["user"].username, "knownuser") + # During the session, the REMOTE_USER header disappears. Should trigger logout. + response = await self.async_client.get("/remote_user/") + self.assertTrue(response.context["user"].is_anonymous) + # verify the remoteuser middleware will not remove a user + # authenticated via another backend + await User.objects.acreate_user(username="modeluser", password="foo") + await self.async_client.alogin(username="modeluser", password="foo") + await aauthenticate(username="modeluser", password="foo") + response = await self.async_client.get("/remote_user/") + self.assertEqual(response.context["user"].username, "modeluser") + def test_user_switch_forces_new_login(self): """ If the username in the header changes between requests @@ -164,11 +288,35 @@ class RemoteUserTest(TestCase): # In backends that do not create new users, it is '' (anonymous user) self.assertNotEqual(response.context["user"].username, "knownuser") + async def test_user_switch_forces_new_login_async(self): + """See test_user_switch_forces_new_login.""" + await User.objects.acreate(username="knownuser") + # Known user authenticates + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(response.context["user"].username, "knownuser") + # During the session, the REMOTE_USER changes to a different user. + response = await self.async_client.get( + "/remote_user/", **{self.header: "newnewuser"} + ) + # The current user is not the prior remote_user. + # In backends that create a new user, username is "newnewuser" + # In backends that do not create new users, it is '' (anonymous user) + self.assertNotEqual(response.context["user"].username, "knownuser") + def test_inactive_user(self): User.objects.create(username="knownuser", is_active=False) response = self.client.get("/remote_user/", **{self.header: "knownuser"}) self.assertTrue(response.context["user"].is_anonymous) + async def test_inactive_user_async(self): + await User.objects.acreate(username="knownuser", is_active=False) + response = await self.async_client.get( + "/remote_user/", **{self.header: "knownuser"} + ) + self.assertTrue(response.context["user"].is_anonymous) + class RemoteUserNoCreateBackend(RemoteUserBackend): """Backend that doesn't create unknown users.""" @@ -190,6 +338,14 @@ class RemoteUserNoCreateTest(RemoteUserTest): self.assertTrue(response.context["user"].is_anonymous) self.assertEqual(User.objects.count(), num_users) + async def test_unknown_user_async(self): + num_users = await User.objects.acount() + response = await self.async_client.get( + "/remote_user/", **{self.header: "newuser"} + ) + self.assertTrue(response.context["user"].is_anonymous) + self.assertEqual(await User.objects.acount(), num_users) + class AllowAllUsersRemoteUserBackendTest(RemoteUserTest): """Backend that allows inactive users.""" @@ -201,6 +357,13 @@ class AllowAllUsersRemoteUserBackendTest(RemoteUserTest): response = self.client.get("/remote_user/", **{self.header: self.known_user}) self.assertEqual(response.context["user"].username, user.username) + async def test_inactive_user_async(self): + user = await User.objects.acreate(username="knownuser", is_active=False) + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(response.context["user"].username, user.username) + class CustomRemoteUserBackend(RemoteUserBackend): """ @@ -311,3 +474,16 @@ class PersistentRemoteUserTest(RemoteUserTest): response = self.client.get("/remote_user/") self.assertFalse(response.context["user"].is_anonymous) self.assertEqual(response.context["user"].username, "knownuser") + + async def test_header_disappears_async(self): + """See test_header_disappears.""" + await User.objects.acreate(username="knownuser") + # Known user authenticates + response = await self.async_client.get( + "/remote_user/", **{self.header: self.known_user} + ) + self.assertEqual(response.context["user"].username, "knownuser") + # Should stay logged in if the REMOTE_USER header disappears. + response = await self.async_client.get("/remote_user/") + self.assertFalse(response.context["user"].is_anonymous) + self.assertEqual(response.context["user"].username, "knownuser") |
