FastAPI and TypeScript improved

This commit is contained in:
2026-06-04 05:14:29 -04:00
parent 67ad91b673
commit 66b2db81fb
28 changed files with 1864 additions and 717 deletions

View File

@@ -30,6 +30,9 @@ from pydantic import BaseModel, ValidationError
from mizan.cache import get_cache, cache_get, cache_put, cache_purge
from mizan_core.registry import get_function, get_context_groups
from mizan_core.upload import UploadedFile, bind_uploads
from mizan_core import invalidation as _core_inval
from mizan_core.authguard import enforce_auth as _core_enforce_auth
from mizan_core.errors import MizanError as _CoreMizanError
from mizan.setup.settings import get_settings
if TYPE_CHECKING:
@@ -113,53 +116,14 @@ def _check_auth_requirement(
Django User (from session). Either way, no additional DB query is made
for the built-in checks. Custom callables may query DB if they choose.
"""
if auth_requirement is None:
# Evaluation lives in the shared core (mizan_core.authguard); the callable
# path receives the native Django request. Core raises; we render to the
# Django-shim FunctionError shape the executor expects.
try:
_core_enforce_auth(getattr(request, "user", None), auth_requirement, request)
return None
user = request.user
# Handle callable auth
if callable(auth_requirement):
try:
result = auth_requirement(request)
if result:
return None # Authorized
else:
return FunctionError(
code=ErrorCode.FORBIDDEN,
message="Access denied",
)
except PermissionError as e:
# Custom error message from the callable
return FunctionError(
code=ErrorCode.FORBIDDEN,
message=str(e) or "Access denied",
)
# Check authentication (required for all string-based auth)
if not getattr(user, "is_authenticated", False):
return FunctionError(
code=ErrorCode.UNAUTHORIZED,
message="Authentication required",
)
# Check staff requirement
if auth_requirement == "staff":
if not getattr(user, "is_staff", False):
return FunctionError(
code=ErrorCode.FORBIDDEN,
message="Staff access required",
)
# Check superuser requirement
elif auth_requirement == "superuser":
if not getattr(user, "is_superuser", False):
return FunctionError(
code=ErrorCode.FORBIDDEN,
message="Superuser access required",
)
return None
except _CoreMizanError as e:
return FunctionError(code=ErrorCode(e.code.value), message=e.message)
_cache_log = logging.getLogger("mizan.cache")
@@ -198,51 +162,6 @@ def _purge_cache_for_invalidation(
_cache_log.warning("Cache purge failed", exc_info=True)
def _resolve_affects_target(target_name: str) -> tuple[str, str, str | None]:
"""
Determine whether an affects target is a context name or function name.
Returns:
("context", "user", None) — full context invalidation
("function", "user_profile", "user") — function within context
"""
groups = get_context_groups()
# Check if it's a context name directly
if target_name in groups:
return ("context", target_name, None)
# Check if it's a function name within a context
for ctx_name, fn_names in groups.items():
if target_name in fn_names:
return ("function", target_name, ctx_name)
# Not a context or context function — treat as context name anyway
# (it might be a non-context function or an as-yet-unregistered context)
return ("context", target_name, None)
def _get_context_param_names(context_name: str) -> set[str]:
"""
Get the set of parameter names used by functions in a context.
Returns the union of all Input field names across context functions.
"""
groups = get_context_groups()
fn_names = groups.get(context_name, [])
param_names: set[str] = set()
for fn_name in fn_names:
fn_cls = get_function(fn_name)
if fn_cls is None:
continue
input_cls = getattr(fn_cls, "Input", None)
if input_cls and input_cls is not BaseModel and hasattr(input_cls, "model_fields"):
param_names.update(input_cls.model_fields.keys())
return param_names
def _resolve_invalidation(
view_class: type | None,
input_data: dict[str, Any] | None = None,
@@ -261,49 +180,7 @@ def _resolve_invalidation(
Returns a list suitable for both JSON body and header serialization.
Returns None if no invalidation needed.
"""
if view_class is None:
return None
meta = getattr(view_class, "_meta", {})
affects = meta.get("affects")
if not affects:
return None
result = []
seen = set()
for target in affects:
if target["type"] == "context":
target_name = target["name"]
elif target["type"] == "function" and target.get("context"):
# Function-level: use the function name as the invalidation key
target_name = target["name"]
else:
continue
if target_name in seen:
continue
seen.add(target_name)
# Resolve the context this target belongs to (for param lookup)
resolved = _resolve_affects_target(target_name)
ctx_for_params = resolved[2] if resolved[0] == "function" else resolved[1]
# Tier 1: argument name matching
if input_data and ctx_for_params:
context_params = _get_context_param_names(ctx_for_params)
matched = {
k: v for k, v in input_data.items()
if k in context_params
}
if matched:
result.append({"context": target_name, "params": matched})
continue
# Tier 3: broad fallback
result.append(target_name)
return result if result else None
return _core_inval.resolve_invalidation(view_class, input_data)
def _resolve_merges(
@@ -322,94 +199,12 @@ def _resolve_merges(
Mirrors _resolve_invalidation's tier-1 auto-scoping for params.
Entries whose slot can't be uniquely resolved are dropped.
"""
if view_class is None:
return None
from mizan_core.type_utils import types_match_for_merge
meta = getattr(view_class, "_meta", {})
targets = meta.get("merge") or []
if not targets:
return None
mutation_output = getattr(view_class, "Output", None)
out: list[dict[str, Any]] = []
seen: set[str] = set()
for ctx_name in targets:
if ctx_name in seen:
continue
seen.add(ctx_name)
slot = _resolve_merge_slot(ctx_name, mutation_output, types_match_for_merge)
if slot is None:
continue
entry: dict[str, Any] = {"context": ctx_name, "slot": slot, "value": result_data}
if input_data:
context_params = _get_context_param_names(ctx_name)
matched = {
k: v for k, v in input_data.items()
if k in context_params
}
if matched:
entry["params"] = matched
out.append(entry)
return out
return _core_inval.resolve_merges(view_class, input_data, result_data)
def _resolve_merge_slot(context_name: str, mutation_output: Any, type_matcher: Any) -> str | None:
"""Find the unique function-name slot in context whose return type matches mutation's output."""
if mutation_output is None:
return None
groups = get_context_groups()
fn_names = groups.get(context_name, [])
matches: list[str] = []
for fn_name in fn_names:
fn_cls = get_function(fn_name)
if fn_cls is None:
continue
fn_output = getattr(fn_cls, "Output", None)
if fn_output is not None and type_matcher(fn_output, mutation_output):
matches.append(fn_name)
return matches[0] if len(matches) == 1 else None
def _format_invalidate_header(
invalidate: list[str | dict[str, Any]],
) -> str:
"""
Format invalidation targets as X-Mizan-Invalidate header value.
Format: comma-separated contexts. Semicolon-separated params per context.
Param values are URL-encoded to prevent delimiter collisions.
Examples:
["user"] → "user"
["user", "notifications"] → "user, notifications"
[{"context": "user", "params": {"user_id": 5}}]
"user;user_id=5"
[{"context": "search", "params": {"q": "hello world"}}]
"search;q=hello%20world"
"""
from urllib.parse import quote
parts = []
for entry in invalidate:
if isinstance(entry, str):
parts.append(entry)
elif isinstance(entry, dict):
ctx = entry["context"]
params = entry.get("params", {})
if params:
param_str = ";".join(
f"{quote(str(k), safe='')}={quote(str(v), safe='')}"
for k, v in sorted(params.items())
)
parts.append(f"{ctx};{param_str}")
else:
parts.append(ctx)
return ", ".join(parts)
def _format_invalidate_header(invalidate: list[str | dict[str, Any]]) -> str:
"""Format invalidation targets as the X-Mizan-Invalidate header value (shared core)."""
return _core_inval.format_invalidate_header(invalidate)
def execute_function(

View File

@@ -1,245 +1,79 @@
"""
JWT Token Creation and Validation
JWT tokens — the Django adapter over the shared core (`mizan_core.auth.jwt`).
Uses PyJWT directly - no allauth dependency.
Tokens are tied to Django sessions for immediate revocation on logout.
The token logic (mint/decode/refresh, `JWTUser`, `TokenPair`, `TokenPayload`)
lives in the core; this module binds it to Django settings and keeps the
session-revocation check (`validate_session`), which is Django-session-specific.
"""
import time
from typing import NamedTuple
from __future__ import annotations
import jwt
from django.contrib.sessions.backends.base import SessionBase
from mizan_core.auth import jwt as _core_jwt
from mizan_core.auth.jwt import JWTConfig, JWTUser, TokenPair, TokenPayload
from .settings import get_settings
class TokenPair(NamedTuple):
"""Access and refresh token pair."""
access_token: str
refresh_token: str
expires_in: int
__all__ = [
"TokenPair",
"TokenPayload",
"JWTUser",
"create_access_token",
"create_refresh_token",
"create_token_pair",
"decode_token",
"validate_session",
"refresh_tokens",
]
class TokenPayload(NamedTuple):
"""Decoded token payload."""
user_id: int | str
session_key: str
token_type: str
is_staff: bool
is_superuser: bool
exp: int
iat: int
class JWTUser:
"""
Minimal user object created from JWT claims.
Used as request.user for JWT-authenticated requests.
No database query required - all data comes from the token.
If you need the full User object with all fields, query explicitly:
user = User.objects.get(pk=request.user.id)
"""
def __init__(self, payload: TokenPayload):
self.id = int(payload.user_id) if isinstance(payload.user_id, str) else payload.user_id
self.pk = self.id
self.is_staff = payload.is_staff
self.is_superuser = payload.is_superuser
self.is_authenticated = True
self.is_anonymous = False
self.is_active = True # Assumed active if they have a valid token
def __str__(self):
return f"JWTUser(id={self.id})"
def __repr__(self):
return f"JWTUser(id={self.id}, is_staff={self.is_staff}, is_superuser={self.is_superuser})"
def create_access_token(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> str:
"""
Create a short-lived access token.
The token contains:
- sub: user ID
- sid: session key (for revocation checking)
- staff: is_staff flag
- super: is_superuser flag
- type: "access"
- iat: issued at
- exp: expiration
"""
settings = get_settings()
now = int(time.time())
payload = {
"sub": str(user_id),
"sid": session_key,
"staff": is_staff,
"super": is_superuser,
"type": "access",
"iat": now,
"exp": now + settings.access_token_expires_in,
}
return jwt.encode(
payload,
settings.private_key,
algorithm=settings.algorithm,
def _config() -> JWTConfig:
s = get_settings()
return JWTConfig(
private_key=s.private_key,
public_key=s.public_key,
algorithm=s.algorithm,
access_token_expires_in=s.access_token_expires_in,
refresh_token_expires_in=s.refresh_token_expires_in,
)
def create_refresh_token(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> str:
"""
Create a longer-lived refresh token.
The token contains:
- sub: user ID
- sid: session key (for revocation checking)
- staff: is_staff flag
- super: is_superuser flag
- type: "refresh"
- iat: issued at
- exp: expiration
"""
settings = get_settings()
now = int(time.time())
payload = {
"sub": str(user_id),
"sid": session_key,
"staff": is_staff,
"super": is_superuser,
"type": "refresh",
"iat": now,
"exp": now + settings.refresh_token_expires_in,
}
return jwt.encode(
payload,
settings.private_key,
algorithm=settings.algorithm,
)
def create_access_token(user_id, session_key, *, is_staff=False, is_superuser=False) -> str:
return _core_jwt.create_access_token(user_id, session_key, _config(),
is_staff=is_staff, is_superuser=is_superuser)
def create_token_pair(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> TokenPair:
"""Create both access and refresh tokens."""
settings = get_settings()
return TokenPair(
access_token=create_access_token(
user_id, session_key, is_staff=is_staff, is_superuser=is_superuser
),
refresh_token=create_refresh_token(
user_id, session_key, is_staff=is_staff, is_superuser=is_superuser
),
expires_in=settings.access_token_expires_in,
)
def create_refresh_token(user_id, session_key, *, is_staff=False, is_superuser=False) -> str:
return _core_jwt.create_refresh_token(user_id, session_key, _config(),
is_staff=is_staff, is_superuser=is_superuser)
def decode_token(token: str, expected_type: str = None) -> TokenPayload | None:
"""
Decode and validate a JWT token.
def create_token_pair(user_id, session_key, *, is_staff=False, is_superuser=False) -> TokenPair:
return _core_jwt.create_token_pair(user_id, session_key, _config(),
is_staff=is_staff, is_superuser=is_superuser)
Returns None if:
- Token is invalid or expired
- Token type doesn't match expected_type (if specified)
"""
settings = get_settings()
try:
payload = jwt.decode(
token,
settings.public_key,
algorithms=[settings.algorithm],
)
except jwt.PyJWTError:
return None
# Validate token type if specified
if expected_type and payload.get("type") != expected_type:
return None
return TokenPayload(
user_id=payload["sub"],
session_key=payload["sid"],
token_type=payload["type"],
is_staff=payload.get("staff", False),
is_superuser=payload.get("super", False),
exp=payload["exp"],
iat=payload["iat"],
)
def decode_token(token: str, expected_type: str | None = None) -> TokenPayload | None:
return _core_jwt.decode_token(token, _config(), expected_type=expected_type)
def validate_session(session_key: str) -> bool:
"""
Check if a session is still valid (exists and not expired).
"""Immediate-logout revocation: is this Django session still alive?
This is the key to immediate logout revocation - if the session
is destroyed, tokens tied to it become invalid.
Honors `JWT_VALIDATE_SESSION` — when disabled, always True. This is the one
Django-session-bound piece; the core's `refresh_tokens` takes it as an
injected `session_validator`.
"""
from importlib import import_module
from django.conf import settings as django_settings
jwt_settings = get_settings()
if not jwt_settings.validate_session:
if not get_settings().validate_session:
return True
# Use the configured session engine
engine = import_module(django_settings.SESSION_ENGINE)
SessionStore = engine.SessionStore
# Try to load the session
session = SessionStore(session_key=session_key)
# Check if session exists and is not empty
# exists() is more reliable than checking load() result
session = engine.SessionStore(session_key=session_key)
return session.exists(session_key)
def refresh_tokens(refresh_token: str) -> TokenPair | None:
"""
Use a refresh token to obtain new tokens.
Returns None if:
- Refresh token is invalid or expired
- Associated session no longer exists
"""
payload = decode_token(refresh_token, expected_type="refresh")
if payload is None:
return None
# Validate the session still exists
if not validate_session(payload.session_key):
return None
# Issue new token pair with same claims
return create_token_pair(
payload.user_id,
payload.session_key,
is_staff=payload.is_staff,
is_superuser=payload.is_superuser,
)
return _core_jwt.refresh_tokens(refresh_token, _config(), session_validator=validate_session)

View File

@@ -170,8 +170,8 @@ class HTTPAuthTests(TestCase):
def test_jwt_expired_with_session(self):
"""Expired JWT with valid session → Reject (do NOT fall back)."""
# Create token with past expiration by mocking time
with patch("mizan.jwt.tokens.time.time", return_value=0):
# Create token with past expiration by mocking time (minting lives in the core now)
with patch("mizan_core.auth.jwt.time.time", return_value=0):
tokens = create_token_pair(
self.user.pk,
self.session_key,