123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- from datetime import datetime, timezone
- from django.conf import settings
- from django.contrib.auth import aauthenticate, authenticate
- from django.contrib.auth.backends import RemoteUserBackend
- from django.contrib.auth.middleware import RemoteUserMiddleware
- from django.contrib.auth.models import User
- from django.middleware.csrf import _get_new_csrf_string, _mask_cipher_secret
- from django.test import (
- AsyncClient,
- Client,
- TestCase,
- modify_settings,
- override_settings,
- )
@override_settings(ROOT_URLCONF="auth_tests.urls")
class RemoteUserTest(TestCase):
    """
    Exercise the remote-user middleware together with a remote-user backend.

    Subclasses swap in other implementations by overriding the class
    attributes below; setUpClass() appends whichever values are in effect to
    MIDDLEWARE and AUTHENTICATION_BACKENDS.
    """

    middleware = "django.contrib.auth.middleware.RemoteUserMiddleware"
    backend = "django.contrib.auth.backends.RemoteUserBackend"
    # Keys under which the tests send the remote username and email address.
    header = "REMOTE_USER"
    email_header = "REMOTE_EMAIL"

    # Usernames to be passed in REMOTE_USER for the test_known_user test case.
    known_user = "knownuser"
    known_user2 = "knownuser2"

    @classmethod
    def setUpClass(cls):
        # Register the settings override as a class-level context, then run
        # the regular class setup.
        settings_patch = modify_settings(
            AUTHENTICATION_BACKENDS={"append": cls.backend},
            MIDDLEWARE={"append": cls.middleware},
        )
        cls.enterClassContext(settings_patch)
        super().setUpClass()

    def test_passing_explicit_none(self):
        """Building the middleware with get_response=None raises ValueError."""
        expected = "get_response must be provided."
        with self.assertRaisesMessage(ValueError, expected):
            RemoteUserMiddleware(None)

    def test_no_remote_user(self):
        """No user is created when the remote user header is missing or empty."""
        initial_count = User.objects.count()
        # Header absent, explicitly None, and the empty string.
        for extra in ({}, {self.header: None}, {self.header: ""}):
            response = self.client.get("/remote_user/", **extra)
            self.assertTrue(response.context["user"].is_anonymous)
            self.assertEqual(User.objects.count(), initial_count)

    async def test_no_remote_user_async(self):
        """Async counterpart of test_no_remote_user."""
        initial_count = await User.objects.acount()
        # Header absent, then the empty string.
        for extra in ({}, {self.header: ""}):
            response = await self.async_client.get("/remote_user/", **extra)
            self.assertTrue(response.context["user"].is_anonymous)
            self.assertEqual(await User.objects.acount(), initial_count)

    def test_csrf_validation_passes_after_process_request_login(self):
        """
        The CSRF check has to read the token from the session or cookie and
        not from the request, because an authentication middleware may already
        have called rotate_token() during the process_request() phase.
        """
        client = Client(enforce_csrf_checks=True)
        secret = _get_new_csrf_string()
        cookie_token = _mask_cipher_secret(secret)
        form_token = _mask_cipher_secret(secret)
        remote = {self.header: "fakeuser"}
        cookie = {settings.CSRF_COOKIE_NAME: cookie_token}

        # First make sure CSRF protection is active for the view: a POST
        # without the form token must be rejected.
        client.cookies.load(cookie)
        response = client.post("/remote_user/", **remote)
        self.assertEqual(response.status_code, 403)
        self.assertIn(b"CSRF verification failed.", response.content)

        # This request goes through django.contrib.auth.login(), which calls
        # django.middleware.csrf.rotate_token(). That replaces the user
        # submitted request.META['CSRF_COOKIE'] value set by
        # CsrfViewMiddleware.process_request() with a new csrftoken value.
        # Validation in CsrfViewMiddleware.process_view() must pass anyway.
        client.cookies.load(cookie)
        response = client.post(
            "/remote_user/", {"csrfmiddlewaretoken": form_token}, **remote
        )
        self.assertEqual(response.status_code, 200)

    async def test_csrf_validation_passes_after_process_request_login_async(self):
        """Async counterpart of the CSRF-after-login test."""
        client = AsyncClient(enforce_csrf_checks=True)
        secret = _get_new_csrf_string()
        cookie_token = _mask_cipher_secret(secret)
        form_token = _mask_cipher_secret(secret)
        remote = {self.header: "fakeuser"}
        cookie = {settings.CSRF_COOKIE_NAME: cookie_token}

        # First make sure CSRF protection is active for the view: a POST
        # without the form token must be rejected.
        client.cookies.load(cookie)
        response = await client.post("/remote_user/", **remote)
        self.assertEqual(response.status_code, 403)
        self.assertIn(b"CSRF verification failed.", response.content)

        # This request goes through django.contrib.auth.alogin(), which calls
        # django.middleware.csrf.rotate_token(). That replaces the user
        # submitted request.META['CSRF_COOKIE'] value set by
        # CsrfViewMiddleware.process_request() with a new csrftoken value.
        # Validation in CsrfViewMiddleware.process_view() must pass anyway.
        client.cookies.load(cookie)
        response = await client.post(
            "/remote_user/", {"csrfmiddlewaretoken": form_token}, **remote
        )
        self.assertEqual(response.status_code, 200)

    def test_unknown_user(self):
        """
        A username in the header that matches no existing User results in that
        user being created and logged in.
        """
        remote = {self.header: "newuser"}
        expected_count = User.objects.count() + 1

        response = self.client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, "newuser")
        self.assertEqual(User.objects.count(), expected_count)
        # Raises if the user was not actually stored.
        User.objects.get(username="newuser")

        # Another request with same user should not create any new users.
        self.client.get("/remote_user/", **remote)
        self.assertEqual(User.objects.count(), expected_count)

    async def test_unknown_user_async(self):
        """Async counterpart of test_unknown_user."""
        remote = {self.header: "newuser"}
        expected_count = await User.objects.acount() + 1

        response = await self.async_client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, "newuser")
        self.assertEqual(await User.objects.acount(), expected_count)
        # Raises if the user was not actually stored.
        await User.objects.aget(username="newuser")

        # Another request with same user should not create any new users.
        await self.async_client.get("/remote_user/", **remote)
        self.assertEqual(await User.objects.acount(), expected_count)

    def test_known_user(self):
        """
        A username in the header that belongs to an existing User logs that
        user in without creating anyone.
        """
        User.objects.create(username="knownuser")
        User.objects.create(username="knownuser2")
        initial_count = User.objects.count()

        # The second iteration sends a different user in the header, which
        # must cause that user to be logged in instead.
        logins = (
            (self.known_user, "knownuser"),
            (self.known_user2, "knownuser2"),
        )
        for remote_name, username in logins:
            response = self.client.get("/remote_user/", **{self.header: remote_name})
            self.assertEqual(response.context["user"].username, username)
            self.assertEqual(User.objects.count(), initial_count)

    async def test_known_user_async(self):
        """Async counterpart of test_known_user."""
        await User.objects.acreate(username="knownuser")
        await User.objects.acreate(username="knownuser2")
        initial_count = await User.objects.acount()

        # The second iteration sends a different user in the header, which
        # must cause that user to be logged in instead.
        logins = (
            (self.known_user, "knownuser"),
            (self.known_user2, "knownuser2"),
        )
        for remote_name, username in logins:
            response = await self.async_client.get(
                "/remote_user/", **{self.header: remote_name}
            )
            self.assertEqual(response.context["user"].username, username)
            self.assertEqual(await User.objects.acount(), initial_count)

    def test_last_login(self):
        """
        last_login is set on a user's first request and left alone on later
        requests made within the same session.
        """
        remote = {self.header: self.known_user}
        # A fixed reference value so that any change to last_login shows up.
        if settings.USE_TZ:
            baseline = datetime(2000, 1, 1, tzinfo=timezone.utc)
        else:
            baseline = datetime(2000, 1, 1)

        account = User.objects.create(username="knownuser")
        account.last_login = baseline
        account.save()

        response = self.client.get("/remote_user/", **remote)
        self.assertNotEqual(baseline, response.context["user"].last_login)

        # Reset the stored value, then repeat the request in the same session.
        account = User.objects.get(username="knownuser")
        account.last_login = baseline
        account.save()

        response = self.client.get("/remote_user/", **remote)
        self.assertEqual(baseline, response.context["user"].last_login)

    async def test_last_login_async(self):
        """Async counterpart of test_last_login."""
        remote = {self.header: self.known_user}
        # A fixed reference value so that any change to last_login shows up.
        if settings.USE_TZ:
            baseline = datetime(2000, 1, 1, tzinfo=timezone.utc)
        else:
            baseline = datetime(2000, 1, 1)

        account = await User.objects.acreate(username="knownuser")
        account.last_login = baseline
        await account.asave()

        response = await self.async_client.get("/remote_user/", **remote)
        self.assertNotEqual(baseline, response.context["user"].last_login)

        # Reset the stored value, then repeat the request in the same session.
        account = await User.objects.aget(username="knownuser")
        account.last_login = baseline
        await account.asave()

        response = await self.async_client.get("/remote_user/", **remote)
        self.assertEqual(baseline, response.context["user"].last_login)

    def test_header_disappears(self):
        """
        Once the REMOTE_USER header stops arriving within a browser session,
        the previously logged-in user is logged out automatically.
        """
        User.objects.create(username="knownuser")

        # Known user authenticates
        response = self.client.get("/remote_user/", **{self.header: self.known_user})
        self.assertEqual(response.context["user"].username, "knownuser")

        # During the session, the REMOTE_USER header disappears. Should trigger
        # logout.
        response = self.client.get("/remote_user/")
        self.assertTrue(response.context["user"].is_anonymous)

        # The remote-user middleware must leave alone a user who was
        # authenticated via another backend.
        credentials = {"username": "modeluser", "password": "foo"}
        User.objects.create_user(**credentials)
        self.client.login(**credentials)
        authenticate(**credentials)
        response = self.client.get("/remote_user/")
        self.assertEqual(response.context["user"].username, "modeluser")

    async def test_header_disappears_async(self):
        """Async counterpart of test_header_disappears."""
        await User.objects.acreate(username="knownuser")

        # Known user authenticates
        response = await self.async_client.get(
            "/remote_user/", **{self.header: self.known_user}
        )
        self.assertEqual(response.context["user"].username, "knownuser")

        # During the session, the REMOTE_USER header disappears. Should trigger
        # logout.
        response = await self.async_client.get("/remote_user/")
        self.assertTrue(response.context["user"].is_anonymous)

        # The remote-user middleware must leave alone a user who was
        # authenticated via another backend.
        credentials = {"username": "modeluser", "password": "foo"}
        await User.objects.acreate_user(**credentials)
        await self.async_client.alogin(**credentials)
        await aauthenticate(**credentials)
        response = await self.async_client.get("/remote_user/")
        self.assertEqual(response.context["user"].username, "modeluser")

    def test_user_switch_forces_new_login(self):
        """
        When the username in the header changes between requests, the original
        user is logged out.
        """
        User.objects.create(username="knownuser")

        # Known user authenticates
        response = self.client.get("/remote_user/", **{self.header: self.known_user})
        self.assertEqual(response.context["user"].username, "knownuser")

        # During the session, the REMOTE_USER changes to a different user.
        response = self.client.get("/remote_user/", **{self.header: "newnewuser"})

        # The current user is not the prior remote_user.
        # In backends that create a new user, username is "newnewuser"
        # In backends that do not create new users, it is '' (anonymous user)
        self.assertNotEqual(response.context["user"].username, "knownuser")

    async def test_user_switch_forces_new_login_async(self):
        """Async counterpart of test_user_switch_forces_new_login."""
        await User.objects.acreate(username="knownuser")

        # Known user authenticates
        response = await self.async_client.get(
            "/remote_user/", **{self.header: self.known_user}
        )
        self.assertEqual(response.context["user"].username, "knownuser")

        # During the session, the REMOTE_USER changes to a different user.
        response = await self.async_client.get(
            "/remote_user/", **{self.header: "newnewuser"}
        )

        # The current user is not the prior remote_user.
        # In backends that create a new user, username is "newnewuser"
        # In backends that do not create new users, it is '' (anonymous user)
        self.assertNotEqual(response.context["user"].username, "knownuser")

    def test_inactive_user(self):
        """An existing but inactive user is not logged in."""
        User.objects.create(username="knownuser", is_active=False)
        remote = {self.header: "knownuser"}
        response = self.client.get("/remote_user/", **remote)
        self.assertTrue(response.context["user"].is_anonymous)

    async def test_inactive_user_async(self):
        """Async counterpart of test_inactive_user."""
        await User.objects.acreate(username="knownuser", is_active=False)
        remote = {self.header: "knownuser"}
        response = await self.async_client.get("/remote_user/", **remote)
        self.assertTrue(response.context["user"].is_anonymous)
class RemoteUserNoCreateBackend(RemoteUserBackend):
    """RemoteUserBackend variant with creation of unknown users turned off."""

    create_unknown_user = False
class RemoteUserNoCreateTest(RemoteUserTest):
    """
    Runs the RemoteUserTest suite against a custom auth backend class that
    never creates unknown users.
    """

    backend = "auth_tests.test_remote_user.RemoteUserNoCreateBackend"

    def test_unknown_user(self):
        """An unknown username stays anonymous and no User row is added."""
        remote = {self.header: "newuser"}
        initial_count = User.objects.count()
        response = self.client.get("/remote_user/", **remote)
        self.assertTrue(response.context["user"].is_anonymous)
        self.assertEqual(User.objects.count(), initial_count)

    async def test_unknown_user_async(self):
        """Async counterpart of test_unknown_user."""
        remote = {self.header: "newuser"}
        initial_count = await User.objects.acount()
        response = await self.async_client.get("/remote_user/", **remote)
        self.assertTrue(response.context["user"].is_anonymous)
        self.assertEqual(await User.objects.acount(), initial_count)
class AllowAllUsersRemoteUserBackendTest(RemoteUserTest):
    """
    Runs the RemoteUserTest suite against AllowAllUsersRemoteUserBackend, a
    backend that allows inactive users.
    """

    backend = "django.contrib.auth.backends.AllowAllUsersRemoteUserBackend"

    def test_inactive_user(self):
        """An inactive user named in the header is logged in."""
        inactive = User.objects.create(username="knownuser", is_active=False)
        remote = {self.header: self.known_user}
        response = self.client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, inactive.username)

    async def test_inactive_user_async(self):
        """Async counterpart of test_inactive_user."""
        inactive = await User.objects.acreate(username="knownuser", is_active=False)
        remote = {self.header: self.known_user}
        response = await self.async_client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, inactive.username)
class CustomRemoteUserBackend(RemoteUserBackend):
    """
    RemoteUserBackend subclass with its own clean_username() and
    configure_user().
    """

    def clean_username(self, username):
        """Return the part of ``username`` that precedes the first "@"."""
        local_part, _, _ = username.partition("@")
        return local_part

    def configure_user(self, request, user, created=True):
        """
        Set the user's email from the email header in ``request.META`` (empty
        string when the header is absent) and, for users that already
        existed, copy the username into last_name. Save and return the user.
        """
        email = request.META.get(RemoteUserTest.email_header, "")
        user.email = email
        if not created:
            user.last_name = user.username
        user.save()
        return user
class RemoteUserCustomTest(RemoteUserTest):
    """
    Runs the RemoteUserTest suite against a RemoteUserBackend subclass that
    overrides the clean_username and configure_user methods.
    """

    backend = "auth_tests.test_remote_user.CustomRemoteUserBackend"
    # REMOTE_USER strings with email addresses for the custom backend to
    # clean.
    known_user = "knownuser@example.com"
    known_user2 = "knownuser2@example.com"

    def test_known_user(self):
        """
        REMOTE_USER values are cleaned before lookup; the known users end up
        with no email address and with last_name equal to their username.
        """
        super().test_known_user()
        first = User.objects.get(username="knownuser")
        second = User.objects.get(username="knownuser2")
        self.assertEqual(first.email, "")
        self.assertEqual(second.email, "")
        self.assertEqual(first.last_name, "knownuser")
        self.assertEqual(second.last_name, "knownuser2")

    def test_unknown_user(self):
        """
        A newly created user is configured with the email address supplied in
        the request header and gets no last name.
        """
        expected_count = User.objects.count() + 1
        extra = {
            self.header: "newuser",
            self.email_header: "user@example.com",
        }
        response = self.client.get("/remote_user/", **extra)

        self.assertEqual(response.context["user"].username, "newuser")
        self.assertEqual(response.context["user"].email, "user@example.com")
        self.assertEqual(response.context["user"].last_name, "")
        self.assertEqual(User.objects.count(), expected_count)

        stored = User.objects.get(username="newuser")
        self.assertEqual(stored.email, "user@example.com")
class CustomHeaderMiddleware(RemoteUserMiddleware):
    """
    RemoteUserMiddleware subclass whose ``header`` is set to a custom HTTP
    auth user header.
    """

    header = "HTTP_AUTHUSER"
class CustomHeaderRemoteUserTest(RemoteUserTest):
    """
    Runs the RemoteUserTest suite against a RemoteUserMiddleware subclass
    that uses a custom HTTP auth user header.
    """

    middleware = "auth_tests.test_remote_user.CustomHeaderMiddleware"
    header = "HTTP_AUTHUSER"
class PersistentRemoteUserTest(RemoteUserTest):
    """
    With PersistentRemoteUserMiddleware the user stays logged in even when
    later requests arrive without the header value.
    """

    middleware = "django.contrib.auth.middleware.PersistentRemoteUserMiddleware"
    require_header = False

    def test_header_disappears(self):
        """
        A logged-in user remains logged in after the REMOTE_USER header
        disappears within the same browser session.
        """
        User.objects.create(username="knownuser")

        # Known user authenticates
        remote = {self.header: self.known_user}
        response = self.client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, "knownuser")

        # Should stay logged in if the REMOTE_USER header disappears.
        response = self.client.get("/remote_user/")
        self.assertFalse(response.context["user"].is_anonymous)
        self.assertEqual(response.context["user"].username, "knownuser")

    async def test_header_disappears_async(self):
        """Async counterpart of test_header_disappears."""
        await User.objects.acreate(username="knownuser")

        # Known user authenticates
        remote = {self.header: self.known_user}
        response = await self.async_client.get("/remote_user/", **remote)
        self.assertEqual(response.context["user"].username, "knownuser")

        # Should stay logged in if the REMOTE_USER header disappears.
        response = await self.async_client.get("/remote_user/")
        self.assertFalse(response.context["user"].is_anonymous)
        self.assertEqual(response.context["user"].username, "knownuser")
|