|
@@ -1,3 +1,7 @@
|
|
|
+from asyncio import iscoroutinefunction
|
|
|
+
|
|
|
+from asgiref.sync import sync_to_async
|
|
|
+
|
|
|
from django.conf import settings
|
|
|
from django.contrib.auth import models
|
|
|
from django.contrib.auth.decorators import (
|
|
@@ -19,6 +23,22 @@ class LoginRequiredTestCase(AuthViewsTestCase):
|
|
|
Tests the login_required decorators
|
|
|
"""
|
|
|
|
|
|
+ factory = RequestFactory()
|
|
|
+
|
|
|
+ def test_wrapped_sync_function_is_not_coroutine_function(self):
|
|
|
+ def sync_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = login_required(sync_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), False)
|
|
|
+
|
|
|
+ def test_wrapped_async_function_is_coroutine_function(self):
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = login_required(async_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), True)
|
|
|
+
|
|
|
def test_callable(self):
|
|
|
"""
|
|
|
login_required is assignable to callable objects.
|
|
@@ -63,6 +83,35 @@ class LoginRequiredTestCase(AuthViewsTestCase):
|
|
|
view_url="/login_required_login_url/", login_url="/somewhere/"
|
|
|
)
|
|
|
|
|
|
+ async def test_login_required_async_view(self, login_url=None):
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ async def auser_anonymous():
|
|
|
+ return models.AnonymousUser()
|
|
|
+
|
|
|
+ async def auser():
|
|
|
+ return self.u1
|
|
|
+
|
|
|
+ if login_url is None:
|
|
|
+ async_view = login_required(async_view)
|
|
|
+ login_url = settings.LOGIN_URL
|
|
|
+ else:
|
|
|
+ async_view = login_required(async_view, login_url=login_url)
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = auser_anonymous
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 302)
|
|
|
+ self.assertIn(login_url, response.url)
|
|
|
+
|
|
|
+ request.auser = auser
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ async def test_login_required_next_url_async_view(self):
|
|
|
+ await self.test_login_required_async_view(login_url="/somewhere/")
|
|
|
+
|
|
|
|
|
|
class PermissionsRequiredDecoratorTest(TestCase):
|
|
|
"""
|
|
@@ -80,6 +129,24 @@ class PermissionsRequiredDecoratorTest(TestCase):
|
|
|
)
|
|
|
cls.user.user_permissions.add(*perms)
|
|
|
|
|
|
+ @classmethod
|
|
|
+ async def auser(cls):
|
|
|
+ return cls.user
|
|
|
+
|
|
|
+ def test_wrapped_sync_function_is_not_coroutine_function(self):
|
|
|
+ def sync_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = permission_required([])(sync_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), False)
|
|
|
+
|
|
|
+ def test_wrapped_async_function_is_coroutine_function(self):
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = permission_required([])(async_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), True)
|
|
|
+
|
|
|
def test_many_permissions_pass(self):
|
|
|
@permission_required(
|
|
|
["auth_tests.add_customuser", "auth_tests.change_customuser"]
|
|
@@ -147,6 +214,73 @@ class PermissionsRequiredDecoratorTest(TestCase):
|
|
|
with self.assertRaises(PermissionDenied):
|
|
|
a_view(request)
|
|
|
|
|
|
+ async def test_many_permissions_pass_async_view(self):
|
|
|
+ @permission_required(
|
|
|
+ ["auth_tests.add_customuser", "auth_tests.change_customuser"]
|
|
|
+ )
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ async def test_many_permissions_in_set_pass_async_view(self):
|
|
|
+ @permission_required(
|
|
|
+ {"auth_tests.add_customuser", "auth_tests.change_customuser"}
|
|
|
+ )
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ async def test_single_permission_pass_async_view(self):
|
|
|
+ @permission_required("auth_tests.add_customuser")
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ async def test_permissioned_denied_redirect_async_view(self):
|
|
|
+ @permission_required(
|
|
|
+ [
|
|
|
+ "auth_tests.add_customuser",
|
|
|
+ "auth_tests.change_customuser",
|
|
|
+ "nonexistent-permission",
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 302)
|
|
|
+
|
|
|
+ async def test_permissioned_denied_exception_raised_async_view(self):
|
|
|
+ @permission_required(
|
|
|
+ [
|
|
|
+ "auth_tests.add_customuser",
|
|
|
+ "auth_tests.change_customuser",
|
|
|
+ "nonexistent-permission",
|
|
|
+ ],
|
|
|
+ raise_exception=True,
|
|
|
+ )
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser
|
|
|
+ with self.assertRaises(PermissionDenied):
|
|
|
+ await async_view(request)
|
|
|
+
|
|
|
|
|
|
class UserPassesTestDecoratorTest(TestCase):
|
|
|
factory = RequestFactory()
|
|
@@ -162,6 +296,28 @@ class UserPassesTestDecoratorTest(TestCase):
|
|
|
)
|
|
|
cls.user_pass.user_permissions.add(*perms)
|
|
|
|
|
|
+ @classmethod
|
|
|
+ async def auser_pass(cls):
|
|
|
+ return cls.user_pass
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ async def auser_deny(cls):
|
|
|
+ return cls.user_deny
|
|
|
+
|
|
|
+ def test_wrapped_sync_function_is_not_coroutine_function(self):
|
|
|
+ def sync_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = user_passes_test(lambda user: True)(sync_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), False)
|
|
|
+
|
|
|
+ def test_wrapped_async_function_is_coroutine_function(self):
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ wrapped_view = user_passes_test(lambda user: True)(async_view)
|
|
|
+ self.assertIs(iscoroutinefunction(wrapped_view), True)
|
|
|
+
|
|
|
def test_decorator(self):
|
|
|
def sync_test_func(user):
|
|
|
return bool(
|
|
@@ -180,3 +336,56 @@ class UserPassesTestDecoratorTest(TestCase):
|
|
|
request.user = self.user_deny
|
|
|
response = sync_view(request)
|
|
|
self.assertEqual(response.status_code, 302)
|
|
|
+
|
|
|
+ def test_decorator_async_test_func(self):
|
|
|
+ async def async_test_func(user):
|
|
|
+ return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"])
|
|
|
+
|
|
|
+ @user_passes_test(async_test_func)
|
|
|
+ def sync_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.user = self.user_pass
|
|
|
+ response = sync_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ request.user = self.user_deny
|
|
|
+ response = sync_view(request)
|
|
|
+ self.assertEqual(response.status_code, 302)
|
|
|
+
|
|
|
+ async def test_decorator_async_view(self):
|
|
|
+ def sync_test_func(user):
|
|
|
+ return bool(
|
|
|
+ models.Group.objects.filter(name__istartswith=user.username).exists()
|
|
|
+ )
|
|
|
+
|
|
|
+ @user_passes_test(sync_test_func)
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser_pass
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ request.auser = self.auser_deny
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 302)
|
|
|
+
|
|
|
+ async def test_decorator_async_view_async_test_func(self):
|
|
|
+ async def async_test_func(user):
|
|
|
+ return await sync_to_async(user.has_perms)(["auth_tests.add_customuser"])
|
|
|
+
|
|
|
+ @user_passes_test(async_test_func)
|
|
|
+ async def async_view(request):
|
|
|
+ return HttpResponse()
|
|
|
+
|
|
|
+ request = self.factory.get("/rand")
|
|
|
+ request.auser = self.auser_pass
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
+
|
|
|
+ request.auser = self.auser_deny
|
|
|
+ response = await async_view(request)
|
|
|
+ self.assertEqual(response.status_code, 302)
|