diff --git a/packages/mizan-django/src/mizan/client/executor.py b/packages/mizan-django/src/mizan/client/executor.py index 2930b6c..2917dff 100644 --- a/packages/mizan-django/src/mizan/client/executor.py +++ b/packages/mizan-django/src/mizan/client/executor.py @@ -457,6 +457,40 @@ def execute_function( return FunctionResult(data=output.model_dump()) +def _try_mwt_auth(request: HttpRequest) -> bool: + """ + Attempt to authenticate the request using MWT (Mizan Web Token). + + Checks the X-Mizan-Token header. If present and valid, sets request.user + to an MWTUser. Returns True on success, False if no MWT header or invalid. + """ + token = request.META.get("HTTP_X_MIZAN_TOKEN", "") + if not token: + return False + + try: + settings = get_settings() + if not settings.cache_secret: + return False + + from mizan.mwt import decode_mwt, MWTUser + + payload = decode_mwt(token, settings.cache_secret) + if payload is None: + return False + + request.user = MWTUser(payload) + request._mizan_mwt_authenticated = True + return True + except Exception: + return False + + +def _has_mwt_header(request: HttpRequest) -> bool: + """Check if request has an X-Mizan-Token header.""" + return bool(request.META.get("HTTP_X_MIZAN_TOKEN", "")) + + def _try_jwt_auth(request: HttpRequest) -> bool: """ Attempt to authenticate the request using JWT. @@ -504,36 +538,38 @@ def _has_jwt_header(request: HttpRequest) -> bool: def _csrf_protect_unless_jwt(view_func): """ - Decorator that applies CSRF protection unless JWT auth is used. + Decorator that applies CSRF protection unless token auth is used. - JWT tokens are self-authenticating (the token itself proves the request - is legitimate), so CSRF protection is not needed. + MWT (X-Mizan-Token) is checked first, then legacy JWT (Authorization: Bearer). + Both are self-authenticating, so CSRF protection is not needed. - Security: If JWT is provided but invalid, reject the request - do NOT - fall back to session auth. This prevents attacks where an invalid token - is sent alongside a valid session cookie. + Security: If a token is provided but invalid, reject the request - do NOT + fall back to session auth. """ csrf_protected_view = csrf_protect(view_func) @wraps(view_func) def wrapper(request: HttpRequest, *args, **kwargs): - # Check if JWT header is present - has_jwt = _has_jwt_header(request) - - if has_jwt: - # JWT header present - try to authenticate - if _try_jwt_auth(request): - # JWT valid - skip CSRF, proceed + # MWT takes priority + if _has_mwt_header(request): + if _try_mwt_auth(request): return view_func(request, *args, **kwargs) - else: - # JWT invalid - reject (do NOT fall back to session) - return FunctionError( - code=ErrorCode.UNAUTHORIZED, - message="Invalid or expired JWT token", - ).to_response(status=401) - else: - # No JWT - use session auth with CSRF - return csrf_protected_view(request, *args, **kwargs) + return FunctionError( + code=ErrorCode.UNAUTHORIZED, + message="Invalid or expired MWT", + ).to_response(status=401) + + # Legacy JWT fallback + if _has_jwt_header(request): + if _try_jwt_auth(request): + return view_func(request, *args, **kwargs) + return FunctionError( + code=ErrorCode.UNAUTHORIZED, + message="Invalid or expired JWT token", + ).to_response(status=401) + + # No token — session auth with CSRF + return csrf_protected_view(request, *args, **kwargs) return wrapper @@ -732,20 +768,30 @@ def execute_context( def _jwt_auth_only(view_func): """ - Decorator that handles JWT auth for GET endpoints (no CSRF needed for GET). + Decorator that handles token auth for GET endpoints (no CSRF needed for GET). + Checks MWT first, then legacy JWT. """ @wraps(view_func) def wrapper(request: HttpRequest, *args, **kwargs): - has_jwt = _has_jwt_header(request) - if has_jwt: + # MWT takes priority + if _has_mwt_header(request): + if _try_mwt_auth(request): + return view_func(request, *args, **kwargs) + return FunctionError( + code=ErrorCode.UNAUTHORIZED, + message="Invalid or expired MWT", + ).to_response(status=401) + + # Legacy JWT fallback + if _has_jwt_header(request): if _try_jwt_auth(request): return view_func(request, *args, **kwargs) - else: - return FunctionError( - code=ErrorCode.UNAUTHORIZED, - message="Invalid or expired JWT token", - ).to_response(status=401) - # No JWT — session auth (no CSRF needed for GET) + return FunctionError( + code=ErrorCode.UNAUTHORIZED, + message="Invalid or expired JWT token", + ).to_response(status=401) + + # No token — session auth (no CSRF needed for GET) return view_func(request, *args, **kwargs) return wrapper diff --git a/packages/mizan-django/src/mizan/jwt/functions.py b/packages/mizan-django/src/mizan/jwt/functions.py index d699a6f..366d31b 100644 --- a/packages/mizan-django/src/mizan/jwt/functions.py +++ b/packages/mizan-django/src/mizan/jwt/functions.py @@ -1,7 +1,7 @@ """ -JWT Server Functions +JWT & MWT Server Functions -JWT token operations exposed as mizan server functions. +Token operations exposed as mizan server functions. Works over WebSocket RPC (primary) or HTTP fallback. """ @@ -10,6 +10,7 @@ from pydantic import BaseModel from mizan.client import client from mizan.jwt.tokens import create_token_pair, refresh_tokens +from mizan.mwt import create_mwt class TokenPairOutput(BaseModel): @@ -99,3 +100,42 @@ def jwt_refresh(request: HttpRequest, refresh_token: str) -> TokenPairOutput: refresh_token=tokens.refresh_token, expires_in=tokens.expires_in, ) + + +# ── MWT (Mizan Web Token) ────────────────────────────────────────────── + + +class MWTOutput(BaseModel): + """MWT token response.""" + token: str + expires_in: int + + +@client +def mwt_obtain(request: HttpRequest) -> MWTOutput: + """ + Obtain a Mizan Web Token from an authenticated session. + + Requires session authentication (cookie-based login). + Returns an MWT for the X-Mizan-Token header — stateless, + cache-aware authentication with permission staleness detection. + + Usage (from frontend): + const { token, expires_in } = await call('mwt_obtain') + // Use token in X-Mizan-Token header + """ + user = request.user + + if not user.is_authenticated: + raise PermissionError("Authentication required") + + from mizan.setup.settings import get_settings + settings = get_settings() + + if not settings.cache_secret: + raise ValueError( + "MIZAN_CACHE_SECRET is not configured. MWT requires a signing secret." + ) + + token = create_mwt(user, settings.cache_secret, ttl=settings.mwt_ttl) + return MWTOutput(token=token, expires_in=settings.mwt_ttl) diff --git a/packages/mizan-django/src/mizan/mwt.py b/packages/mizan-django/src/mizan/mwt.py new file mode 100644 index 0000000..ed08ed9 --- /dev/null +++ b/packages/mizan-django/src/mizan/mwt.py @@ -0,0 +1,157 @@ +""" +MWT (Mizan Web Token) — Protocol-owned identity layer. + +MWT is a standard JWT (RFC 7519, HMAC-SHA256) with Mizan-specific claims, +traveling on the `X-Mizan-Token` header. It provides: + +- `sub`: user_id for HMAC cache key derivation +- `pkey`: permission state hash for staleness detection +- `kid`: key ID for secret rotation +- `aud`: audience binding to prevent cross-tenant replay + +MWT is issued from an authenticated Django session. The app handles +authentication (session, social auth, etc.); Mizan issues MWT from +the authenticated identity. Edge Workers and the origin-side cache +validate MWT to extract user identity for cache operations. + +Usage: + from mizan.mwt import create_mwt, decode_mwt, MWTUser + +Configuration: + MIZAN_CACHE_SECRET: signing key (shared with cache key derivation) + MIZAN_MWT_TTL: token lifetime in seconds (default: 300) +""" + +from __future__ import annotations + +import hashlib +import time +from dataclasses import dataclass +from typing import Any + +import jwt + + +@dataclass +class MWTPayload: + """Decoded MWT claims.""" + sub: str # user_id + staff: bool # is_staff + super: bool # is_superuser + pkey: str # permission state hash + kid: str # key ID + aud: str # audience + iat: int # issued at + exp: int # expiration + + +class MWTUser: + """ + Minimal user object created from MWT claims. + + Used as request.user for MWT-authenticated requests. + No database query required — all data comes from the token. + """ + + def __init__(self, payload: MWTPayload): + self.id = int(payload.sub) + self.pk = self.id + self.is_staff = payload.staff + self.is_superuser = payload.super + self.is_authenticated = True + self.is_anonymous = False + self.is_active = True + self.pkey = payload.pkey + + def __str__(self) -> str: + return f"MWTUser(id={self.id})" + + def __repr__(self) -> str: + return f"MWTUser(id={self.id}, pkey={self.pkey[:8]}...)" + + +def compute_permission_key(user: Any) -> str: + """ + Compute a deterministic hash of the user's permission state. + + Includes is_staff, is_superuser, and all Django permissions. + When the MWT expires and is refreshed, the new pkey reflects + any permission changes. The short TTL controls the staleness window. + + Returns a 16-character hex digest (SHA-256 truncated). + """ + perms = sorted(user.get_all_permissions()) if hasattr(user, "get_all_permissions") else [] + staff = "1" if getattr(user, "is_staff", False) else "0" + superuser = "1" if getattr(user, "is_superuser", False) else "0" + blob = f"{staff}:{superuser}:{','.join(perms)}" + return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] + + +def create_mwt( + user: Any, + secret: str, + ttl: int = 300, + audience: str = "mizan", + kid: str = "v1", +) -> str: + """ + Create an MWT from an authenticated Django user. + + Args: + user: Django user object (must have pk, is_staff, is_superuser). + secret: MIZAN_CACHE_SECRET signing key. + ttl: Token lifetime in seconds (default: 300 = 5 minutes). + audience: Audience claim for cross-tenant protection. + kid: Key ID for future secret rotation. + + Returns: + Encoded JWT string. + """ + now = int(time.time()) + payload = { + "sub": str(user.pk), + "staff": getattr(user, "is_staff", False), + "super": getattr(user, "is_superuser", False), + "pkey": compute_permission_key(user), + "aud": audience, + "kid": kid, + "iat": now, + "exp": now + ttl, + } + return jwt.encode(payload, secret, algorithm="HS256") + + +def decode_mwt( + token: str, + secret: str, + audience: str = "mizan", +) -> MWTPayload | None: + """ + Decode and validate an MWT. + + Returns MWTPayload on success, None on any failure (expired, invalid + signature, wrong audience, malformed). + """ + try: + data = jwt.decode( + token, + secret, + algorithms=["HS256"], + audience=audience, + ) + except jwt.PyJWTError: + return None + + try: + return MWTPayload( + sub=data["sub"], + staff=data.get("staff", False), + super=data.get("super", False), + pkey=data.get("pkey", ""), + kid=data.get("kid", "v1"), + aud=audience, + iat=data["iat"], + exp=data["exp"], + ) + except (KeyError, TypeError): + return None diff --git a/packages/mizan-django/src/mizan/setup/settings.py b/packages/mizan-django/src/mizan/setup/settings.py index 0561e85..36f6c58 100644 --- a/packages/mizan-django/src/mizan/setup/settings.py +++ b/packages/mizan-django/src/mizan/setup/settings.py @@ -23,6 +23,9 @@ class mizanSettings: # Redis URL for cache backend (None = cache disabled) cache_redis_url: str | None + # MWT token lifetime in seconds (default: 300 = 5 minutes) + mwt_ttl: int + @lru_cache def get_settings() -> mizanSettings: @@ -38,6 +41,7 @@ def get_settings() -> mizanSettings: debug_expose_names=getattr(django_settings, "mizan_DEBUG_EXPOSE_NAMES", True), cache_secret=getattr(django_settings, "MIZAN_CACHE_SECRET", None), cache_redis_url=getattr(django_settings, "MIZAN_CACHE_REDIS_URL", None), + mwt_ttl=getattr(django_settings, "MIZAN_MWT_TTL", 300), ) diff --git a/packages/mizan-django/src/mizan/tests/test_core.py b/packages/mizan-django/src/mizan/tests/test_core.py index 633a748..154735d 100644 --- a/packages/mizan-django/src/mizan/tests/test_core.py +++ b/packages/mizan-django/src/mizan/tests/test_core.py @@ -3252,3 +3252,201 @@ class CachePolicyIntegrationTests(TestCase): reset_cache() clear_settings_cache() + + +# ── MWT (Mizan Web Token) tests ──────────────────────────────────────────── + + +class MWTCreationTests(TestCase): + """Tests for MWT creation and decoding.""" + + SECRET = "test-mwt-secret-that-is-32bytes!" + + def _make_user(self, **kwargs): + user = MagicMock() + user.pk = kwargs.get("pk", 1) + user.is_staff = kwargs.get("is_staff", False) + user.is_superuser = kwargs.get("is_superuser", False) + user.get_all_permissions = MagicMock(return_value=kwargs.get("perms", set())) + return user + + def test_create_and_decode(self): + """Create an MWT and decode it successfully.""" + from mizan.mwt import create_mwt, decode_mwt + + user = self._make_user(pk=42, is_staff=True) + token = create_mwt(user, self.SECRET, ttl=300) + payload = decode_mwt(token, self.SECRET) + + self.assertIsNotNone(payload) + self.assertEqual(payload.sub, "42") + self.assertTrue(payload.staff) + self.assertFalse(payload.super) + self.assertEqual(payload.kid, "v1") + self.assertEqual(len(payload.pkey), 16) + + def test_decode_expired(self): + """Expired MWT returns None.""" + from mizan.mwt import create_mwt, decode_mwt + + user = self._make_user() + token = create_mwt(user, self.SECRET, ttl=-1) + payload = decode_mwt(token, self.SECRET) + self.assertIsNone(payload) + + def test_decode_wrong_secret(self): + """MWT signed with wrong secret returns None.""" + from mizan.mwt import create_mwt, decode_mwt + + user = self._make_user() + token = create_mwt(user, self.SECRET) + payload = decode_mwt(token, "wrong-secret") + self.assertIsNone(payload) + + def test_decode_wrong_audience(self): + """MWT with wrong audience returns None.""" + from mizan.mwt import create_mwt, decode_mwt + + user = self._make_user() + token = create_mwt(user, self.SECRET, audience="app1") + payload = decode_mwt(token, self.SECRET, audience="app2") + self.assertIsNone(payload) + + def test_mwt_user_has_pkey(self): + """MWTUser carries the permission key.""" + from mizan.mwt import create_mwt, decode_mwt, MWTUser + + user = self._make_user(pk=5, perms={"app.view_thing"}) + token = create_mwt(user, self.SECRET) + payload = decode_mwt(token, self.SECRET) + mwt_user = MWTUser(payload) + + self.assertEqual(mwt_user.pk, 5) + self.assertTrue(mwt_user.is_authenticated) + self.assertEqual(len(mwt_user.pkey), 16) + + +class PermissionKeyTests(TestCase): + """Tests for pkey determinism and sensitivity.""" + + def _make_user(self, **kwargs): + user = MagicMock() + user.pk = kwargs.get("pk", 1) + user.is_staff = kwargs.get("is_staff", False) + user.is_superuser = kwargs.get("is_superuser", False) + user.get_all_permissions = MagicMock(return_value=kwargs.get("perms", set())) + return user + + def test_deterministic(self): + """Same permissions produce same pkey.""" + from mizan.mwt import compute_permission_key + + user = self._make_user(perms={"app.view_thing", "app.add_thing"}) + pkey1 = compute_permission_key(user) + pkey2 = compute_permission_key(user) + self.assertEqual(pkey1, pkey2) + + def test_changes_on_permission_change(self): + """Different permissions produce different pkey.""" + from mizan.mwt import compute_permission_key + + user1 = self._make_user(perms={"app.view_thing"}) + user2 = self._make_user(perms={"app.view_thing", "app.add_thing"}) + self.assertNotEqual(compute_permission_key(user1), compute_permission_key(user2)) + + def test_changes_on_staff_change(self): + """Staff status change produces different pkey.""" + from mizan.mwt import compute_permission_key + + user_normal = self._make_user(is_staff=False) + user_staff = self._make_user(is_staff=True) + self.assertNotEqual( + compute_permission_key(user_normal), + compute_permission_key(user_staff), + ) + + +class MWTAuthIntegrationTests(TestCase): + """Tests for MWT authentication in the executor.""" + + SECRET = "test-mwt-auth-secret-thats-32bytes!" # 32+ bytes for HS256 + + def setUp(self): + clear_registry() + self.factory = RequestFactory() + from mizan.setup.settings import clear_settings_cache + clear_settings_cache() + + UserCtx = ReactContext("user") + + @client(context=UserCtx, auth=True) + def protected_fn(request: HttpRequest, user_id: int) -> dict: + return {"viewer": request.user.pk} + + register(protected_fn, "protected_fn") + + def tearDown(self): + clear_registry() + from mizan.setup.settings import clear_settings_cache + clear_settings_cache() + + def test_mwt_auth_via_header(self): + """Request with valid X-Mizan-Token authenticates.""" + from mizan.mwt import create_mwt + from mizan.client.executor import _try_mwt_auth + from django.test import override_settings + + user = MagicMock() + user.pk = 5 + user.is_staff = False + user.is_superuser = False + user.get_all_permissions = MagicMock(return_value=set()) + + token = create_mwt(user, self.SECRET) + + request = self.factory.get("/") + request.META["HTTP_X_MIZAN_TOKEN"] = token + request.user = MagicMock(is_authenticated=False) + + with override_settings(MIZAN_CACHE_SECRET=self.SECRET): + from mizan.setup.settings import clear_settings_cache + clear_settings_cache() + result = _try_mwt_auth(request) + + self.assertTrue(result) + self.assertEqual(request.user.pk, 5) + self.assertTrue(request.user.is_authenticated) + + def test_mwt_invalid_returns_401(self): + """Invalid X-Mizan-Token returns 401 on context fetch.""" + from django.test import override_settings + + with override_settings(MIZAN_CACHE_SECRET=self.SECRET): + from mizan.setup.settings import clear_settings_cache + clear_settings_cache() + response = self.client.get( + "/api/mizan/ctx/user/?user_id=5", + HTTP_X_MIZAN_TOKEN="invalid-token", + ) + + self.assertEqual(response.status_code, 401) + + def test_legacy_jwt_still_works(self): + """Authorization: Bearer still accepted alongside MWT.""" + from mizan.jwt.tokens import create_token_pair + from tests.models import EmailUser + + user = EmailUser.objects.create_user(email="legacy@test.com", password="pass") + self.client.login(email="legacy@test.com", password="pass") + session_key = self.client.session.session_key + + tokens = create_token_pair( + user.pk, session_key, + is_staff=False, is_superuser=False, + ) + + response = self.client.get( + "/api/mizan/ctx/user/?user_id=5", + HTTP_AUTHORIZATION=f"Bearer {tokens.access_token}", + ) + self.assertEqual(response.status_code, 200)