From 63c9a9c4ce243b65f7e3dda4eae1f4cf925ab4a6 Mon Sep 17 00:00:00 2001 From: Ryth Azhur Date: Wed, 6 May 2026 16:51:03 -0400 Subject: [PATCH] mizan-fastapi: more Pythonic and declarative MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reworked the MVP code along the lines Ryth flagged. Same behavior (11/11 tests still pass), tighter idiom. executor.py: - Replaced FunctionResult / FunctionError dataclasses with a MizanError exception hierarchy (NotFound, BadRequest, ValidationFailed, Unauthorized, Forbidden, NotImplementedYet, InternalError). Each carries its own ErrorCode + HTTP status; the dispatcher path raises rather than returning sentinel objects. - Auth check uses match/case for the requirement (True / 'staff' / 'superuser' / callable / other) — single declarative dispatch instead of an if/elif chain. - Broke up the single 80-line execute_function into focused helpers: _resolve_function, _enforce_auth, _validate_input, _serialize, _invalidation_target. The execute_function body now reads as five declarative steps. - Input validation uses Pydantic's model_fields[name].is_required() directly and a list comprehension for required-field reporting, instead of round-tripping through model_json_schema(). router.py: - POST /call/ now declares its body as a Pydantic CallBody model; FastAPI handles parsing + envelope validation. No more manual await request.json() + dict[get] dancing. - Endpoint bodies shrink to 3-5 lines each. Context fetch uses a dict comprehension over the function group. - mizan_exception_handler renders MizanError to the protocol's {error: {code, message, details}} envelope. - mizan_validation_handler maps FastAPI's RequestValidationError to the same envelope under BAD_REQUEST so the wire format is uniform whether the failure is body-shape or business validation. __init__.py: exposes the full exception hierarchy + both handlers so consumers can wire them onto their FastAPI app declaratively: app.add_exception_handler(MizanError, mizan_exception_handler) app.add_exception_handler(RequestValidationError, mizan_validation_handler) Verified: mizan-core 15/15, mizan-django 348 pass, mizan-fastapi 11/11. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/mizan_fastapi/__init__.py | 46 ++- .../src/mizan_fastapi/executor.py | 269 +++++++++--------- .../mizan-fastapi/src/mizan_fastapi/router.py | 124 ++++---- backends/mizan-fastapi/tests/test_dispatch.py | 10 +- 4 files changed, 228 insertions(+), 221 deletions(-) diff --git a/backends/mizan-fastapi/src/mizan_fastapi/__init__.py b/backends/mizan-fastapi/src/mizan_fastapi/__init__.py index 99e79ed..13c2bc4 100644 --- a/backends/mizan-fastapi/src/mizan_fastapi/__init__.py +++ b/backends/mizan-fastapi/src/mizan_fastapi/__init__.py @@ -1,30 +1,54 @@ """ mizan-fastapi — FastAPI backend adapter for the Mizan protocol. -Provides HTTP RPC dispatch and context bundling on top of mizan-core's -function registry. Channels, Forms, Shapes, SSR are out of scope for -the FastAPI adapter — FastAPI projects use native equivalents (WebSocket, -Pydantic models, ORM-of-choice, server-side rendering frameworks). +HTTP RPC dispatch and context bundling on top of mizan-core's function +registry. Channels, Forms, Shapes, SSR are out of scope — FastAPI +projects use native equivalents (WebSocket, Pydantic, ORM-of-choice, +SSR frameworks). Usage: from fastapi import FastAPI - from mizan_fastapi import router as mizan_router + from mizan_fastapi import router, mizan_exception_handler, MizanError app = FastAPI() - app.include_router(mizan_router, prefix="/api/mizan") + app.include_router(router, prefix="/api/mizan") + app.add_exception_handler(MizanError, mizan_exception_handler) # Register your @client-decorated functions from mizan_core.client.function import client - from .my_functions import * # noqa + from mizan_core.registry import register + from .my_functions import echo + register(echo, "echo") """ -from .router import router -from .executor import execute_function, ErrorCode, FunctionError, FunctionResult +from .executor import ( + ErrorCode, + MizanError, + NotFound, + BadRequest, + ValidationFailed, + Unauthorized, + Forbidden, + NotImplementedYet, + InternalError, + compute_invalidation, + execute_function, +) +from .router import router, mizan_exception_handler, mizan_validation_handler __all__ = [ "router", + "mizan_exception_handler", + "mizan_validation_handler", "execute_function", + "compute_invalidation", "ErrorCode", - "FunctionError", - "FunctionResult", + "MizanError", + "NotFound", + "BadRequest", + "ValidationFailed", + "Unauthorized", + "Forbidden", + "NotImplementedYet", + "InternalError", ] diff --git a/backends/mizan-fastapi/src/mizan_fastapi/executor.py b/backends/mizan-fastapi/src/mizan_fastapi/executor.py index 5e99bce..8f6bf0c 100644 --- a/backends/mizan-fastapi/src/mizan_fastapi/executor.py +++ b/backends/mizan-fastapi/src/mizan_fastapi/executor.py @@ -1,17 +1,14 @@ """ -RPC dispatch — looks up registered functions by name, validates input -against the function's Pydantic Input model, executes, and returns the -result wrapped in a normalized FunctionResult / FunctionError. +RPC dispatch — looks up registered functions, validates input against the +function's Pydantic Input model, executes, and returns the serialized result. -Backend-agnostic where possible. The only FastAPI-specific bits are the -Request type hint (kept loose as Any) and the auth-check mechanism, which -expects FastAPI to populate request.state.user via dependency injection -or middleware before dispatch. +Errors raise typed exceptions (MizanError subclasses). Wire those to JSON +responses by registering `mizan_exception_handler` on the FastAPI app, or +let them propagate to your own handler. """ from __future__ import annotations -from dataclasses import dataclass from enum import Enum from typing import Any @@ -20,7 +17,7 @@ from pydantic import BaseModel, ValidationError from mizan_core.registry import get_function -# ─── Error / Result types ─────────────────────────────────────────────────── +# ─── Error taxonomy ───────────────────────────────────────────────────────── class ErrorCode(str, Enum): @@ -33,164 +30,168 @@ class ErrorCode(str, Enum): INTERNAL_ERROR = "INTERNAL_ERROR" -@dataclass -class FunctionError: - code: ErrorCode - message: str - details: dict[str, Any] | None = None +_STATUS = { + ErrorCode.NOT_FOUND: 404, + ErrorCode.BAD_REQUEST: 400, + ErrorCode.VALIDATION_ERROR: 422, + ErrorCode.UNAUTHORIZED: 401, + ErrorCode.FORBIDDEN: 403, + ErrorCode.NOT_IMPLEMENTED: 501, + ErrorCode.INTERNAL_ERROR: 500, +} -@dataclass -class FunctionResult: - data: Any # serialized return value (dict, list, primitive, or Pydantic model_dump) +class MizanError(Exception): + """Base for protocol-level dispatch errors.""" + + code: ErrorCode = ErrorCode.INTERNAL_ERROR + + def __init__(self, message: str, *, details: dict[str, Any] | None = None) -> None: + super().__init__(message) + self.message = message + self.details = details + + @property + def status_code(self) -> int: + return _STATUS[self.code] + + +class NotFound(MizanError): code = ErrorCode.NOT_FOUND # noqa: E701 +class BadRequest(MizanError): code = ErrorCode.BAD_REQUEST # noqa: E701 +class ValidationFailed(MizanError): code = ErrorCode.VALIDATION_ERROR # noqa: E701 +class Unauthorized(MizanError): code = ErrorCode.UNAUTHORIZED # noqa: E701 +class Forbidden(MizanError): code = ErrorCode.FORBIDDEN # noqa: E701 +class NotImplementedYet(MizanError): code = ErrorCode.NOT_IMPLEMENTED # noqa: E701 +class InternalError(MizanError): code = ErrorCode.INTERNAL_ERROR # noqa: E701 # ─── Auth ─────────────────────────────────────────────────────────────────── -def _check_auth(request: Any, auth_requirement: Any) -> FunctionError | None: - """ - Verify the request meets the function's auth requirement. +def _user(request: Any) -> Any: + return getattr(getattr(request, "state", None), "user", None) - The auth value comes from @client(auth=...). FastAPI projects are expected - to populate `request.state.user` (or compatible) via middleware. If - request has no `.state` or `.state.user`, treats the user as anonymous. - """ - if auth_requirement is None: + +def _is_authenticated(user: Any) -> bool: + return bool(user) and getattr(user, "is_authenticated", True) + + +def _enforce_auth(request: Any, requirement: Any) -> None: + """Verify the request meets the function's @client(auth=...) requirement, or raise.""" + if requirement is None: + return + + user = _user(request) + + match requirement: + case True: + if not _is_authenticated(user): + raise Unauthorized("Authentication required") + case "staff": + if not _is_authenticated(user): + raise Unauthorized("Authentication required") + if not getattr(user, "is_staff", False): + raise Forbidden("Staff access required") + case "superuser": + if not _is_authenticated(user): + raise Unauthorized("Authentication required") + if not getattr(user, "is_superuser", False): + raise Forbidden("Superuser access required") + case f if callable(f): + if not f(request): + raise Forbidden("Permission denied") + case other: + raise InternalError(f"Unknown auth requirement: {other!r}") + + +# ─── Input validation ─────────────────────────────────────────────────────── + + +def _validate_input(input_cls: Any, input_data: Any) -> BaseModel | None: + """Validate input_data against the function's Input model. Returns the instance or None.""" + if input_cls in (None, BaseModel) or not getattr(input_cls, "model_fields", None): return None - user = getattr(getattr(request, "state", None), "user", None) - is_authenticated = bool(user) and getattr(user, "is_authenticated", True) + fields = input_cls.model_fields + required = [name for name, f in fields.items() if f.is_required()] - if auth_requirement is True: - if not is_authenticated: - return FunctionError(ErrorCode.UNAUTHORIZED, "Authentication required") - return None + if not input_data: + if required: + raise ValidationFailed( + "Input validation failed", + details={"fields": {name: ["Field required"] for name in required}}, + ) + return input_cls() - if auth_requirement == "staff": - if not is_authenticated: - return FunctionError(ErrorCode.UNAUTHORIZED, "Authentication required") - if not getattr(user, "is_staff", False): - return FunctionError(ErrorCode.FORBIDDEN, "Staff access required") - return None + if not isinstance(input_data, dict): + raise BadRequest(f"Input must be an object, got {type(input_data).__name__}") - if auth_requirement == "superuser": - if not is_authenticated: - return FunctionError(ErrorCode.UNAUTHORIZED, "Authentication required") - if not getattr(user, "is_superuser", False): - return FunctionError(ErrorCode.FORBIDDEN, "Superuser access required") - return None - - if callable(auth_requirement): - if not auth_requirement(request): - return FunctionError(ErrorCode.FORBIDDEN, "Permission denied") - return None - - return FunctionError( - ErrorCode.INTERNAL_ERROR, - f"Unknown auth requirement: {auth_requirement!r}", - ) + try: + return input_cls(**input_data) + except ValidationError as e: + raise ValidationFailed( + "Input validation failed", + details={"errors": e.errors()}, + ) from e # ─── Dispatch ─────────────────────────────────────────────────────────────── +def _resolve_function(fn_name: str) -> Any: + view_class = get_function(fn_name) + if view_class is None: + raise NotFound("Function not found") + if getattr(view_class, "_meta", {}).get("private"): + raise Forbidden("Function is not client-callable") + return view_class + + +def _serialize(result: Any) -> Any: + return result.model_dump(mode="json") if isinstance(result, BaseModel) else result + + def execute_function( request: Any, fn_name: str, input_data: dict[str, Any] | None = None, -) -> FunctionResult | FunctionError: - """ - Look up a registered function by name, validate input, execute, return result. - """ - view_class = get_function(fn_name) - if view_class is None: - return FunctionError(ErrorCode.NOT_FOUND, "Function not found") - - meta = getattr(view_class, "_meta", {}) - - if meta.get("private"): - return FunctionError(ErrorCode.FORBIDDEN, "Function is not client-callable") - - auth_error = _check_auth(request, meta.get("auth")) - if auth_error is not None: - return auth_error +) -> Any: + """Dispatch a registered function. Returns the serialized result, or raises MizanError.""" + view_class = _resolve_function(fn_name) + _enforce_auth(request, view_class._meta.get("auth")) view = view_class(request) - input_cls = view.Input + validated = _validate_input(view.Input, input_data) - # Pydantic input validation - has_fields = bool(getattr(input_cls, "model_fields", None)) if input_cls else False - - if input_data is not None and has_fields: - if not isinstance(input_data, dict): - return FunctionError( - ErrorCode.BAD_REQUEST, - f"Input must be an object, got {type(input_data).__name__}", - ) - try: - validated_input = input_cls(**input_data) - except ValidationError as e: - return FunctionError( - ErrorCode.VALIDATION_ERROR, - "Input validation failed", - details={"errors": e.errors()}, - ) - elif has_fields: - # Function expects input but none provided - required = input_cls.model_json_schema().get("required", []) - if required: - return FunctionError( - ErrorCode.VALIDATION_ERROR, - "Input validation failed", - details={"fields": {field: ["Field required"] for field in required}}, - ) - validated_input = input_cls() - else: - validated_input = None - - # Execute. The wrapper's call(input) always takes the input arg, - # passing None when the function has no fields. try: - result = view.call(validated_input) + result = view.call(validated) except NotImplementedError as e: - return FunctionError(ErrorCode.NOT_IMPLEMENTED, str(e) or "Not implemented") + raise NotImplementedYet(str(e) or "Not implemented") from e + except MizanError: + raise except Exception as e: - return FunctionError(ErrorCode.INTERNAL_ERROR, str(e)) + raise InternalError(str(e)) from e - # Serialize Pydantic models to plain dicts - if isinstance(result, BaseModel): - return FunctionResult(data=result.model_dump(mode="json")) - return FunctionResult(data=result) + return _serialize(result) # ─── Invalidation ─────────────────────────────────────────────────────────── def compute_invalidation(view_class: Any, input_data: dict[str, Any] | None) -> list[Any]: - """ - Build the invalidate list for a mutation response from the function's - @client(affects=...) metadata. Auto-scopes to params when the mutation's - arg names overlap with a context's params. - """ - meta = getattr(view_class, "_meta", {}) - affects = meta.get("affects") - if not affects: - return [] + """Build the `invalidate` list from @client(affects=...) metadata, auto-scoping when arg names match context params.""" + affects = getattr(view_class, "_meta", {}).get("affects") or [] + return [_invalidation_target(target, input_data or {}) for target in affects] - out: list[Any] = [] - for target in affects: - target_type = target.get("type") - target_name = target.get("name") - if target_type == "context": - scope_params = target.get("params") or {} - if scope_params and input_data: - # Auto-scope: include matching param values - matched = {k: input_data[k] for k in scope_params if k in input_data} - if matched: - out.append({"context": target_name, "params": matched}) - continue - out.append(target_name) - elif target_type == "function": - out.append({"function": target_name}) - return out + +def _invalidation_target(target: dict[str, Any], input_data: dict[str, Any]) -> Any: + match target.get("type"): + case "context": + name = target["name"] + scope_keys = (target.get("params") or {}).keys() + scoped = {k: input_data[k] for k in scope_keys if k in input_data} + return {"context": name, "params": scoped} if scoped else name + case "function": + return {"function": target["name"]} + case _: + return target diff --git a/backends/mizan-fastapi/src/mizan_fastapi/router.py b/backends/mizan-fastapi/src/mizan_fastapi/router.py index 129c338..febab0f 100644 --- a/backends/mizan-fastapi/src/mizan_fastapi/router.py +++ b/backends/mizan-fastapi/src/mizan_fastapi/router.py @@ -4,13 +4,12 @@ FastAPI router exposing Mizan's HTTP endpoints: POST /call/ — RPC dispatch GET /ctx/{context_name}/ — bundled context fetch -Mount under /api/mizan in your FastAPI app: - from fastapi import FastAPI - from mizan_fastapi import router as mizan_router + from mizan_fastapi import router, mizan_exception_handler, MizanError app = FastAPI() - app.include_router(mizan_router, prefix="/api/mizan") + app.include_router(router, prefix="/api/mizan") + app.add_exception_handler(MizanError, mizan_exception_handler) """ from __future__ import annotations @@ -18,14 +17,16 @@ from __future__ import annotations from typing import Any from fastapi import APIRouter, Request +from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field from mizan_core.registry import get_context_groups, get_function from .executor import ( ErrorCode, - FunctionError, - FunctionResult, + MizanError, + NotFound, compute_invalidation, execute_function, ) @@ -34,85 +35,58 @@ from .executor import ( router = APIRouter() -_HTTP_STATUS_FOR_ERROR = { - ErrorCode.NOT_FOUND: 404, - ErrorCode.BAD_REQUEST: 400, - ErrorCode.VALIDATION_ERROR: 422, - ErrorCode.UNAUTHORIZED: 401, - ErrorCode.FORBIDDEN: 403, - ErrorCode.NOT_IMPLEMENTED: 501, - ErrorCode.INTERNAL_ERROR: 500, -} +def _no_store(payload: Any, status_code: int = 200) -> JSONResponse: + return JSONResponse(payload, status_code=status_code, headers={"Cache-Control": "no-store"}) -def _error_response(err: FunctionError) -> JSONResponse: - body: dict[str, Any] = { - "error": { - "code": err.code.value, - "message": err.message, - } - } - if err.details: - body["error"]["details"] = err.details - return JSONResponse( - body, - status_code=_HTTP_STATUS_FOR_ERROR.get(err.code, 500), - headers={"Cache-Control": "no-store"}, - ) +# ─── Endpoints ────────────────────────────────────────────────────────────── + + +class CallBody(BaseModel): + fn: str = Field(..., min_length=1) + args: dict[str, Any] = Field(default_factory=dict) @router.post("/call/") -async def function_call(request: Request) -> JSONResponse: - """RPC dispatch — `{"fn": "name", "args": {...}}` → `{"result": ..., "invalidate": [...]}`.""" - try: - body = await request.json() - except Exception: - return _error_response( - FunctionError(ErrorCode.BAD_REQUEST, "Invalid JSON body") - ) - - if not isinstance(body, dict): - return _error_response( - FunctionError(ErrorCode.BAD_REQUEST, "Body must be an object") - ) - - fn_name = body.get("fn") - if not fn_name or not isinstance(fn_name, str): - return _error_response( - FunctionError(ErrorCode.BAD_REQUEST, "Missing or invalid 'fn' field") - ) - - args = body.get("args", {}) - - outcome = execute_function(request, fn_name, args) - if isinstance(outcome, FunctionError): - return _error_response(outcome) - - view_class = get_function(fn_name) - invalidate = compute_invalidation(view_class, args) - - return JSONResponse( - {"result": outcome.data, "invalidate": invalidate}, - headers={"Cache-Control": "no-store"}, - ) +async def function_call(body: CallBody, request: Request) -> JSONResponse: + """RPC dispatch — `{"fn": "...", "args": {...}}` → `{"result": ..., "invalidate": [...]}`.""" + result = execute_function(request, body.fn, body.args) + invalidate = compute_invalidation(get_function(body.fn), body.args) + return _no_store({"result": result, "invalidate": invalidate}) @router.get("/ctx/{context_name}/") async def context_fetch(context_name: str, request: Request) -> JSONResponse: - """Bundled context fetch — returns `{function_name: result, ...}` for every function in the named context.""" - groups = get_context_groups() - fn_names = groups.get(context_name) + """Bundled context fetch — `{function_name: result, ...}` for every function in the context.""" + fn_names = get_context_groups().get(context_name) if not fn_names: - return _error_response( - FunctionError(ErrorCode.NOT_FOUND, f"Context '{context_name}' not found") - ) + raise NotFound(f"Context '{context_name}' not found") params = dict(request.query_params) - result: dict[str, Any] = {} - for fn_name in fn_names: - outcome = execute_function(request, fn_name, params) - if isinstance(outcome, FunctionError): - return _error_response(outcome) - result[fn_name] = outcome.data + bundled = {fn: execute_function(request, fn, params) for fn in fn_names} + return _no_store(bundled) - return JSONResponse(result, headers={"Cache-Control": "no-store"}) + +# ─── Exception handler ────────────────────────────────────────────────────── + + +async def mizan_exception_handler(_request: Request, exc: MizanError) -> JSONResponse: + """FastAPI exception handler — renders MizanError to the protocol's error envelope.""" + body: dict[str, Any] = {"error": {"code": exc.code.value, "message": exc.message}} + if exc.details: + body["error"]["details"] = exc.details + return _no_store(body, status_code=exc.status_code) + + +async def mizan_validation_handler(_request: Request, exc: RequestValidationError) -> JSONResponse: + """Maps malformed request bodies (invalid JSON, missing top-level fields) to BAD_REQUEST.""" + return _no_store( + { + "error": { + "code": ErrorCode.BAD_REQUEST.value, + "message": "Invalid request body", + "details": {"errors": exc.errors()}, + } + }, + status_code=400, + ) diff --git a/backends/mizan-fastapi/tests/test_dispatch.py b/backends/mizan-fastapi/tests/test_dispatch.py index 4e24cc1..ffbd1d5 100644 --- a/backends/mizan-fastapi/tests/test_dispatch.py +++ b/backends/mizan-fastapi/tests/test_dispatch.py @@ -4,12 +4,18 @@ from __future__ import annotations import pytest from fastapi import FastAPI +from fastapi.exceptions import RequestValidationError from fastapi.testclient import TestClient from pydantic import BaseModel from mizan_core.client.function import client from mizan_core.registry import clear_registry, register -from mizan_fastapi import router as mizan_router +from mizan_fastapi import ( + MizanError, + mizan_exception_handler, + mizan_validation_handler, + router as mizan_router, +) # ─── Fixtures ─────────────────────────────────────────────────────────────── @@ -61,6 +67,8 @@ def app(): fastapi_app = FastAPI() fastapi_app.include_router(mizan_router, prefix="/api/mizan") + fastapi_app.add_exception_handler(MizanError, mizan_exception_handler) + fastapi_app.add_exception_handler(RequestValidationError, mizan_validation_handler) yield fastapi_app