123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- from asyncio import iscoroutinefunction
- from asgiref.sync import sync_to_async
- from django.conf import settings
- from django.contrib.auth import models
- from django.contrib.auth.decorators import (
- login_not_required,
- login_required,
- permission_required,
- user_passes_test,
- )
- from django.core.exceptions import PermissionDenied
- from django.http import HttpResponse
- from django.test import TestCase, override_settings
- from django.test.client import RequestFactory
- from .test_views import AuthViewsTestCase
- @override_settings(ROOT_URLCONF="auth_tests.urls")
- class LoginRequiredTestCase(AuthViewsTestCase):
- """
- Tests the login_required decorators
- """
- factory = RequestFactory()
- def test_wrapped_sync_function_is_not_coroutine_function(self):
- def sync_view(request):
- return HttpResponse()
- wrapped_view = login_required(sync_view)
- self.assertIs(iscoroutinefunction(wrapped_view), False)
- def test_wrapped_async_function_is_coroutine_function(self):
- async def async_view(request):
- return HttpResponse()
- wrapped_view = login_required(async_view)
- self.assertIs(iscoroutinefunction(wrapped_view), True)
- def test_callable(self):
- """
- login_required is assignable to callable objects.
- """
- class CallableView:
- def __call__(self, *args, **kwargs):
- pass
- login_required(CallableView())
- def test_view(self):
- """
- login_required is assignable to normal views.
- """
- def normal_view(request):
- pass
- login_required(normal_view)
- def test_login_required(self, view_url="/login_required/", login_url=None):
- """
- login_required works on a simple view wrapped in a login_required
- decorator.
- """
- if login_url is None:
- login_url = settings.LOGIN_URL
- response = self.client.get(view_url)
- self.assertEqual(response.status_code, 302)
- self.assertIn(login_url, response.url)
- self.login()
- response = self.client.get(view_url)
- self.assertEqual(response.status_code, 200)
- def test_login_required_next_url(self):
- """
- login_required works on a simple view wrapped in a login_required
- decorator with a login_url set.
- """
- self.test_login_required(
- view_url="/login_required_login_url/", login_url="/somewhere/"
- )
- async def test_login_required_async_view(self, login_url=None):
- async def async_view(request):
- return HttpResponse()
- async def auser_anonymous():
- return models.AnonymousUser()
- async def auser():
- return self.u1
- if login_url is None:
- async_view = login_required(async_view)
- login_url = settings.LOGIN_URL
- else:
- async_view = login_required(async_view, login_url=login_url)
- request = self.factory.get("/rand")
- request.auser = auser_anonymous
- response = await async_view(request)
- self.assertEqual(response.status_code, 302)
- self.assertIn(login_url, response.url)
- request.auser = auser
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- async def test_login_required_next_url_async_view(self):
- await self.test_login_required_async_view(login_url="/somewhere/")
- class LoginNotRequiredTestCase(TestCase):
- """
- Tests the login_not_required decorators
- """
- def test_callable(self):
- """
- login_not_required is assignable to callable objects.
- """
- class CallableView:
- def __call__(self, *args, **kwargs):
- pass
- login_not_required(CallableView())
- def test_view(self):
- """
- login_not_required is assignable to normal views.
- """
- def normal_view(request):
- pass
- login_not_required(normal_view)
- def test_decorator_marks_view_as_login_not_required(self):
- @login_not_required
- def view(request):
- return HttpResponse()
- self.assertFalse(view.login_required)
- class PermissionsRequiredDecoratorTest(TestCase):
- """
- Tests for the permission_required decorator
- """
- factory = RequestFactory()
- @classmethod
- def setUpTestData(cls):
- cls.user = models.User.objects.create(username="joe", password="qwerty")
- # Add permissions auth.add_customuser and auth.change_customuser
- perms = models.Permission.objects.filter(
- codename__in=("add_customuser", "change_customuser")
- )
- cls.user.user_permissions.add(*perms)
- @classmethod
- async def auser(cls):
- return cls.user
- def test_wrapped_sync_function_is_not_coroutine_function(self):
- def sync_view(request):
- return HttpResponse()
- wrapped_view = permission_required([])(sync_view)
- self.assertIs(iscoroutinefunction(wrapped_view), False)
- def test_wrapped_async_function_is_coroutine_function(self):
- async def async_view(request):
- return HttpResponse()
- wrapped_view = permission_required([])(async_view)
- self.assertIs(iscoroutinefunction(wrapped_view), True)
- def test_many_permissions_pass(self):
- @permission_required(
- ["auth_tests.add_customuser", "auth_tests.change_customuser"]
- )
- def a_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user
- resp = a_view(request)
- self.assertEqual(resp.status_code, 200)
- def test_many_permissions_in_set_pass(self):
- @permission_required(
- {"auth_tests.add_customuser", "auth_tests.change_customuser"}
- )
- def a_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user
- resp = a_view(request)
- self.assertEqual(resp.status_code, 200)
- def test_single_permission_pass(self):
- @permission_required("auth_tests.add_customuser")
- def a_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user
- resp = a_view(request)
- self.assertEqual(resp.status_code, 200)
- def test_permissioned_denied_redirect(self):
- @permission_required(
- [
- "auth_tests.add_customuser",
- "auth_tests.change_customuser",
- "nonexistent-permission",
- ]
- )
- def a_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user
- resp = a_view(request)
- self.assertEqual(resp.status_code, 302)
- def test_permissioned_denied_exception_raised(self):
- @permission_required(
- [
- "auth_tests.add_customuser",
- "auth_tests.change_customuser",
- "nonexistent-permission",
- ],
- raise_exception=True,
- )
- def a_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user
- with self.assertRaises(PermissionDenied):
- a_view(request)
- async def test_many_permissions_pass_async_view(self):
- @permission_required(
- ["auth_tests.add_customuser", "auth_tests.change_customuser"]
- )
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- async def test_many_permissions_in_set_pass_async_view(self):
- @permission_required(
- {"auth_tests.add_customuser", "auth_tests.change_customuser"}
- )
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- async def test_single_permission_pass_async_view(self):
- @permission_required("auth_tests.add_customuser")
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- async def test_permissioned_denied_redirect_async_view(self):
- @permission_required(
- [
- "auth_tests.add_customuser",
- "auth_tests.change_customuser",
- "nonexistent-permission",
- ]
- )
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser
- response = await async_view(request)
- self.assertEqual(response.status_code, 302)
- async def test_permissioned_denied_exception_raised_async_view(self):
- @permission_required(
- [
- "auth_tests.add_customuser",
- "auth_tests.change_customuser",
- "nonexistent-permission",
- ],
- raise_exception=True,
- )
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser
- with self.assertRaises(PermissionDenied):
- await async_view(request)
- class UserPassesTestDecoratorTest(TestCase):
- factory = RequestFactory()
- @classmethod
- def setUpTestData(cls):
- cls.user_pass = models.User.objects.create(username="joe", password="qwerty")
- cls.user_deny = models.User.objects.create(username="jim", password="qwerty")
- models.Group.objects.create(name="Joe group")
- # Add permissions auth.add_customuser and auth.change_customuser
- perms = models.Permission.objects.filter(
- codename__in=("add_customuser", "change_customuser")
- )
- cls.user_pass.user_permissions.add(*perms)
- @classmethod
- async def auser_pass(cls):
- return cls.user_pass
- @classmethod
- async def auser_deny(cls):
- return cls.user_deny
- def test_wrapped_sync_function_is_not_coroutine_function(self):
- def sync_view(request):
- return HttpResponse()
- wrapped_view = user_passes_test(lambda user: True)(sync_view)
- self.assertIs(iscoroutinefunction(wrapped_view), False)
- def test_wrapped_async_function_is_coroutine_function(self):
- async def async_view(request):
- return HttpResponse()
- wrapped_view = user_passes_test(lambda user: True)(async_view)
- self.assertIs(iscoroutinefunction(wrapped_view), True)
- def test_decorator(self):
- def sync_test_func(user):
- return bool(
- models.Group.objects.filter(name__istartswith=user.username).exists()
- )
- @user_passes_test(sync_test_func)
- def sync_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user_pass
- response = sync_view(request)
- self.assertEqual(response.status_code, 200)
- request.user = self.user_deny
- response = sync_view(request)
- self.assertEqual(response.status_code, 302)
- def test_decorator_async_test_func(self):
- async def async_test_func(user):
- return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"])
- @user_passes_test(async_test_func)
- def sync_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.user = self.user_pass
- response = sync_view(request)
- self.assertEqual(response.status_code, 200)
- request.user = self.user_deny
- response = sync_view(request)
- self.assertEqual(response.status_code, 302)
- async def test_decorator_async_view(self):
- def sync_test_func(user):
- return bool(
- models.Group.objects.filter(name__istartswith=user.username).exists()
- )
- @user_passes_test(sync_test_func)
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser_pass
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- request.auser = self.auser_deny
- response = await async_view(request)
- self.assertEqual(response.status_code, 302)
- async def test_decorator_async_view_async_test_func(self):
- async def async_test_func(user):
- return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"])
- @user_passes_test(async_test_func)
- async def async_view(request):
- return HttpResponse()
- request = self.factory.get("/rand")
- request.auser = self.auser_pass
- response = await async_view(request)
- self.assertEqual(response.status_code, 200)
- request.auser = self.auser_deny
- response = await async_view(request)
- self.assertEqual(response.status_code, 302)
|