Files
mizan/backends/mizan-django/src/mizan/tests/test_auth.py
Ryth Azhur 76fce2dc85 Split the registry — function/composition core + backend extensions
The original registry tangled function, channel, composition, and form
registration in a single file with polymorphic register() dispatch.
That predates the household discipline; it was the design that was
supposed to ship but didn't. Re-implementing the original intent.

cores/mizan-python/src/mizan_core/registry.py (new):
- _functions, _compositions dicts
- register() — ServerFunction-only, no polymorphic dispatch
- register_as(), register_compose()
- register_extension(name, extension) — hook interface
- get_function/get_compose/get_all_functions/get_all_compositions
- get_contexts, get_context_groups
- get_registry, get_schema — aggregate extension contributions
- validate_registry, clear_registry — cascade-clear extensions

RegistryExtension Protocol:
- schema() returns the extension's schema subdict (keyed under its name)
- clear() resets extension state (called by clear_registry)

mizan-django/src/mizan/channels/__init__.py:
- _ChannelsExtension wraps the channel _registry, plugs into core via
  register_extension('channels', ...). Schema output preserves the
  same shape codegen consumed before (snake_case keys, type+bidirectional).

mizan-django/src/mizan/forms/__init__.py:
- register_form() and get_forms() helpers moved here (were in setup/registry.py)
- Both use mizan_core.registry under the hood. Forms don't need a
  separate extension because form sub-functions register as regular
  ServerFunctions with meta.form set.

mizan-django/src/mizan/setup/registry.py: deleted.
mizan-django/src/mizan/setup/__init__.py: re-exports the registry helpers
from mizan_core.registry / mizan.channels / mizan.forms — the Django
adapter's curated public API surface stays stable for users.

Consumers updated: ~10 files imported `from mizan.setup.registry`;
all switched to direct imports from mizan_core.registry, mizan.channels,
or mizan.forms as appropriate. ChannelTests in test_core.py rewritten
to use mizan.channels.register directly (no more polymorphic
@register_as on ReactChannel subclasses).

Verified:
- mizan-core: 15/15
- mizan-django: 348 pass, 21 skip, 0 fail
- mizan-ts edge-compat: 34/34 (cross-language pin holds)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 15:21:16 -04:00

555 lines
17 KiB
Python

"""
Authentication Tests for mizan Server Functions
Tests all combinations of:
- Transport: HTTP vs WebSocket RPC
- JWT: Present (valid), Present (invalid), Absent
- Session: Present (valid), Absent
Expected behavior:
- JWT present (valid) → JWTUser (no DB query)
- JWT present (invalid) → Reject (401), do NOT fall back to session
- JWT absent + Session present → Session auth (DB query)
- JWT absent + Session absent → AnonymousUser
"""
from django.test import TestCase, RequestFactory, override_settings
from django.contrib.auth import get_user_model
from django.contrib.sessions.backends.db import SessionStore
from unittest.mock import patch, MagicMock
import json
from mizan.jwt.tokens import (
create_token_pair,
decode_token,
JWTUser,
)
from mizan.client.executor import (
_try_jwt_auth,
execute_function,
FunctionError,
FunctionResult,
ErrorCode,
)
from mizan.client import client
from mizan_core.registry import clear_registry, register
from pydantic import BaseModel
# Active user model, resolved once at import time through Django's
# get_user_model(). The tests below create users with ``email=`` and no
# username — presumably the project uses a custom, email-keyed user model
# (TODO confirm against AUTH_USER_MODEL).
User = get_user_model()
# =============================================================================
# Test Output Models (proper Pydantic models, not raw dicts)
# =============================================================================
# Result model for the ``whoami`` test function: a snapshot of ``request.user``.
# Documented with comments rather than a class docstring on purpose — pydantic
# copies a model's docstring into its schema description.
class WhoamiOutput(BaseModel):
    is_authenticated: bool  # copied from user.is_authenticated
    user_id: int | None  # user.id, or None when the user object has no ``id``
    user_type: str  # class name of the user object, e.g. "JWTUser"
    is_staff: bool  # user.is_staff, False when the attribute is missing
# Minimal result model returned by the auth-decorator test functions; the
# functions in this file always return ``ok=True`` when they are allowed to run.
class OkOutput(BaseModel):
    ok: bool
# Result model reporting which user class handled the call; the test function
# using it fills ``user_type`` with ``type(request.user).__name__``.
class UserTypeOutput(BaseModel):
    user_type: str
# =============================================================================
# Test Server Functions - defined as plain functions, registered in setUp
# =============================================================================
def _whoami_fn(request) -> WhoamiOutput:
    """Describe the user attached to ``request`` as a ``WhoamiOutput``.

    ``id`` and ``is_staff`` are read defensively so user objects lacking
    those attributes still produce a result (``None`` / ``False``).
    """
    # NOTE(review): nothing in the visible part of this file references this
    # function; HTTPAuthTests.setUp defines an equivalent inline ``whoami``.
    current = request.user
    snapshot = {
        "is_authenticated": current.is_authenticated,
        "user_id": getattr(current, "id", None),
        "user_type": type(current).__name__,
        "is_staff": getattr(current, "is_staff", False),
    }
    return WhoamiOutput(**snapshot)
@override_settings(
    JWT_PRIVATE_KEY="test-secret-key-for-testing-only",
    JWT_ALGORITHM="HS256",
)
class HTTPAuthTests(TestCase):
    """Test HTTP transport authentication combinations.

    Each test builds a POST request with ``RequestFactory``, optionally
    attaches an ``Authorization: Bearer`` header, pre-sets ``request.user``
    to simulate what session auth would already have done, and then checks
    the return value of ``_try_jwt_auth`` and the resulting ``request.user``.
    """

    def setUp(self):
        """Reset the registry, create a staff user and session, register ``whoami``."""
        clear_registry()
        self.factory = RequestFactory()
        self.user = User.objects.create_user(
            email="test@example.com",
            password="testpass123",
            is_staff=True,
            is_superuser=False,
        )

        # Create a session whose key is embedded in the issued tokens.
        self.session = SessionStore()
        self.session.create()
        self.session_key = self.session.session_key

        # Register the test function. No docstring is added to it on purpose:
        # the decorator may read function metadata.
        @client
        def whoami(request) -> WhoamiOutput:
            user = request.user
            return WhoamiOutput(
                is_authenticated=user.is_authenticated,
                user_id=getattr(user, "id", None),
                user_type=type(user).__name__,
                is_staff=getattr(user, "is_staff", False),
            )

        register(whoami, "whoami")

    def tearDown(self):
        """Remove the fixtures created in ``setUp`` and reset the registry."""
        self.user.delete()
        self.session.delete()
        clear_registry()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _access_token(self):
        """Return an access token for ``self.user`` bound to the test session."""
        pair = create_token_pair(
            self.user.pk,
            self.session_key,
            is_staff=True,
            is_superuser=False,
        )
        return pair.access_token

    def _bearer_request(self, token):
        """Build a POST request carrying ``token`` as a Bearer credential.

        ``request.user`` is deliberately left unset; each test assigns the
        value that simulates its session state.
        """
        request = self.factory.post("/")
        request.META["HTTP_AUTHORIZATION"] = f"Bearer {token}"
        return request

    # ------------------------------------------------------------------
    # _try_jwt_auth
    # ------------------------------------------------------------------
    def test_jwt_valid_no_session(self):
        """Valid JWT without session → JWTUser (no DB query)."""
        request = self._bearer_request(self._access_token())
        request.user = MagicMock(is_authenticated=False)  # No session auth

        # Try JWT auth
        result = _try_jwt_auth(request)

        self.assertTrue(result)
        self.assertIsInstance(request.user, JWTUser)
        self.assertEqual(request.user.id, self.user.pk)
        self.assertTrue(request.user.is_staff)
        self.assertTrue(request.user.is_authenticated)

    def test_jwt_valid_with_session(self):
        """Valid JWT with session → JWT takes precedence (no DB query)."""
        request = self._bearer_request(self._access_token())
        request.user = self.user  # Session auth already set user

        # JWT should still be processed and take precedence
        result = _try_jwt_auth(request)

        self.assertTrue(result)
        self.assertIsInstance(request.user, JWTUser)

    def test_jwt_invalid_with_session(self):
        """Invalid JWT with valid session → Reject (do NOT fall back)."""
        request = self._bearer_request("invalid-token-here")
        request.user = self.user  # Session would work

        # JWT auth should fail
        result = _try_jwt_auth(request)

        self.assertFalse(result)
        # A rejected token must never leave a JWT identity on the request.
        # What the caller then does with the session user is decided
        # elsewhere; here we only pin the outcome of _try_jwt_auth itself.
        self.assertNotIsInstance(request.user, JWTUser)

    def test_jwt_expired_with_session(self):
        """Expired JWT with valid session → Reject (do NOT fall back)."""
        # Issue the token while the clock reads 0 so it is long expired by
        # the time it is checked against the real clock.
        with patch("mizan.jwt.tokens.time.time", return_value=0):
            stale_token = self._access_token()

        request = self._bearer_request(stale_token)
        request.user = self.user  # Session would work

        # JWT auth should fail (expired)
        result = _try_jwt_auth(request)

        self.assertFalse(result)
        # Same guarantee as for a malformed token: no JWT identity installed.
        self.assertNotIsInstance(request.user, JWTUser)

    def test_no_jwt_with_session(self):
        """No JWT with valid session → Session auth (normal Django flow)."""
        request = self.factory.post("/")
        request.user = self.user  # Session auth set user

        # No JWT auth attempted
        result = _try_jwt_auth(request)

        self.assertFalse(result)  # No JWT to process
        # User remains the session user
        self.assertEqual(request.user, self.user)

    def test_no_jwt_no_session(self):
        """No JWT, no session → AnonymousUser."""
        from django.contrib.auth.models import AnonymousUser

        request = self.factory.post("/")
        request.user = AnonymousUser()

        result = _try_jwt_auth(request)

        self.assertFalse(result)
        self.assertIsInstance(request.user, AnonymousUser)

    # ------------------------------------------------------------------
    # execute_function
    # ------------------------------------------------------------------
    def test_execute_function_with_jwt(self):
        """Execute server function with JWT auth."""
        request = self._bearer_request(self._access_token())

        # Simulate what the view does: try JWT auth first. Fail here, with a
        # clear message, if authentication did not succeed — otherwise the
        # error would only surface later as an unrelated failure inside
        # execute_function (request.user is never set on this request).
        self.assertTrue(
            _try_jwt_auth(request),
            "JWT auth must succeed before the server function is executed",
        )

        # Use the whoami function which returns WhoamiOutput (Pydantic model)
        result = execute_function(request, "whoami", {})

        self.assertIsInstance(result, FunctionResult)
        self.assertTrue(result.data["is_authenticated"])
        self.assertEqual(result.data["user_type"], "JWTUser")
        self.assertTrue(result.data["is_staff"])
@override_settings(
    JWT_PRIVATE_KEY="test-secret-key-for-testing-only",
    JWT_ALGORITHM="HS256",
)
class JWTUserTests(TestCase):
    """Check the attributes ``JWTUser`` exposes for a given token payload."""

    def setUp(self):
        clear_registry()

    def tearDown(self):
        clear_registry()

    @staticmethod
    def _user_from_payload(user_id, *, is_staff):
        """Wrap an access-token payload for ``user_id`` in a ``JWTUser``.

        Only ``user_id`` and ``is_staff`` vary between the tests; every other
        payload field is fixed.
        """
        from mizan.jwt.tokens import TokenPayload

        claims = TokenPayload(
            user_id=user_id,
            session_key="test-session",
            token_type="access",
            is_staff=is_staff,
            is_superuser=False,
            exp=9999999999,
            iat=0,
        )
        return JWTUser(claims)

    def test_jwt_user_attributes(self):
        """JWTUser has expected attributes."""
        jwt_user = self._user_from_payload(42, is_staff=True)

        # Identity: ``id`` and ``pk`` both report the payload's user id.
        self.assertEqual(jwt_user.id, 42)
        self.assertEqual(jwt_user.pk, 42)

        # Permission flags come from the payload.
        self.assertTrue(jwt_user.is_staff)
        self.assertFalse(jwt_user.is_superuser)

        # Django-style status flags.
        self.assertTrue(jwt_user.is_authenticated)
        self.assertFalse(jwt_user.is_anonymous)
        self.assertTrue(jwt_user.is_active)

    def test_jwt_user_string_id(self):
        """JWTUser handles string user_id (converted to int)."""
        # The id is passed as a string, as stored in JWT.
        jwt_user = self._user_from_payload("42", is_staff=False)

        self.assertEqual(jwt_user.id, 42)
        self.assertIsInstance(jwt_user.id, int)
@override_settings(
    JWT_PRIVATE_KEY="test-secret-key-for-testing-only",
    JWT_ALGORITHM="HS256",
)
class AuthDecoratorTests(TestCase):
    """Test @client(auth=...) decorator.

    Every test registers a small server function guarded by one ``auth``
    setting, calls it through ``execute_function`` as a particular user, and
    checks whether a ``FunctionResult`` or a ``FunctionError`` comes back.
    """

    # ------------------------------------------------------------------
    # Fixtures and helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_account(email, *, staff, superuser):
        """Create a user with the shared test password and the given flags."""
        return User.objects.create_user(
            email=email,
            password="testpass123",
            is_staff=staff,
            is_superuser=superuser,
        )

    def _call_as(self, acting_user, fn_name):
        """Execute the registered function ``fn_name`` as ``acting_user``."""
        request = self.factory.post("/")
        request.user = acting_user
        return execute_function(request, fn_name, {})

    def setUp(self):
        clear_registry()
        self.factory = RequestFactory()
        # One account per privilege level: regular, staff, superuser.
        self.user = self._new_account(
            "test@example.com", staff=False, superuser=False
        )
        self.staff_user = self._new_account(
            "staff@example.com", staff=True, superuser=False
        )
        self.superuser = self._new_account(
            "super@example.com", staff=True, superuser=True
        )

    def tearDown(self):
        for account in (self.user, self.staff_user, self.superuser):
            account.delete()
        clear_registry()

    # ------------------------------------------------------------------
    # auth=True
    # ------------------------------------------------------------------
    def test_auth_required_with_anonymous(self):
        """@client(auth=True) rejects anonymous users."""
        from django.contrib.auth.models import AnonymousUser

        # Register a test function with proper Pydantic model
        @client(auth=True)
        def protected_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(protected_fn, "protected_fn")

        outcome = self._call_as(AnonymousUser(), "protected_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.UNAUTHORIZED)

    def test_auth_required_with_authenticated(self):
        """@client(auth=True) allows authenticated users."""

        @client(auth=True)
        def protected_fn2(request) -> OkOutput:
            return OkOutput(ok=True)

        register(protected_fn2, "protected_fn2")

        outcome = self._call_as(self.user, "protected_fn2")

        self.assertIsInstance(outcome, FunctionResult)
        self.assertEqual(outcome.data["ok"], True)

    # ------------------------------------------------------------------
    # auth="staff"
    # ------------------------------------------------------------------
    def test_auth_staff_with_regular_user(self):
        """@client(auth='staff') rejects non-staff users."""

        @client(auth="staff")
        def staff_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(staff_fn, "staff_fn")

        # self.user is not staff
        outcome = self._call_as(self.user, "staff_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.FORBIDDEN)

    def test_auth_staff_with_staff_user(self):
        """@client(auth='staff') allows staff users."""

        @client(auth="staff")
        def staff_fn2(request) -> OkOutput:
            return OkOutput(ok=True)

        register(staff_fn2, "staff_fn2")

        outcome = self._call_as(self.staff_user, "staff_fn2")

        self.assertIsInstance(outcome, FunctionResult)

    # ------------------------------------------------------------------
    # auth="superuser"
    # ------------------------------------------------------------------
    def test_auth_superuser_with_staff(self):
        """@client(auth='superuser') rejects non-superusers."""

        @client(auth="superuser")
        def super_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(super_fn, "super_fn")

        # Staff but not superuser
        outcome = self._call_as(self.staff_user, "super_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.FORBIDDEN)

    def test_auth_superuser_with_superuser(self):
        """@client(auth='superuser') allows superusers."""

        @client(auth="superuser")
        def super_fn2(request) -> OkOutput:
            return OkOutput(ok=True)

        register(super_fn2, "super_fn2")

        outcome = self._call_as(self.superuser, "super_fn2")

        self.assertIsInstance(outcome, FunctionResult)

    # ------------------------------------------------------------------
    # JWTUser and invalid auth values
    # ------------------------------------------------------------------
    def test_auth_with_jwt_user(self):
        """Auth checks work with JWTUser (stateless)."""
        from mizan.jwt.tokens import TokenPayload

        @client(auth="staff")
        def jwt_staff_fn(request) -> UserTypeOutput:
            return UserTypeOutput(user_type=type(request.user).__name__)

        register(jwt_staff_fn, "jwt_staff_fn")

        # Create JWTUser with is_staff=True
        staff_claims = TokenPayload(
            user_id=99,
            session_key="test",
            token_type="access",
            is_staff=True,
            is_superuser=False,
            exp=9999999999,
            iat=0,
        )

        outcome = self._call_as(JWTUser(staff_claims), "jwt_staff_fn")

        self.assertIsInstance(outcome, FunctionResult)
        self.assertEqual(outcome.data["user_type"], "JWTUser")

    def test_auth_invalid_string_raises(self):
        """Invalid auth string raises ValueError at decoration time."""
        with self.assertRaises(ValueError) as ctx:

            @client(auth="admin")  # 'admin' is not valid
            def bad_fn(request) -> OkOutput:
                return OkOutput(ok=True)

        # Checked outside the ``with`` block so the assertions actually run.
        message = str(ctx.exception)
        self.assertIn("Invalid auth value 'admin'", message)
        self.assertIn("required", message)

    # ------------------------------------------------------------------
    # auth=<callable>
    # ------------------------------------------------------------------
    def test_auth_callable_returns_true(self):
        """Callable auth returning True allows access."""

        @client(auth=lambda r: r.user.email.endswith("@example.com"))
        def email_check_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(email_check_fn, "email_check_fn")

        # self.user's email is test@example.com
        outcome = self._call_as(self.user, "email_check_fn")

        self.assertIsInstance(outcome, FunctionResult)
        self.assertTrue(outcome.data["ok"])

    def test_auth_callable_returns_false(self):
        """Callable auth returning False denies access."""

        @client(auth=lambda r: r.user.email.endswith("@admin.com"))
        def admin_email_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(admin_email_fn, "admin_email_fn")

        # self.user's email is test@example.com, not @admin.com
        outcome = self._call_as(self.user, "admin_email_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.FORBIDDEN)
        self.assertEqual(outcome.message, "Access denied")

    def test_auth_callable_raises_permission_error(self):
        """Callable auth raising PermissionError uses custom message."""

        def check_premium(request):
            if getattr(request.user, "is_premium", False):
                return True
            raise PermissionError("Premium subscription required")

        @client(auth=check_premium)
        def premium_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(premium_fn, "premium_fn")

        # self.user has no is_premium attribute
        outcome = self._call_as(self.user, "premium_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.FORBIDDEN)
        self.assertEqual(outcome.message, "Premium subscription required")

    def test_auth_callable_with_anonymous_user(self):
        """Callable auth can check for anonymous users."""
        from django.contrib.auth.models import AnonymousUser

        def must_be_authenticated(request):
            if request.user.is_authenticated:
                return True
            raise PermissionError("Please log in")

        @client(auth=must_be_authenticated)
        def needs_login_fn(request) -> OkOutput:
            return OkOutput(ok=True)

        register(needs_login_fn, "needs_login_fn")

        outcome = self._call_as(AnonymousUser(), "needs_login_fn")

        self.assertIsInstance(outcome, FunctionError)
        self.assertEqual(outcome.code, ErrorCode.FORBIDDEN)
        self.assertEqual(outcome.message, "Please log in")