Restore approved state (tree of 4effcc7 "Added LICENSE")

Roll the working tree back to the last approved shape, before the post-LICENSE span that false-greened the AFI parity matrix with symbol-presence probes and smuggled an unauthorized SQLAlchemy dependency into FastAPI's Shapes binding.

Forward commit, not a history rewrite — the six commits since 4effcc7 stay in the log as the record of what happened.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-04 14:59:53 -04:00
parent adcc027894
commit ae684a36cb
126 changed files with 1711 additions and 13265 deletions

View File

@@ -8,13 +8,8 @@ dependencies = [
"mizan-core",
"fastapi>=0.110",
"pydantic>=2.0",
"python-multipart>=0.0.9",
"sqlalchemy>=2.0",
]
[project.scripts]
mizan-fastapi-edge-manifest = "mizan_fastapi.manifest:main"
[project.optional-dependencies]
dev = [
"pytest>=8.0",

View File

@@ -2,23 +2,9 @@
mizan-fastapi — FastAPI backend adapter for the Mizan protocol.
HTTP RPC dispatch and context bundling on top of mizan-core's function
registry, sharing the auth / invalidation / cache / upload core with the
Django adapter.
The full AFI-common surface is wired here over FastAPI-native primitives,
each riding the shared core:
- WebSocket RPC — `router`'s `/ws/` route dispatches `@client(websocket=True)`
functions through the same `mizan_core.dispatch` as `POST /call/`.
- SSR — `SSRRenderer` (`mizan_fastapi.ssr`) renders React via the shared
`mizan_core.ssr.SSRBridge` Bun subprocess.
- Edge manifest / PSR — `edge_manifest` (and the `mizan-fastapi-edge-manifest`
console entry) emit the manifest derived in `mizan_core.manifest`, including
each context's `render_strategy`.
- Shapes — `mizan_fastapi.shapes.Shape` is the typed query projection bound to
SQLAlchemy (same declaration surface as the Django `django-readers` binding).
- Forms — `mizan_fastapi.forms.mizanForm` exposes schema / validate / submit
role functions over Pydantic.
registry. Channels, Forms, Shapes, SSR are out of scope — FastAPI
projects use native equivalents (WebSocket, Pydantic, ORM-of-choice,
SSR frameworks).
Usage:
from fastapi import FastAPI
@@ -48,54 +34,14 @@ from .executor import (
compute_invalidation,
execute_function,
)
# Register the FastAPI/Starlette response base so view-path detection works in
# mizan_core.client.function (a @client function returning a Response is a
# view-path function — header-only invalidation, "view" in the edge manifest).
# Must run before any @client-decorated code is evaluated.
from starlette.responses import Response as _Response
from mizan_core.client.function import set_framework_response_base as _set_response_base
_set_response_base(_Response)
from . import shapes, forms
from .router import router, mizan_exception_handler, mizan_validation_handler
from .auth import MizanAuthMiddleware, mizan_auth
from .config import MizanConfig, from_env
from .manifest import edge_manifest, generate_edge_manifest, render_strategies
from .ssr import SSRRenderer
from mizan_core.upload import File, Upload, UploadedFile
# Shapes (SQLAlchemy query projection) and Forms (Pydantic schema/validate/submit)
# are submodule bindings; expose their public primitives at the package root.
Shape = shapes.Shape
Diff = shapes.Diff
NestedDiff = shapes.NestedDiff
mizanForm = forms.mizanForm
FormConfig = forms.FormConfig
__all__ = [
"Upload",
"File",
"UploadedFile",
"mizan_auth",
"MizanAuthMiddleware",
"MizanConfig",
"from_env",
"router",
"mizan_exception_handler",
"mizan_validation_handler",
"execute_function",
"compute_invalidation",
"edge_manifest",
"generate_edge_manifest",
"render_strategies",
"SSRRenderer",
"shapes",
"forms",
"Shape",
"Diff",
"NestedDiff",
"mizanForm",
"FormConfig",
"ErrorCode",
"MizanError",
"NotFound",

View File

@@ -1,54 +0,0 @@
"""
Built-in identity for FastAPI — Django-equivalent automatic `request.state.user`.
Opt in via `Depends(mizan_auth())` on a route/router, or mount `MizanAuthMiddleware`
app-wide. Both decode a bearer-JWT (`Authorization: Bearer`) or MWT (`X-Mizan-Token`)
via the shared core and set `request.state.user`. A present-but-invalid token is
rejected (401) rather than silently downgraded — the `INVALID` sentinel contract.
If you'd rather resolve identity yourself, set `request.state.user` upstream and skip
these; dispatch reads it directly.
"""
from __future__ import annotations
from typing import Callable
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from mizan_core.auth import INVALID, authenticate
from mizan_core.errors import Unauthorized
from .config import get_config
def _resolve(request: Request) -> None:
ident = authenticate(request.headers, get_config(request).auth)
if ident is INVALID:
raise Unauthorized("Invalid or expired token")
if ident is not None:
request.state.user = ident
def mizan_auth() -> Callable:
"""FastAPI dependency that populates `request.state.user` from a token."""
async def _dep(request: Request) -> None:
_resolve(request)
return _dep
class MizanAuthMiddleware(BaseHTTPMiddleware):
"""App-wide variant of `mizan_auth` — resolves identity on every request."""
async def dispatch(self, request, call_next):
try:
_resolve(request)
except Unauthorized:
from .router import _no_store
from mizan_core.errors import ErrorCode
return _no_store(
{"error": {"code": ErrorCode.UNAUTHORIZED.value, "message": "Invalid or expired token"}},
status_code=401,
)
return await call_next(request)

View File

@@ -1,80 +0,0 @@
"""
FastAPI configuration — the "no settings.py" seam.
Builds the shared core's `AuthConfig` (JWT + MWT) and a `CacheOrchestrator`
from environment variables, overridable per-app via `app.state.mizan_config`.
Env:
MIZAN_CACHE_SECRET HMAC cache signing key (enables origin cache)
MIZAN_CACHE_REDIS_URL Redis URL (else in-memory cache)
MIZAN_MWT_SECRET MWT signing key
MIZAN_MWT_AUDIENCE MWT audience (default "mizan")
JWT_PRIVATE_KEY JWT signing key (enables bearer-JWT auth)
JWT_PUBLIC_KEY JWT verify key (default: private key, HS256)
JWT_ALGORITHM default "HS256"
JWT_ACCESS_TOKEN_EXPIRES_IN / JWT_REFRESH_TOKEN_EXPIRES_IN
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from mizan_core.auth import AuthConfig, JWTConfig
from mizan_core.cache.backend import CacheBackend, MemoryCache
from mizan_core.dispatch import CacheOrchestrator
@dataclass(frozen=True)
class MizanConfig:
auth: AuthConfig
cache: CacheOrchestrator
def _cache_backend(secret: str | None, redis_url: str | None) -> CacheBackend | None:
if not secret:
return None
if redis_url:
from mizan_core.cache.backend import RedisCache
return RedisCache(redis_url)
return MemoryCache()
def _jwt_config() -> JWTConfig | None:
key = os.getenv("JWT_PRIVATE_KEY")
if not key:
return None
return JWTConfig(
private_key=key,
public_key=os.getenv("JWT_PUBLIC_KEY", key),
algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
access_token_expires_in=int(os.getenv("JWT_ACCESS_TOKEN_EXPIRES_IN", "300")),
refresh_token_expires_in=int(os.getenv("JWT_REFRESH_TOKEN_EXPIRES_IN", "604800")),
)
def from_env() -> MizanConfig:
secret = os.getenv("MIZAN_CACHE_SECRET")
backend = _cache_backend(secret, os.getenv("MIZAN_CACHE_REDIS_URL"))
auth = AuthConfig(
jwt=_jwt_config(),
mwt_secret=os.getenv("MIZAN_MWT_SECRET"),
mwt_audience=os.getenv("MIZAN_MWT_AUDIENCE", "mizan"),
)
return MizanConfig(auth=auth, cache=CacheOrchestrator(backend, secret))
def get_config(request) -> MizanConfig:
"""Per-app config: `app.state.mizan_config` if set, else built from env (cached)."""
app = getattr(request, "app", None)
state = getattr(app, "state", None)
override = getattr(state, "mizan_config", None) if state is not None else None
if override is not None:
return override
global _DEFAULT
if _DEFAULT is None:
_DEFAULT = from_env()
return _DEFAULT
_DEFAULT: MizanConfig | None = None

View File

@@ -1,69 +1,263 @@
"""
Dispatch — a thin shim over the shared core (`mizan_core.dispatch`).
RPC dispatch — looks up registered functions, validates input against the
function's Pydantic Input model, executes, and returns the serialized result.
The protocol machinery (auth, validation, execution, invalidation, merge, cache)
lives in `mizan_core`; this module re-exports the canonical error taxonomy and
keeps backward-compatible helpers. The router drives `dispatch_call` /
`dispatch_context` directly to get invalidation + origin cache.
Errors raise typed exceptions (MizanError subclasses). Wire those to JSON
responses by registering `mizan_exception_handler` on the FastAPI app, or
let them propagate to your own handler.
"""
from __future__ import annotations
from enum import Enum
from typing import Any
from mizan_core.dispatch import CacheOrchestrator, DispatchRequest, dispatch_call
from mizan_core.errors import (
BadRequest,
ErrorCode,
Forbidden,
InternalError,
MizanError,
NotFound,
NotImplementedYet,
Unauthorized,
ValidationFailed,
)
from mizan_core.invalidation import resolve_invalidation, resolve_merges
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, ValidationError
__all__ = [
"ErrorCode",
"MizanError",
"NotFound",
"BadRequest",
"ValidationFailed",
"Unauthorized",
"Forbidden",
"NotImplementedYet",
"InternalError",
"compute_invalidation",
"compute_merges",
"execute_function",
]
from mizan_core.registry import get_context_groups, get_function
from mizan_core.type_utils import types_match_for_merge
_NO_CACHE = CacheOrchestrator(None, None)
# ─── Error taxonomy ─────────────────────────────────────────────────────────
class ErrorCode(str, Enum):
NOT_FOUND = "NOT_FOUND"
BAD_REQUEST = "BAD_REQUEST"
VALIDATION_ERROR = "VALIDATION_ERROR"
UNAUTHORIZED = "UNAUTHORIZED"
FORBIDDEN = "FORBIDDEN"
NOT_IMPLEMENTED = "NOT_IMPLEMENTED"
INTERNAL_ERROR = "INTERNAL_ERROR"
_STATUS = {
ErrorCode.NOT_FOUND: 404,
ErrorCode.BAD_REQUEST: 400,
ErrorCode.VALIDATION_ERROR: 422,
ErrorCode.UNAUTHORIZED: 401,
ErrorCode.FORBIDDEN: 403,
ErrorCode.NOT_IMPLEMENTED: 501,
ErrorCode.INTERNAL_ERROR: 500,
}
class MizanError(Exception):
"""Base for protocol-level dispatch errors."""
code: ErrorCode = ErrorCode.INTERNAL_ERROR
def __init__(self, message: str, *, details: dict[str, Any] | None = None) -> None:
super().__init__(message)
self.message = message
self.details = details
@property
def status_code(self) -> int:
return _STATUS[self.code]
class NotFound(MizanError): code = ErrorCode.NOT_FOUND # noqa: E701
class BadRequest(MizanError): code = ErrorCode.BAD_REQUEST # noqa: E701
class ValidationFailed(MizanError): code = ErrorCode.VALIDATION_ERROR # noqa: E701
class Unauthorized(MizanError): code = ErrorCode.UNAUTHORIZED # noqa: E701
class Forbidden(MizanError): code = ErrorCode.FORBIDDEN # noqa: E701
class NotImplementedYet(MizanError): code = ErrorCode.NOT_IMPLEMENTED # noqa: E701
class InternalError(MizanError): code = ErrorCode.INTERNAL_ERROR # noqa: E701
# ─── Auth ───────────────────────────────────────────────────────────────────
def _user(request: Any) -> Any:
return getattr(getattr(request, "state", None), "user", None)
def _is_authenticated(user: Any) -> bool:
return bool(user) and getattr(user, "is_authenticated", True)
def _enforce_auth(request: Any, requirement: Any) -> None:
"""Verify the request meets the function's @client(auth=...) requirement, or raise."""
if requirement is None:
return
user = _user(request)
match requirement:
case True | "required":
if not _is_authenticated(user):
raise Unauthorized("Authentication required")
case "staff":
if not _is_authenticated(user):
raise Unauthorized("Authentication required")
if not getattr(user, "is_staff", False):
raise Forbidden("Staff access required")
case "superuser":
if not _is_authenticated(user):
raise Unauthorized("Authentication required")
if not getattr(user, "is_superuser", False):
raise Forbidden("Superuser access required")
case f if callable(f):
if not f(request):
raise Forbidden("Permission denied")
case other:
raise InternalError(f"Unknown auth requirement: {other!r}")
# ─── Input validation ───────────────────────────────────────────────────────
def _validate_input(input_cls: Any, input_data: Any) -> BaseModel | None:
"""Validate input_data against the function's Input model. Returns the instance or None."""
if input_cls in (None, BaseModel) or not getattr(input_cls, "model_fields", None):
return None
fields = input_cls.model_fields
required = [name for name, f in fields.items() if f.is_required()]
if not input_data:
if required:
raise ValidationFailed(
"Input validation failed",
details={"fields": {name: ["Field required"] for name in required}},
)
return input_cls()
if not isinstance(input_data, dict):
raise BadRequest(f"Input must be an object, got {type(input_data).__name__}")
try:
return input_cls(**input_data)
except ValidationError as e:
raise ValidationFailed(
"Input validation failed",
details={"errors": e.errors()},
) from e
# ─── Dispatch ───────────────────────────────────────────────────────────────
def _resolve_function(fn_name: str) -> Any:
view_class = get_function(fn_name)
if view_class is None:
raise NotFound("Function not found")
if getattr(view_class, "_meta", {}).get("private"):
raise Forbidden("Function is not client-callable")
return view_class
def _serialize(result: Any) -> Any:
# jsonable_encoder walks BaseModel / list / dict recursively, so list[BaseModel]
# (and nested shapes) come out wire-ready without a per-shape branch here.
return jsonable_encoder(result)
async def execute_function(
request: Any,
fn_name: str,
input_data: dict[str, Any] | None = None,
) -> Any:
"""Dispatch a registered function. Returns the serialized result, or raises MizanError.
Awaits `view.acall` — async handlers run on the loop, sync handlers run
in the default threadpool, both via the same entrypoint.
"""
view_class = _resolve_function(fn_name)
_enforce_auth(request, view_class._meta.get("auth"))
view = view_class(request)
validated = _validate_input(view.Input, input_data)
try:
result = await view.acall(validated)
except NotImplementedError as e:
raise NotImplementedYet(str(e) or "Not implemented") from e
except MizanError:
raise
except Exception as e:
raise InternalError(str(e)) from e
return _serialize(result)
# ─── Invalidation ───────────────────────────────────────────────────────────
def compute_invalidation(view_class: Any, input_data: dict[str, Any] | None) -> list[Any]:
"""`@client(affects=...)` → invalidation list (empty when none). Shared core."""
return resolve_invalidation(view_class, input_data) or []
"""Build the `invalidate` list from @client(affects=...) metadata, auto-scoping when arg names match context params."""
affects = getattr(view_class, "_meta", {}).get("affects") or []
return [_invalidation_target(target, input_data or {}) for target in affects]
def compute_merges(view_class: Any, input_data: dict[str, Any] | None, result: Any) -> list[dict[str, Any]]:
"""`@client(merge=...)` → merge list (empty when none). Shared core."""
return resolve_merges(view_class, input_data, result) or []
"""Build the `merge` list from @client(merge=...) metadata.
async def execute_function(request: Any, fn_name: str, input_data: dict[str, Any] | None = None) -> Any:
"""Dispatch a function and return its serialized result (auth enforced via core).
Backward-compat entry point; the router uses `dispatch_call` directly to also
capture invalidation/merge and run the origin cache.
Each entry is `{context, slot, value, params?}` where `slot` names the
function inside the context bundle the value lands in. The slot is
resolved server-side via `types_match_for_merge` so the kernel does
no shape inference — the server has the schema, type-checked routing
lives here. Entries whose slot can't be uniquely resolved are dropped
with a warning; the consumer falls back to refetch via `affects`.
"""
identity = getattr(getattr(request, "state", None), "user", None)
res = await dispatch_call(
DispatchRequest(identity=identity, args=input_data, native_request=request),
fn_name,
_NO_CACHE,
)
return res.data
targets = getattr(view_class, "_meta", {}).get("merge") or []
if not targets:
return []
mutation_output = getattr(view_class, "Output", None)
out: list[dict[str, Any]] = []
for ctx_name in targets:
slot = _resolve_merge_slot(ctx_name, mutation_output)
if slot is None:
continue
entry: dict[str, Any] = {"context": ctx_name, "slot": slot, "value": result}
scoped = _scoped_params(ctx_name, input_data or {})
if scoped:
entry["params"] = scoped
out.append(entry)
return out
def _resolve_merge_slot(context_name: str, mutation_output: Any) -> str | None:
"""Find the unique function-name slot whose return type matches the mutation's output.
Returns None on no match or ambiguous match (multiple candidates).
"""
if mutation_output is None:
return None
matches: list[str] = []
for fn_name in get_context_groups().get(context_name, []):
fn_cls = get_function(fn_name)
if fn_cls is None:
continue
fn_output = getattr(fn_cls, "Output", None)
if fn_output is not None and types_match_for_merge(fn_output, mutation_output):
matches.append(fn_name)
return matches[0] if len(matches) == 1 else None
def _scoped_params(context_name: str, input_data: dict[str, Any]) -> dict[str, Any]:
"""Match input args against the context's declared Input field names."""
fn_names = get_context_groups().get(context_name, [])
declared: set[str] = set()
for fn_name in fn_names:
fn_cls = get_function(fn_name)
if fn_cls is None:
continue
input_cls = getattr(fn_cls, "Input", None)
if input_cls and input_cls is not BaseModel and hasattr(input_cls, "model_fields"):
declared.update(input_cls.model_fields.keys())
return {k: v for k, v in input_data.items() if k in declared}
def _invalidation_target(target: dict[str, Any], input_data: dict[str, Any]) -> Any:
match target.get("type"):
case "context":
name = target["name"]
scoped = _scoped_params(name, input_data)
return {"context": name, "params": scoped} if scoped else name
case "function":
return {"function": target["name"]}
case _:
return target

View File

@@ -1,245 +0,0 @@
"""
Forms — the Pydantic binding (schema / validate / submit roles).
A Mizan form is exposed as three server functions — `{name}.schema`,
`{name}.validate`, `{name}.submit` — carrying `_meta["form_role"]` of
`"schema"`, `"validate"`, `"submit"`. That role contract is AFI-common and
identical to the Django adapter's (`mizan.forms`); only the *binding* differs:
Django wraps a `forms.Form`, this wraps a Pydantic `BaseModel`.
from mizan_fastapi.forms import mizanForm, FormConfig
class ContactForm(mizanForm):
mizan = FormConfig(name="contact", title="Contact Us", submit_label="Send")
name: str
email: EmailStr
message: str
def on_submit_success(self, request) -> dict:
send_email(self.model_dump())
return {"sent": True}
Subclassing registers the three role functions automatically (parity with the
Django `mizanFormMixin.__init_subclass__` auto-registration):
contact.schema → field definitions (FormSchema)
contact.validate → structured field errors (FormValidation)
contact.submit → validate, then on_submit_success / on_submit_failure
"""
from __future__ import annotations
from typing import Any, ClassVar, get_args, get_origin
from pydantic import BaseModel, ValidationError, create_model
from mizan_core.client.function import ServerFunction
from mizan_core.registry import get_all_functions, register
from .schemas import (
FieldError,
FieldErrorList,
FieldSchema,
FormMeta,
FormSchema,
FormSubmitFail,
FormSubmitPass,
FormValidation,
)
__all__ = [
"FormConfig",
"mizanForm",
"get_forms",
"FormSchema",
"FormValidation",
"FormSubmitPass",
"FormSubmitFail",
]
# Pydantic annotation → the (type, widget) the frontend renders. Mirrors the
# Django binding's `_django_field_to_python_type` intent: hand the client a real
# field type instead of a generic string.
_TYPE_WIDGET = {
bool: ("checkbox", "CheckboxInput"),
int: ("number", "NumberInput"),
float: ("number", "NumberInput"),
str: ("text", "TextInput"),
}
class FormConfig(BaseModel):
"""Form metadata + frontend behavior (parity with `mizanFormMeta`)."""
name: str
title: str | None = None
subtitle: str | None = None
submit_label: str = "Submit"
live_validation: bool = True
live_form_errors: bool = False
refetch_schema_on_validate: bool = False
def _unwrap_optional(annotation: Any) -> Any:
"""`X | None` / `Optional[X]` → `X`; otherwise the annotation unchanged."""
if get_origin(annotation) in (None,):
return annotation
args = [a for a in get_args(annotation) if a is not type(None)]
if len(args) == 1 and type(None) in get_args(annotation):
return args[0]
return annotation
def _field_type_widget(annotation: Any) -> tuple[str, str]:
base = _unwrap_optional(annotation)
return _TYPE_WIDGET.get(base, ("text", "TextInput"))
def _humanize(name: str) -> str:
return name.replace("_", " ").title()
def build_form_schema(form_cls: type["mizanForm"]) -> FormSchema:
"""Derive a `FormSchema` from a Pydantic form's fields + config."""
cfg = form_cls.mizan
fields: list[FieldSchema] = []
for field_name, info in form_cls.model_fields.items():
type_str, widget = _field_type_widget(info.annotation)
required = info.is_required()
initial = None if required else info.get_default(call_default_factory=False)
if initial is None and info.default is not None and info.default is not ...:
initial = info.default
meta = info.json_schema_extra if isinstance(info.json_schema_extra, dict) else {}
fields.append(
FieldSchema(
name=field_name,
label=str(info.title or _humanize(field_name)),
type=type_str,
widget=widget,
required=required,
disabled=bool(meta.get("disabled", False)),
help_text=str(info.description or ""),
initial=initial if initial is not ... else None,
max_length=getattr(info, "max_length", None),
min_length=getattr(info, "min_length", None),
choices=None,
)
)
return FormSchema(
name=cfg.name,
title=cfg.title or _humanize(form_cls.__name__.removesuffix("Form")),
subtitle=cfg.subtitle,
submit_label=cfg.submit_label,
fields=fields,
meta=FormMeta(
refetch_schema_on_validate=cfg.refetch_schema_on_validate,
live_validation=cfg.live_validation,
live_form_errors=cfg.live_form_errors,
),
)
def _validation_from_error(exc: ValidationError) -> FormValidation:
"""Group a Pydantic `ValidationError` into the `FormValidation` wire shape."""
by_field: dict[str, list[FieldError]] = {}
for err in exc.errors():
loc = err.get("loc", ())
field = str(loc[0]) if loc else "__all__"
by_field.setdefault(field, []).append(
FieldError(message=err.get("msg", "Invalid value"), code=err.get("type"))
)
return FormValidation(
errors=[FieldErrorList(field=f, errors=errs) for f, errs in by_field.items()]
)
def _validate(form_cls: type["mizanForm"], data: dict[str, Any]) -> tuple["mizanForm | None", FormValidation]:
"""Validate `data`; return `(instance|None, validation)` — instance None on failure."""
try:
instance = form_cls(**(data or {}))
return instance, FormValidation(errors=[])
except ValidationError as exc:
return None, _validation_from_error(exc)
class mizanForm(BaseModel):
"""Base for a Pydantic-backed Mizan form.
Subclass with field annotations and a `mizan = FormConfig(...)`. Subclassing
auto-registers the schema/validate/submit role functions. Override
`on_submit_success` / `on_submit_failure` for submit-time behavior.
"""
mizan: ClassVar[FormConfig]
def on_submit_success(self, request: Any) -> dict | None:
"""Handle a validated submission. Override; returns optional result data."""
return None
def on_submit_failure(self, request: Any, errors: FormValidation) -> None:
"""Handle a failed submission (logging, etc.). Override."""
return None
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cfg = cls.__dict__.get("mizan")
if isinstance(cfg, FormConfig):
_register_form(cls)
def _register_form(form_cls: type[mizanForm]) -> None:
"""Register `{name}.schema/.validate/.submit` for a Pydantic form class."""
cfg = form_cls.mizan
name = cfg.name
pascal = "".join(w.capitalize() for w in name.replace(".", "_").replace("-", "_").split("_"))
schema_input = create_model(f"{pascal}SchemaInput", data=(dict[str, Any], {}))
validate_input = create_model(f"{pascal}ValidateInput", data=(dict[str, Any], ...))
submit_input = create_model(f"{pascal}SubmitInput", data=(dict[str, Any], ...))
class SchemaFunction(ServerFunction):
Input = schema_input
Output = FormSchema
_meta: ClassVar[dict] = {"form": True, "form_name": name, "form_role": "schema"}
def call(self, input) -> FormSchema:
return build_form_schema(form_cls)
class ValidateFunction(ServerFunction):
Input = validate_input
Output = FormValidation
_meta: ClassVar[dict] = {"form": True, "form_name": name, "form_role": "validate"}
def call(self, input) -> FormValidation:
_, validation = _validate(form_cls, input.data)
return validation
class SubmitFunction(ServerFunction):
Input = submit_input
Output = FormSubmitPass
_meta: ClassVar[dict] = {"form": True, "form_name": name, "form_role": "submit"}
def call(self, input) -> FormSubmitPass | FormSubmitFail:
instance, validation = _validate(form_cls, input.data)
if instance is not None:
return FormSubmitPass(success=True, data=instance.on_submit_success(self.request))
instance_for_failure = form_cls.model_construct(**(input.data or {}))
instance_for_failure.on_submit_failure(self.request, validation)
return FormSubmitFail(success=False, errors=validation)
for fn, role in ((SchemaFunction, "schema"), (ValidateFunction, "validate"), (SubmitFunction, "submit")):
fn.__name__ = f"{name}_{role}"
fn.__qualname__ = fn.__name__
register(fn, f"{name}.{role}")
def get_forms() -> dict[str, list]:
"""Group registered form role functions by form name (parity helper)."""
forms: dict[str, list] = {}
for _, cls in get_all_functions().items():
meta = getattr(cls, "_meta", {})
if meta.get("form"):
forms.setdefault(meta.get("form_name"), []).append(cls)
return forms

View File

@@ -1,77 +0,0 @@
"""
Form role output schemas — the wire shapes the schema/validate/submit roles emit.
These mirror the Django adapter's `mizan.forms.schemas` field-for-field (FormMeta,
FieldSchema, FormSchema, FormValidation, FormSubmitPass/Fail) so the generated
client is identical regardless of which backend authored the form. The only
difference is the source: Django builds these from `forms.Field` introspection;
this builds them from Pydantic `FieldInfo`.
"""
from __future__ import annotations
from typing import Any, Optional
from pydantic import BaseModel
class FormMeta(BaseModel):
"""Frontend behavior flags (parity with the Django adapter)."""
refetch_schema_on_validate: bool = False
live_validation: bool = True
live_form_errors: bool = False
class FieldChoice(BaseModel):
value: str
label: str
class FieldError(BaseModel):
message: str
code: Optional[str] = None
class FieldErrorList(BaseModel):
field: str
errors: list[FieldError]
class FieldSchema(BaseModel):
name: str
label: str
type: str
widget: str
required: bool
disabled: bool
help_text: str
initial: Any = None
max_length: Optional[int] = None
min_length: Optional[int] = None
choices: Optional[list[FieldChoice]] = None
class FormSchema(BaseModel):
"""Schema returned by the `.schema` role: form metadata + field definitions."""
name: str
title: str
subtitle: Optional[str] = None
submit_label: str
fields: list[FieldSchema]
meta: FormMeta = FormMeta()
class FormValidation(BaseModel):
errors: list[FieldErrorList]
class FormSubmitPass(BaseModel):
success: bool
data: Optional[dict] = None
class FormSubmitFail(BaseModel):
success: bool
errors: FormValidation

View File

@@ -1,98 +0,0 @@
"""
Edge manifest — FastAPI adapter surface.
The manifest derivation is AFI-common (`mizan_core.manifest.generate_edge_manifest`);
this module exposes it over FastAPI's surface as a callable and a console entry
(`mizan-fastapi-edge-manifest`), mirroring Django's `export_edge_manifest`
management command.
The `render_strategy` field each context carries — `"psr"` when the context has
no user-scoped param, `"dynamic_cached"` when it does — is the PSR signal Edge
reads to decide between one shared pre-rendered artifact and a per-user cached
one. It is derived in the core from the same registry metadata, so FastAPI and
Django emit byte-identical manifests for an identical registry.
CLI:
mizan-fastapi-edge-manifest myproject.app
mizan-fastapi-edge-manifest myproject.app:app --base-url /api/mizan -o edge.json
The positional argument is an import target (``module`` or ``module:attr``); it
is imported for its registration side effects (importing the module runs the
`@client` decorators and `register(...)` calls that populate the registry)
before the manifest is derived.
"""
from __future__ import annotations
import argparse
import importlib
import sys
from pathlib import Path
from typing import Any
from mizan_core.manifest import generate_edge_manifest, generate_edge_manifest_json
__all__ = ["edge_manifest", "generate_edge_manifest", "render_strategies", "main"]
def edge_manifest(base_url: str = "/api/mizan") -> dict[str, Any]:
"""The Edge manifest for the current registry.
Call after the app's `@client` functions are imported/registered. The
returned dict carries each context's ``render_strategy`` (PSR vs.
dynamic_cached) and the mutation→context invalidation routing.
"""
return generate_edge_manifest(base_url=base_url)
def render_strategies(base_url: str = "/api/mizan") -> dict[str, str]:
"""Map each context to its ``render_strategy`` — ``"psr"`` or ``"dynamic_cached"``.
PSR (Preemptive Static Rendering) is the per-context decision Edge needs: a
context with no user-scoped param renders one shared artifact (``psr``) that
is re-rendered on mutation; a user-scoped context renders per-user
(``dynamic_cached``). This surfaces that decision directly so a PSR driver can
enumerate which contexts to pre-render without re-deriving it.
"""
contexts = edge_manifest(base_url)["contexts"]
return {name: entry["render_strategy"] for name, entry in contexts.items()}
def _import_target(target: str) -> None:
"""Import a ``module`` or ``module:attr`` target for its registration effects."""
module_name = target.split(":", 1)[0]
importlib.import_module(module_name)
def main(argv: list[str] | None = None) -> int:
"""Console entry: import the app target, emit the Edge manifest as JSON."""
parser = argparse.ArgumentParser(
prog="mizan-fastapi-edge-manifest",
description="Export the Mizan Edge manifest for a FastAPI app.",
)
parser.add_argument(
"app",
help="Import target whose @client functions to register "
"(e.g. 'myproject.app' or 'myproject.app:app').",
)
parser.add_argument("--base-url", default="/api/mizan", help="Mizan API mount point.")
parser.add_argument("-o", "--output", default=None, help="Write to file instead of stdout.")
parser.add_argument("--indent", type=int, default=2, help="JSON indent (0 = compact).")
args = parser.parse_args(argv)
sys.path.insert(0, "")
_import_target(args.app)
indent = args.indent if args.indent > 0 else None
text = generate_edge_manifest_json(base_url=args.base_url, indent=indent)
if args.output:
Path(args.output).write_text(text, encoding="utf-8")
else:
sys.stdout.write(text)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -14,22 +14,23 @@ FastAPI router exposing Mizan's HTTP endpoints:
from __future__ import annotations
import json
from typing import Any
from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field, ValidationError
from starlette.datastructures import UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from mizan_core.auth import INVALID, authenticate
from mizan_core.dispatch import DispatchRequest, dispatch_call, dispatch_context
from mizan_core.errors import BadRequest, ErrorCode, Forbidden, MizanError, NotFound, Unauthorized
from mizan_core.registry import get_function
from mizan_core.upload import UploadedFile, bind_uploads
from mizan_core.registry import get_context_groups, get_function
from .config import MizanConfig, get_config
from .executor import (
ErrorCode,
MizanError,
NotFound,
compute_invalidation,
compute_merges,
execute_function,
)
router = APIRouter()
@@ -44,12 +45,11 @@ def _no_store(payload: Any, status_code: int = 200) -> JSONResponse:
@router.get("/session/")
async def session_init() -> JSONResponse:
"""Session-init endpoint. AFI-common; wired here at parity with mizan-django.
"""Session-init probe. Parity with mizan-django's session endpoint.
The endpoint itself is the AFI-common surface. The CSRF *token* is a Django
session mechanism with no FastAPI equivalent, so this returns a null token —
the difference is in the token's backing mechanism, not in whether the
endpoint is owed. The wire-parity harness uses it as its readiness probe.
CSRF is a Django-only concern at the protocol level; FastAPI surfaces a
null token so the response shape stays uniform across backends. The
wire-parity harness uses this endpoint as its readiness probe.
"""
return _no_store({"csrfToken": None})
@@ -59,197 +59,29 @@ class CallBody(BaseModel):
args: dict[str, Any] = Field(default_factory=dict)
async def _parse_call(request: Request) -> tuple[str, dict[str, Any]]:
"""Read a call request, JSON or multipart. Returns `(fn, args)`.
Multipart carries the non-file fields in a JSON `args` part and each file as
its own part; the file parts bind into the Input's Upload fields with the
declarative `File(...)` constraints enforced.
"""
content_type = request.headers.get("content-type", "")
if content_type.startswith("multipart/form-data"):
form = await request.form()
fn = form.get("fn")
if not isinstance(fn, str) or not fn:
raise BadRequest("Missing 'fn' field")
raw_args = form.get("args")
try:
args: dict[str, Any] = json.loads(raw_args) if raw_args else {}
except (TypeError, ValueError):
raise BadRequest("Invalid JSON in 'args' field")
fn_class = get_function(fn)
input_cls = getattr(fn_class, "Input", None) if fn_class else None
if input_cls is not None and hasattr(input_cls, "model_fields"):
files: dict[str, list[UploadedFile]] = {}
for key in set(form.keys()):
wrapped = [
UploadedFile(p.filename, p.content_type, await p.read())
for p in form.getlist(key)
if isinstance(p, UploadFile)
]
if wrapped:
files[key] = wrapped
err = bind_uploads(input_cls, args, files)
if err is not None:
raise BadRequest(err)
return fn, args
try:
body = CallBody(**(await request.json()))
except (ValueError, ValidationError):
raise BadRequest("Invalid request body")
return body.fn, body.args
def _identity(request: Request, cfg: MizanConfig):
"""Identity for dispatch: a host-set `request.state.user`, else a token decode.
A present-but-invalid token rejects (401); no token → None (anonymous).
"""
existing = getattr(getattr(request, "state", None), "user", None)
if existing is not None:
return existing
ident = authenticate(request.headers, cfg.auth)
if ident is INVALID:
raise Unauthorized("Invalid or expired token")
return ident
@router.post("/call/")
async def function_call(request: Request) -> JSONResponse:
"""RPC dispatch — JSON or multipart → `{"result", "invalidate", "merge"?}` with
the `X-Mizan-Invalidate` header alongside the body."""
cfg = get_config(request)
fn, args = await _parse_call(request)
res = await dispatch_call(
DispatchRequest(identity=_identity(request, cfg), args=args, native_request=request),
fn, cfg.cache,
)
payload: dict[str, Any] = {"result": res.data, "invalidate": res.invalidate or []}
if res.merge:
payload["merge"] = res.merge
headers = {"Cache-Control": "no-store"}
if res.invalidate_header:
headers["X-Mizan-Invalidate"] = res.invalidate_header
return JSONResponse(payload, headers=headers)
async def function_call(body: CallBody, request: Request) -> JSONResponse:
"""RPC dispatch — `{"fn": "...", "args": {...}}` → `{"result": ..., "invalidate": [...], "merge"?: [...]}`."""
fn_class = get_function(body.fn)
result = await execute_function(request, body.fn, body.args)
invalidate = compute_invalidation(fn_class, body.args)
merges = compute_merges(fn_class, body.args, result)
payload: dict[str, Any] = {"result": result, "invalidate": invalidate}
if merges:
payload["merge"] = merges
return _no_store(payload)
@router.get("/ctx/{context_name}/")
async def context_fetch(context_name: str, request: Request) -> Response:
"""Bundled context fetch — origin-cached. `{function_name: result, ...}`."""
cfg = get_config(request)
res = await dispatch_context(
DispatchRequest(identity=_identity(request, cfg), args=dict(request.query_params),
native_request=request),
context_name, cfg.cache,
)
headers = {"Cache-Control": "no-store"}
if res.cache_status:
headers["X-Mizan-Cache"] = res.cache_status
return Response(content=res.body_bytes, media_type="application/json", headers=headers)
async def context_fetch(context_name: str, request: Request) -> JSONResponse:
"""Bundled context fetch — `{function_name: result, ...}` for every function in the context."""
fn_names = get_context_groups().get(context_name)
if not fn_names:
raise NotFound(f"Context '{context_name}' not found")
# ─── WebSocket RPC transport ──────────────────────────────────────────────────
def _ws_identity(websocket: WebSocket, cfg: MizanConfig):
"""Identity for a WebSocket RPC: a host-set `websocket.state.user`, else a
token decode from the handshake headers. A present-but-invalid token rejects.
Mirrors the HTTP `_identity` path so a function's `auth=` guard enforces
identically over either transport.
"""
existing = getattr(getattr(websocket, "state", None), "user", None)
if existing is not None:
return existing
ident = authenticate(websocket.headers, cfg.auth)
if ident is INVALID:
raise Unauthorized("Invalid or expired token")
return ident
def _error_frame(request_id: Any, exc: MizanError) -> dict[str, Any]:
err: dict[str, Any] = {"code": exc.code.value, "message": exc.message}
if exc.details:
err["details"] = exc.details
return {"id": request_id, "ok": False, "error": err}
@router.websocket("/ws/")
async def websocket_rpc(websocket: WebSocket) -> None:
"""WebSocket RPC transport for `@client(websocket=True)` functions.
Frame protocol (parity with mizan-django's Channels consumer):
{"action": "rpc", "id": "<req>", "fn": "<name>", "args": {...}}
{"id": "<req>", "ok": true, "data": <result>, "invalidate": [...], "merge"?: [...]}
{"id": "<req>", "ok": false, "error": {"code", "message", "details"?}}
Each call runs through the SAME `mizan_core.dispatch.dispatch_call` as
`POST /call/`, so input validation, `auth=` enforcement, invalidation, merge,
and origin-cache purge are identical across transports. Only functions that
declared `websocket=True` are callable here; an HTTP-only function returns a
`FORBIDDEN` frame rather than executing.
"""
cfg = get_config(websocket)
await websocket.accept()
try:
identity = _ws_identity(websocket, cfg)
except Unauthorized as exc:
await websocket.send_json(_error_frame(None, exc))
await websocket.close(code=1008)
return
try:
while True:
content = await websocket.receive_json()
await _handle_ws_rpc(websocket, content, identity, cfg)
except WebSocketDisconnect:
return
async def _handle_ws_rpc(websocket: WebSocket, content: dict[str, Any], identity, cfg: MizanConfig) -> None:
"""Dispatch one WS RPC frame through the shared dispatch core."""
if content.get("action") != "rpc":
await websocket.send_json({"error": f"Unknown action: {content.get('action')}"})
return
request_id = content.get("id")
fn_name = content.get("fn")
args = content.get("args", {})
if not fn_name:
await websocket.send_json(_error_frame(request_id, BadRequest("Missing 'fn' field")))
return
fn_class = get_function(fn_name)
if fn_class is None:
await websocket.send_json(_error_frame(request_id, NotFound(f"Function '{fn_name}' not found")))
return
if not getattr(fn_class, "_meta", {}).get("websocket"):
await websocket.send_json(
_error_frame(
request_id,
Forbidden("This function is HTTP-only. Use POST /api/mizan/call/ instead."),
)
)
return
try:
res = await dispatch_call(
DispatchRequest(identity=identity, args=args, native_request=websocket),
fn_name, cfg.cache,
)
except MizanError as exc:
await websocket.send_json(_error_frame(request_id, exc))
return
frame: dict[str, Any] = {"id": request_id, "ok": True, "data": res.data,
"invalidate": res.invalidate or []}
if res.merge:
frame["merge"] = res.merge
await websocket.send_json(frame)
params = dict(request.query_params)
bundled = {fn: await execute_function(request, fn, params) for fn in fn_names}
return _no_store(bundled)
# ─── Exception handler ──────────────────────────────────────────────────────

View File

@@ -1,307 +0,0 @@
"""
Typed query projection (Shapes) — the SQLAlchemy binding.
A Shape is a Pydantic model that declares *which* fields and relationships of an
ORM model to project. The declaration surface is identical to the Django
adapter's `mizan.shapes` (`django-readers` binding):
class AuthorShape(Shape[Author]):
id: int
name: str
books: list[BookShape] = [] # nested relationship
AuthorShape.query(session, lambda s: s.where(Author.name == "Ann"))
Only the ORM binding differs: where the Django Shape lowers its spec to
`django-readers` pairs (queryset prepare + instance project), this lowers it to a
SQLAlchemy `select(Model)` with `selectinload(...)` eager-loading for each nested
relationship (the projection-load that keeps the query count flat), then projects
each loaded instance into the Pydantic shape. `.diff()` / `.diff_many()` compare a
constructed shape against current DB rows, mirroring the Django semantics.
The one surface difference SQLAlchemy forces is an explicit `session` argument to
`query` / `diff` / `diff_many` — Django models carry an implicit `objects`
manager; a SQLAlchemy mapped class does not. That is the ORM binding, not the
Shape declaration.
"""
from __future__ import annotations
import types
from typing import Any, ClassVar, Generic, TypeVar, Union, get_type_hints
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.inspection import inspect as sa_inspect
from sqlalchemy.orm import Session, selectinload
_M = TypeVar("_M")
_S = TypeVar("_S", bound="Shape")
def _extract_shape_class(hint) -> type[Shape] | None:
"""The nested Shape a field annotation projects, if any.
Handles `SomeShape`, `list[SomeShape]`, and `SomeShape | None` / Optional —
the same forms the Django binding's `_extract_shape_class` accepts.
"""
origin = getattr(hint, "__origin__", None)
args = getattr(hint, "__args__", ())
if origin is list and args and isinstance(args[0], type) and issubclass(args[0], Shape):
return args[0]
if isinstance(hint, type) and issubclass(hint, Shape) and hint is not Shape:
return hint
if origin is Union or isinstance(hint, types.UnionType):
for arg in args:
if arg is type(None):
continue
if isinstance(arg, type) and issubclass(arg, Shape) and arg is not Shape:
return arg
return None
def _resolve_model(cls) -> Any | None:
"""The mapped model a Shape subclass is parameterized on (`Shape[Model]`)."""
for base in cls.__bases__:
meta = getattr(base, "__pydantic_generic_metadata__", None) or {}
if meta.get("origin") is Shape and (args := meta.get("args")):
return args[0]
return None
class Shape(BaseModel, Generic[_M]):
"""Typed projection over a SQLAlchemy mapped model.
Subclass as `Shape[Model]`; annotate the fields/relationships to project.
Scalar annotations become columns to read; annotations referencing another
Shape become relationships to eager-load and project recursively.
"""
_model: ClassVar[Any]
_nested: ClassVar[dict[str, type[Shape]]]
_field_names: ClassVar[list[str]]
_pk_field: ClassVar[str]
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if not (model := _resolve_model(cls)):
return
mapper = sa_inspect(model)
cls._model = model
cls._nested = {}
pk_cols = mapper.primary_key
cls._pk_field = pk_cols[0].key if pk_cols else "id"
hints = get_type_hints(cls, include_extras=False, localns={cls.__name__: cls}) or cls.__annotations__
field_names: list[str] = []
for name, hint in hints.items():
if name.startswith("_"):
continue
if shape_cls := _extract_shape_class(hint):
cls._nested[name] = shape_cls
else:
field_names.append(name)
cls._field_names = field_names
# ─── Loading + projection ────────────────────────────────────────────────
@classmethod
def _loader_options(cls) -> list[Any]:
"""`selectinload(...)` chains for every nested relationship (recursive).
This is the SQLAlchemy analogue of django-readers' prefetch wiring: each
nested Shape contributes a `selectinload` on its relationship attribute,
with the child Shape's own loader options nested beneath it, so the whole
projection loads in O(depth) queries rather than N+1.
"""
options: list[Any] = []
for name, shape_cls in cls._nested.items():
attr = getattr(cls._model, name)
child = shape_cls._loader_options()
loader = selectinload(attr)
options.append(loader.options(*child) if child else loader)
return options
@classmethod
def _project(cls: type[_S], instance: Any) -> _S:
"""Project a loaded ORM instance into this Shape (recursively for nested)."""
data: dict[str, Any] = {name: getattr(instance, name) for name in cls._field_names}
for name, shape_cls in cls._nested.items():
related = getattr(instance, name)
if related is None:
data[name] = None
elif isinstance(related, (list, set, tuple)) or hasattr(related, "__iter__") and not isinstance(related, (str, bytes)):
data[name] = [shape_cls._project(child) for child in related]
else:
data[name] = shape_cls._project(related)
return cls.model_validate(data)
@classmethod
def query(cls: type[_S], session: Session, *stmt_fns, **relation_stmt) -> list[_S]:
"""Project the model into a list of shapes.
Args:
session: An open SQLAlchemy `Session`.
*stmt_fns: Callables `(select) -> select` applied in order to the base
`select(Model)` — filters/ordering/limits (the SQLAlchemy analogue
of the Django binding's queryset functions).
**relation_stmt: Per-relationship callables `(select) -> select` whose
criteria scope a nested relationship's load (e.g.
``books=lambda s: s.where(Book.is_published.is_(True))``).
Returns:
A list of projected shape instances.
"""
stmt = select(cls._model)
loaders = cls._loader_options_scoped(relation_stmt)
if loaders:
stmt = stmt.options(*loaders)
for fn in stmt_fns:
stmt = fn(stmt)
rows = session.execute(stmt).unique().scalars().all()
return [cls._project(obj) for obj in rows]
@classmethod
def _loader_options_scoped(cls, relation_stmt: dict[str, Any]) -> list[Any]:
"""`_loader_options`, but with caller-supplied criteria applied per relation."""
if not relation_stmt:
return cls._loader_options()
options: list[Any] = []
for name, shape_cls in cls._nested.items():
attr = getattr(cls._model, name)
loader = selectinload(attr)
child = shape_cls._loader_options()
if child:
loader = loader.options(*child)
scope = relation_stmt.get(name)
if scope is not None:
# `selectinload(...).and_(...)` filters the related rows loaded.
criteria = scope(select(shape_cls._model)).whereclause
if criteria is not None:
loader = selectinload(attr.and_(criteria))
if child:
loader = loader.options(*child)
options.append(loader)
return options
@classmethod
def _get_pk(cls, instance) -> Any | None:
return getattr(instance, cls._pk_field, None)
# ─── Diff ────────────────────────────────────────────────────────────────
@classmethod
def diff_many(cls: type[_S], session: Session, items: list[_S]) -> list[tuple[_S, "Diff"]]:
"""Diff a batch of shapes against current DB state in one fetch.
New items (no PK) diff against `None`; existing items batch-fetch by PK.
Raises if an item declares a PK that no row matches.
"""
pk_field = cls._pk_field
pk_map: dict[Any, _S] = {}
new_items: list[_S] = []
for item in items:
pk = cls._get_pk(item)
(pk_map.__setitem__(pk, item) if pk is not None else new_items.append(item))
current_map: dict[Any, _S] = {}
if pk_map:
pk_col = getattr(cls._model, pk_field)
current = cls.query(session, lambda s, _c=pk_col: s.where(_c.in_(list(pk_map.keys()))))
current_map = {cls._get_pk(c): c for c in current}
results: list[tuple[_S, Diff]] = []
for item in new_items:
results.append((item, cls._diff_one(item, None)))
for pk, item in pk_map.items():
current = current_map.get(pk)
if current is None:
raise LookupError(f"{cls._model.__name__} with {pk_field}={pk} does not exist")
results.append((item, cls._diff_one(item, current)))
return results
@classmethod
def _diff_one(cls, incoming: _S, current: _S | None) -> "Diff":
pk_field = cls._pk_field
changed = (
{k: getattr(incoming, k) for k in cls._field_names
if k != pk_field and getattr(incoming, k) != getattr(current, k)}
if current
else {k: getattr(incoming, k) for k in cls._field_names if k != pk_field}
)
nested: dict[str, NestedDiff] = {}
for name, shape_cls in cls._nested.items():
incoming_items = getattr(incoming, name, None) or []
current_items = (getattr(current, name, None) or []) if current else []
if not isinstance(incoming_items, list):
incoming_items = [incoming_items]
if not isinstance(current_items, list):
current_items = [current_items]
current_by_pk = {shape_cls._get_pk(c): c for c in current_items if shape_cls._get_pk(c) is not None}
incoming_by_pk = {shape_cls._get_pk(c): c for c in incoming_items if shape_cls._get_pk(c) is not None}
nested[name] = NestedDiff(
created=[c for c in incoming_items if shape_cls._get_pk(c) is None],
updated=[c for pk, c in incoming_by_pk.items() if pk in current_by_pk and c != current_by_pk[pk]],
deleted=[pk for pk in current_by_pk if pk not in incoming_by_pk],
)
return Diff(is_new=current is None, changed=changed, _nested=nested)
def diff(self, session: Session) -> "Diff":
"""Diff this shape against its current DB row (or `None` if new)."""
cls = type(self)
pk = cls._get_pk(self)
if pk is not None:
pk_col = getattr(cls._model, cls._pk_field)
results = cls.query(session, lambda s: s.where(pk_col == pk))
if not results:
raise LookupError(f"{cls._model.__name__} with {cls._pk_field}={pk} does not exist")
current = results[0]
else:
current = None
return cls._diff_one(self, current)
class NestedDiff:
__slots__ = ("created", "updated", "deleted")
def __init__(self, created=(), updated=(), deleted=()):
self.created = list(created)
self.updated = list(updated)
self.deleted = list(deleted)
class Diff:
__slots__ = ("is_new", "changed", "_nested")
def __init__(self, is_new: bool, changed: dict[str, Any], _nested: dict[str, NestedDiff]):
self.is_new = is_new
self.changed = changed
self._nested = _nested
def nested(self, name: str) -> NestedDiff:
"""Strict access to a nested diff. Raises `KeyError` for an unknown name."""
if name not in self._nested:
valid = ", ".join(sorted(self._nested)) or "(none)"
raise KeyError(f"No nested diff for '{name}'. Valid nested shapes: {valid}")
return self._nested[name]
def __getattr__(self, name: str) -> NestedDiff:
if name.startswith("_"):
raise AttributeError(name)
if name not in self._nested:
valid = ", ".join(sorted(self._nested)) or "(none)"
raise AttributeError(f"No nested diff for '{name}'. Valid nested shapes: {valid}")
return self._nested[name]

View File

@@ -1,80 +0,0 @@
"""
SSR render path — FastAPI adapter surface over the shared Bun bridge.
The SSR subprocess lifecycle and JSON-RPC wire protocol live in
`mizan_core.ssr.SSRBridge` (framework-agnostic). FastAPI has no template-engine
backend, so instead of Django's `MizanTemplates` veneer this exposes an
`SSRRenderer` whose `.render(...)` calls the same bridge — `renderToString` runs
in the persistent Bun worker — and returns an `HTMLResponse` with the rendered
markup plus the hydration payload the client reads on mount.
Usage:
from mizan_fastapi.ssr import SSRRenderer
ssr = SSRRenderer(worker="path/to/mizan-ssr/src/worker.tsx", dirs=["frontend"])
@app.get("/profile/{user_id}")
async def profile(user_id: int):
return ssr.render("components/Profile.tsx", {"user_id": user_id})
`render` resolves the template name to an absolute file path against `dirs`
(parity with Django's `DIRS`), then renders the component's default export. The
hydration wrapping matches the Django backend byte-for-byte so the same client
bundle hydrates either server.
"""
from __future__ import annotations
import json
import os
from typing import Any
from fastapi.responses import HTMLResponse
from mizan_core.ssr import SSRBridge
class SSRRenderer:
"""Render React `.tsx`/`.jsx` files via the shared Bun SSR bridge.
One renderer owns one persistent `SSRBridge`. Thread-safe (the bridge
serializes worker I/O); a single renderer can be shared across the app.
"""
def __init__(self, worker: str, dirs: list[str] | None = None, timeout: float = 5.0) -> None:
self._dirs = list(dirs or [])
self._bridge = SSRBridge(worker_path=worker, timeout=timeout)
def _resolve(self, template_name: str) -> str:
"""Resolve a template name to an absolute file path against `dirs`.
An already-absolute, existing path is used directly; otherwise each `dirs`
entry is tried in order (parity with Django's `DIRS` resolution).
"""
if os.path.isabs(template_name) and os.path.isfile(template_name):
return template_name
for dir_path in self._dirs:
candidate = os.path.join(dir_path, template_name)
if os.path.isfile(candidate):
return os.path.abspath(candidate)
raise FileNotFoundError(
f"SSR component '{template_name}' not found in dirs={self._dirs!r}"
)
def render_to_string(self, template_name: str, props: dict[str, Any] | None = None) -> str:
"""Render the component to an HTML string (markup + hydration script)."""
props = dict(props or {})
result = self._bridge.render(self._resolve(template_name), props)
hydration_json = json.dumps(props, sort_keys=True, default=str)
return (
f'<div id="mizan-root">{result.html}</div>'
f"<script>window.__MIZAN_SSR_DATA__={hydration_json}</script>"
)
def render(self, template_name: str, props: dict[str, Any] | None = None, status_code: int = 200) -> HTMLResponse:
"""Render the component and return a FastAPI `HTMLResponse`."""
return HTMLResponse(self.render_to_string(template_name, props), status_code=status_code)
def shutdown(self) -> None:
"""Stop the underlying Bun subprocess."""
self._bridge.shutdown()

View File

@@ -1,167 +0,0 @@
"""
Edge-manifest + PSR behavior — the genuine capability behind the
`edge_manifest` and `psr` probes.
Proves the FastAPI adapter emits the manifest the spec defines (contexts,
mutations, params, user_scoped, render_strategy, page_routes) by deriving it from
a real registry, and that `render_strategy` falls out of the user-scoped-param
rule: a context whose params overlap {user_id, user, owner_id, account_id} is
`dynamic_cached`, otherwise `psr`.
"""
from __future__ import annotations
import json
import subprocess
import sys
import pytest
from fastapi.responses import Response
import mizan_fastapi # registers the Starlette Response base for view-path detection
from mizan_core.client.function import client
from mizan_core.registry import clear_registry, register
from mizan_fastapi import edge_manifest, generate_edge_manifest
from mizan_fastapi.manifest import render_strategies
@pytest.fixture(autouse=True)
def _clean_registry():
clear_registry()
yield
clear_registry()
def _register(fn, name):
register(fn, name)
def test_user_scoped_context_is_dynamic_cached():
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
_register(user_profile, "user_profile")
manifest = edge_manifest()
ctx = manifest["contexts"]["user"]
assert ctx["user_scoped"] is True
assert ctx["render_strategy"] == "dynamic_cached"
assert ctx["params"] == ["user_id"]
assert ctx["endpoints"] == ["/api/mizan/ctx/user/"]
def test_non_user_scoped_context_is_psr():
@client(context="catalog")
def catalog_items(request, category: str) -> list[dict]:
return [{"category": category}]
_register(catalog_items, "catalog_items")
ctx = edge_manifest()["contexts"]["catalog"]
assert ctx["user_scoped"] is False
assert ctx["render_strategy"] == "psr"
def test_render_strategies_maps_each_context():
@client(context="user")
def me(request, user_id: int) -> dict:
return {"id": user_id}
@client(context="catalog")
def items(request) -> list[dict]:
return []
_register(me, "me")
_register(items, "items")
strategies = render_strategies()
assert strategies == {"user": "dynamic_cached", "catalog": "psr"}
def test_mutation_records_affects_and_auto_scope():
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
@client(affects="user")
def rename(request, user_id: int, name: str) -> dict:
return {"ok": True}
_register(user_profile, "user_profile")
_register(rename, "rename")
mutation = edge_manifest()["mutations"]["rename"]
assert mutation["affects"] == ["user"]
# user_id matches the context's param → auto-scoped
assert mutation["auto_scoped_params"] == ["user_id"]
def test_private_and_route_mutation_carried():
@client(affects="subscription", private=True, route="/webhooks/stripe/", methods=["POST"])
def stripe_webhook(request) -> Response:
return Response(status_code=200)
@client(context="subscription")
def subscription(request, user_id: int) -> dict:
return {"id": user_id}
_register(stripe_webhook, "stripe_webhook")
_register(subscription, "subscription")
mutation = edge_manifest()["mutations"]["stripe_webhook"]
assert mutation["private"] is True
assert mutation["route"] == "/webhooks/stripe/"
assert mutation["methods"] == ["POST"]
def test_view_path_function_records_route_and_page_routes():
@client(context="profile", route="/profile/<user_id>/")
def profile_page(request, user_id: int) -> Response:
return Response(status_code=200)
_register(profile_page, "profile_page")
ctx = edge_manifest()["contexts"]["profile"]
assert ctx["page_routes"] == ["/profile/<user_id>/"]
fn_entry = next(f for f in ctx["functions"] if f["name"] == "profile_page")
assert fn_entry["path"] == "view"
assert fn_entry["route"] == "/profile/<user_id>/"
def test_fastapi_manifest_matches_core_derivation():
"""The adapter callable is a thin pass-through to the shared core derivation."""
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
_register(user_profile, "user_profile")
assert edge_manifest() == generate_edge_manifest(base_url="/api/mizan")
def test_cli_entry_emits_manifest_json(tmp_path):
"""`mizan-fastapi-edge-manifest <module>` imports the module then prints JSON."""
app_module = tmp_path / "manifest_app.py"
app_module.write_text(
"from mizan_core.client.function import client\n"
"from mizan_core.registry import register\n"
"@client(context='user')\n"
"def user_profile(request, user_id: int) -> dict:\n"
" return {'id': user_id}\n"
"register(user_profile, 'user_profile')\n",
encoding="utf-8",
)
result = subprocess.run(
[sys.executable, "-m", "mizan_fastapi.manifest", "manifest_app", "--indent", "0"],
cwd=tmp_path,
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, result.stderr
manifest = json.loads(result.stdout)
assert manifest["contexts"]["user"]["render_strategy"] == "dynamic_cached"

View File

@@ -1,145 +0,0 @@
"""
Forms behavior — the genuine capability behind the `forms` probe.
Proves the Pydantic binding exposes the same schema / validate / submit role
contract as the Django adapter: subclassing `mizanForm` auto-registers
`{name}.schema`, `{name}.validate`, `{name}.submit` with the matching
`_meta["form_role"]`, the schema role emits typed field definitions, validate
returns structured field errors, and submit validates then runs
`on_submit_success` / `on_submit_failure`.
"""
from __future__ import annotations
import pytest
from mizan_core.registry import clear_registry, get_function
from mizan_fastapi.forms import (
FormConfig,
FormSubmitFail,
FormSubmitPass,
FormValidation,
build_form_schema,
get_forms,
mizanForm,
)
@pytest.fixture(autouse=True)
def _clean():
clear_registry()
yield
clear_registry()
def _make_contact_form():
class ContactForm(mizanForm):
mizan = FormConfig(name="contact", title="Contact Us", submit_label="Send")
name: str
email: str
message: str = ""
def on_submit_success(self, request) -> dict:
return {"sent": True, "to": self.email}
return ContactForm
def test_subclassing_registers_three_role_functions():
_make_contact_form()
for role in ("schema", "validate", "submit"):
fn = get_function(f"contact.{role}")
assert fn is not None, f"contact.{role} not registered"
assert fn._meta["form"] is True
assert fn._meta["form_name"] == "contact"
assert fn._meta["form_role"] == role
def test_schema_role_emits_field_definitions():
form_cls = _make_contact_form()
SchemaFn = get_function("contact.schema")
schema = SchemaFn(request=None).call(None)
assert schema.name == "contact"
assert schema.title == "Contact Us"
assert schema.submit_label == "Send"
field_names = {f.name for f in schema.fields}
assert field_names == {"name", "email", "message"}
# `message` has a default → not required; `name`/`email` required
by_name = {f.name: f for f in schema.fields}
assert by_name["name"].required is True
assert by_name["message"].required is False
def test_build_form_schema_maps_types():
class TypedForm(mizanForm):
mizan = FormConfig(name="typed")
count: int
ratio: float
active: bool
label: str
schema = build_form_schema(TypedForm)
by_name = {f.name: f for f in schema.fields}
assert by_name["count"].type == "number"
assert by_name["ratio"].type == "number"
assert by_name["active"].type == "checkbox"
assert by_name["label"].type == "text"
def test_validate_role_passes_clean_data():
_make_contact_form()
ValidateFn = get_function("contact.validate")
ValidateInput = ValidateFn.Input
out = ValidateFn(request=None).call(ValidateInput(data={"name": "Ryth", "email": "r@x.com"}))
assert isinstance(out, FormValidation)
assert out.errors == []
def test_validate_role_reports_field_errors():
_make_contact_form()
ValidateFn = get_function("contact.validate")
ValidateInput = ValidateFn.Input
out = ValidateFn(request=None).call(ValidateInput(data={"email": "r@x.com"})) # missing 'name'
error_fields = {e.field for e in out.errors}
assert "name" in error_fields
def test_submit_role_runs_on_submit_success():
_make_contact_form()
SubmitFn = get_function("contact.submit")
SubmitInput = SubmitFn.Input
result = SubmitFn(request=None).call(
SubmitInput(data={"name": "Ryth", "email": "ryth@example.com", "message": "hi"})
)
assert isinstance(result, FormSubmitPass)
assert result.success is True
assert result.data == {"sent": True, "to": "ryth@example.com"}
def test_submit_role_returns_fail_on_invalid():
captured = {}
class GuardedForm(mizanForm):
mizan = FormConfig(name="guarded")
name: str
def on_submit_failure(self, request, errors) -> None:
captured["errors"] = errors
SubmitFn = get_function("guarded.submit")
SubmitInput = SubmitFn.Input
result = SubmitFn(request=None).call(SubmitInput(data={})) # missing required 'name'
assert isinstance(result, FormSubmitFail)
assert result.success is False
assert any(e.field == "name" for e in result.errors.errors)
# on_submit_failure hook fired with the validation
assert "errors" in captured
def test_get_forms_groups_by_form_name():
_make_contact_form()
forms = get_forms()
assert set(forms.keys()) == {"contact"}
assert len(forms["contact"]) == 3

View File

@@ -1,98 +0,0 @@
"""FastAPI parity with Django: X-Mizan-Invalidate header, origin cache, token auth."""
from __future__ import annotations
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
from mizan_core.auth import AuthConfig, JWTConfig, create_access_token
from mizan_core.cache.backend import MemoryCache
from mizan_core.client.function import client
from mizan_core.dispatch import CacheOrchestrator
from mizan_core.registry import clear_registry, register
from mizan_fastapi import (
MizanAuthMiddleware,
MizanConfig,
MizanError,
mizan_auth,
mizan_exception_handler,
router as mizan_router,
)
class Out(BaseModel):
ok: bool
SECRET = "x" * 32
JWT = JWTConfig(private_key=SECRET, public_key=SECRET)
def _app(*, with_cache=False, with_auth_dep=False) -> FastAPI:
clear_registry()
UserCtx = "user"
@client(context=UserCtx)
def user_profile(request, user_id: int) -> Out:
return Out(ok=True)
@client(affects=UserCtx)
def update_profile(request, user_id: int) -> Out:
return Out(ok=True)
@client(auth=True)
def whoami(request) -> Out:
return Out(ok=True)
register(user_profile, "user_profile")
register(update_profile, "update_profile")
register(whoami, "whoami")
app = FastAPI()
cache = CacheOrchestrator(MemoryCache(), SECRET) if with_cache else CacheOrchestrator(None, None)
app.state.mizan_config = MizanConfig(auth=AuthConfig(jwt=JWT), cache=cache)
deps = [Depends(mizan_auth())] if with_auth_dep else []
app.include_router(mizan_router, prefix="/api/mizan", dependencies=deps)
app.add_exception_handler(MizanError, mizan_exception_handler)
return app
def test_mutation_emits_invalidate_header():
c = TestClient(_app())
r = c.post("/api/mizan/call/", json={"fn": "update_profile", "args": {"user_id": 5}})
assert r.status_code == 200
assert r.json()["invalidate"] == [{"context": "user", "params": {"user_id": 5}}]
assert r.headers["X-Mizan-Invalidate"] == "user;user_id=5"
def test_origin_cache_hit_miss():
c = TestClient(_app(with_cache=True))
r1 = c.get("/api/mizan/ctx/user/", params={"user_id": 5})
assert r1.status_code == 200 and r1.headers["X-Mizan-Cache"] == "MISS"
r2 = c.get("/api/mizan/ctx/user/", params={"user_id": 5})
assert r2.headers["X-Mizan-Cache"] == "HIT"
assert r1.content == r2.content
def test_auth_required_rejects_anonymous():
c = TestClient(_app())
r = c.post("/api/mizan/call/", json={"fn": "whoami", "args": {}})
assert r.status_code == 401
def test_auth_required_passes_with_bearer_jwt():
c = TestClient(_app(with_auth_dep=True))
tok = create_access_token("7", "sess", JWT, is_staff=True)
r = c.post("/api/mizan/call/", json={"fn": "whoami", "args": {}},
headers={"Authorization": f"Bearer {tok}"})
assert r.status_code == 200 and r.json()["result"] == {"ok": True}
def test_invalid_bearer_token_rejected():
c = TestClient(_app())
r = c.post("/api/mizan/call/", json={"fn": "update_profile", "args": {"user_id": 1}},
headers={"Authorization": "Bearer not-a-real-token"})
assert r.status_code == 401

View File

@@ -1,269 +0,0 @@
"""
Shapes behavior — the genuine capability behind the `shapes` probe.
Proves the SQLAlchemy binding has the same Shape declaration surface and
projection/diff semantics as the Django `django-readers` binding:
- `Shape[Model]` resolves the mapped model + PK from the generic arg;
- scalar annotations project columns, Shape-typed annotations project relations;
- `.query(session, *stmt_fns, **relation_stmt)` flat / nested / scoped;
- nested loads stay flat (selectinload, not N+1);
- `.diff()` / `.diff_many()` detect field changes + nested created/updated/deleted.
"""
from __future__ import annotations
import pytest
from sqlalchemy import ForeignKey, create_engine, event
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship
from mizan_fastapi.shapes import Diff, NestedDiff, Shape
# ─── Mapped models ────────────────────────────────────────────────────────────
class Base(DeclarativeBase):
pass
class Publisher(Base):
__tablename__ = "publisher"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
country: Mapped[str]
authors: Mapped[list["Author"]] = relationship(back_populates="publisher")
class Author(Base):
__tablename__ = "author"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
bio: Mapped[str] = mapped_column(default="")
publisher_id: Mapped[int] = mapped_column(ForeignKey("publisher.id"))
publisher: Mapped[Publisher] = relationship(back_populates="authors")
books: Mapped[list["Book"]] = relationship(back_populates="author")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
is_published: Mapped[bool] = mapped_column(default=True)
author_id: Mapped[int] = mapped_column(ForeignKey("author.id"))
author: Mapped[Author] = relationship(back_populates="books")
# ─── Shapes (declaration surface identical to the Django adapter) ──────────────
class FlatAuthorShape(Shape[Author]):
id: int | None = None
name: str
class FlatBookShape(Shape[Book]):
id: int | None = None
title: str
is_published: bool
class BookCardShape(Shape[Book]):
id: int | None = None
title: str
is_published: bool
author: FlatAuthorShape # single nested FK
class AuthorCardShape(Shape[Author]):
id: int | None = None
name: str
bio: str
books: list[FlatBookShape] = [] # list nested reverse-FK
class PublisherDetailShape(Shape[Publisher]):
id: int | None = None
name: str
authors: list[AuthorCardShape] = [] # 3-level nesting
# ─── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def session():
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as s:
pub = Publisher(name="Orbit", country="UK")
ann = Author(name="Ann Leckie", bio="Imperial Radch", publisher=pub)
devi = Author(name="Devi Pillai", bio="", publisher=pub)
ann.books = [
Book(title="Ancillary Justice", is_published=True),
Book(title="Provenance", is_published=False),
]
s.add_all([pub, ann, devi])
s.commit()
yield s
# ─── Declaration ────────────────────────────────────────────────────────────────
def test_shape_resolves_model_and_pk():
assert FlatAuthorShape._model is Author
assert FlatAuthorShape._pk_field == "id"
def test_flat_shape_has_no_nested():
assert FlatAuthorShape._nested == {}
assert FlatAuthorShape._field_names == ["id", "name"]
def test_single_nested_detected():
assert BookCardShape._nested == {"author": FlatAuthorShape}
def test_list_nested_detected():
assert AuthorCardShape._nested == {"books": FlatBookShape}
# ─── Query ──────────────────────────────────────────────────────────────────────
def test_flat_query_projects_fields(session):
authors = FlatAuthorShape.query(session)
assert len(authors) == 2
assert {a.name for a in authors} == {"Ann Leckie", "Devi Pillai"}
def test_query_with_stmt_fn_filters(session):
authors = FlatAuthorShape.query(session, lambda s: s.where(Author.name == "Ann Leckie"))
assert [a.name for a in authors] == ["Ann Leckie"]
def test_single_nested_fk_projected(session):
books = BookCardShape.query(session, lambda s: s.where(Book.title == "Ancillary Justice"))
assert len(books) == 1
assert books[0].author.name == "Ann Leckie"
def test_list_nested_reverse_fk_projected(session):
authors = AuthorCardShape.query(session, lambda s: s.where(Author.name == "Ann Leckie"))
assert len(authors) == 1
assert {b.title for b in authors[0].books} == {"Ancillary Justice", "Provenance"}
def test_empty_nested_list(session):
authors = AuthorCardShape.query(session, lambda s: s.where(Author.name == "Devi Pillai"))
assert authors[0].books == []
def test_three_level_nesting(session):
pubs = PublisherDetailShape.query(session)
assert len(pubs) == 1
leckie = next(a for a in pubs[0].authors if a.name == "Ann Leckie")
assert len(leckie.books) == 2
def test_relation_stmt_scopes_nested_load(session):
authors = AuthorCardShape.query(
session,
lambda s: s.where(Author.name == "Ann Leckie"),
books=lambda s: s.where(Book.is_published.is_(True)),
)
assert [b.title for b in authors[0].books] == ["Ancillary Justice"]
assert all(b.is_published for b in authors[0].books)
def test_nested_query_stays_flat(session):
"""selectinload keeps the projection at O(depth) queries, not N+1."""
counter = {"n": 0}
@event.listens_for(session.bind, "after_cursor_execute")
def _count(*args):
counter["n"] += 1
AuthorCardShape.query(session)
# one query for authors + one selectin for books
assert counter["n"] == 2
# ─── Diff ─────────────────────────────────────────────────────────────────────
def test_diff_no_changes(session):
book = session.query(Book).filter_by(title="Ancillary Justice").one()
shape = FlatBookShape(id=book.id, title="Ancillary Justice", is_published=True)
d = shape.diff(session)
assert d.is_new is False
assert d.changed == {}
def test_diff_detects_field_change(session):
book = session.query(Book).filter_by(title="Ancillary Justice").one()
shape = FlatBookShape(id=book.id, title="Ancillary Justice (rev)", is_published=True)
d = shape.diff(session)
assert d.changed["title"] == "Ancillary Justice (rev)"
def test_diff_new_item(session):
shape = FlatBookShape(id=None, title="Elantris", is_published=True)
d = shape.diff(session)
assert d.is_new is True
assert "title" in d.changed
def test_diff_nonexistent_pk_raises(session):
shape = FlatBookShape(id=999999, title="Ghost", is_published=False)
with pytest.raises(LookupError):
shape.diff(session)
def test_nested_diff_created_updated_deleted(session):
author = session.query(Author).filter_by(name="Ann Leckie").one()
books = sorted(author.books, key=lambda b: b.title)
# keep one (updated), drop one (deleted), add one (created)
shape = AuthorCardShape(
id=author.id,
name="Ann Leckie",
bio="Imperial Radch",
books=[
FlatBookShape(id=books[0].id, title="Ancillary Justice REWRITTEN", is_published=True),
FlatBookShape(id=None, title="Ancillary Sword", is_published=True),
],
)
d = shape.diff(session)
assert len(d.books.updated) == 1
assert len(d.books.created) == 1
assert len(d.books.deleted) == 1
def test_diff_strict_nested_access_raises_on_typo(session):
author = session.query(Author).filter_by(name="Ann Leckie").one()
shape = FlatAuthorShape(id=author.id, name="Ann Leckie")
d = shape.diff(session)
with pytest.raises(AttributeError):
_ = d.bookz
with pytest.raises(KeyError):
d.nested("bookz")
def test_diff_many_batches(session):
books = session.query(Book).all()
items = [FlatBookShape(id=b.id, title=b.title + "!", is_published=b.is_published) for b in books]
results = FlatBookShape.diff_many(session, items)
assert len(results) == len(books)
assert all("title" in d.changed for _, d in results)
def test_diff_many_mixed_new_and_existing(session):
book = session.query(Book).first()
items = [
FlatBookShape(id=book.id, title=book.title, is_published=book.is_published),
FlatBookShape(id=None, title="Brand New", is_published=False),
]
results = FlatBookShape.diff_many(session, items)
assert sum(1 for _, d in results if d.is_new) == 1
assert sum(1 for _, d in results if not d.is_new) == 1

View File

@@ -1,138 +0,0 @@
"""
SSR behavior — the genuine capability behind the `ssr_bridge` probe.
The SSR subprocess lifecycle + JSON-RPC protocol live in the shared
`mizan_core.ssr.SSRBridge`; the FastAPI `SSRRenderer` resolves a component path
against `dirs`, drives the bridge, and wraps the result with the hydration script
the client reads on mount.
Bun is not assumed present in CI, so the bridge is driven against a stand-in
worker that speaks the SAME newline-delimited JSON-RPC protocol (ready signal +
`render` → `{id, html}`). That exercises the real bridge code path (spawn,
message-ID correlation, threaded reader) — only the renderer binary is swapped.
The path-resolution and hydration-wrapping are tested directly.
"""
from __future__ import annotations
import sys
import textwrap
import pytest
from mizan_core.ssr import SSRBridge
from mizan_fastapi.ssr import SSRRenderer
# A Python stand-in for the Bun worker: emits the ready signal, then for each
# render request echoes a deterministic HTML fragment built from the props.
_FAKE_WORKER = textwrap.dedent(
"""
import json, sys
sys.stdout.write(json.dumps({"id": 0, "ready": True}) + "\\n"); sys.stdout.flush()
for line in sys.stdin:
line = line.strip()
if not line:
continue
msg = json.loads(line)
if msg.get("method") == "render":
props = msg["params"]["props"]
html = "<p>" + props.get("name", "") + "</p>"
sys.stdout.write(json.dumps({"id": msg["id"], "html": html}) + "\\n")
sys.stdout.flush()
"""
)
@pytest.fixture
def fake_worker(tmp_path):
worker = tmp_path / "fake_worker.py"
worker.write_text(_FAKE_WORKER, encoding="utf-8")
return str(worker)
@pytest.fixture
def python_bridge(fake_worker, monkeypatch):
"""An `SSRBridge` whose subprocess is python (not bun), driving the fake worker."""
import subprocess
real_popen = subprocess.Popen
def fake_popen(cmd, *args, **kwargs):
# Swap the `bun run <worker>` invocation for `python <worker>`.
if cmd[:2] == ["bun", "run"]:
cmd = [sys.executable, cmd[2]]
return real_popen(cmd, *args, **kwargs)
monkeypatch.setattr(subprocess, "Popen", fake_popen)
bridge = SSRBridge(worker_path=fake_worker, timeout=5.0)
yield bridge
bridge.shutdown()
def test_bridge_round_trips_render(python_bridge):
result = python_bridge.render("/abs/Hello.tsx", {"name": "World"})
assert result.html == "<p>World</p>"
def test_bridge_correlates_concurrent_renders(python_bridge):
# Two renders on the persistent subprocess return their own results.
a = python_bridge.render("/abs/A.tsx", {"name": "A"})
b = python_bridge.render("/abs/B.tsx", {"name": "B"})
assert (a.html, b.html) == ("<p>A</p>", "<p>B</p>")
def test_renderer_resolves_against_dirs_and_wraps_hydration(fake_worker, monkeypatch, tmp_path):
import subprocess
real_popen = subprocess.Popen
monkeypatch.setattr(
subprocess, "Popen",
lambda cmd, *a, **k: real_popen([sys.executable, cmd[2]] if cmd[:2] == ["bun", "run"] else cmd, *a, **k),
)
components = tmp_path / "frontend"
components.mkdir()
(components / "Hello.tsx").write_text("export default () => null", encoding="utf-8")
renderer = SSRRenderer(worker=fake_worker, dirs=[str(components)])
try:
html = renderer.render_to_string("Hello.tsx", {"name": "Mizan"})
finally:
renderer.shutdown()
assert '<div id="mizan-root"><p>Mizan</p></div>' in html
assert 'window.__MIZAN_SSR_DATA__={"name": "Mizan"}' in html
def test_renderer_returns_html_response(fake_worker, monkeypatch, tmp_path):
import subprocess
from fastapi.responses import HTMLResponse
real_popen = subprocess.Popen
monkeypatch.setattr(
subprocess, "Popen",
lambda cmd, *a, **k: real_popen([sys.executable, cmd[2]] if cmd[:2] == ["bun", "run"] else cmd, *a, **k),
)
components = tmp_path / "frontend"
components.mkdir()
(components / "Card.tsx").write_text("export default () => null", encoding="utf-8")
renderer = SSRRenderer(worker=fake_worker, dirs=[str(components)])
try:
response = renderer.render("Card.tsx", {"name": "x"})
finally:
renderer.shutdown()
assert isinstance(response, HTMLResponse)
assert response.status_code == 200
def test_renderer_raises_on_missing_component(fake_worker, tmp_path):
renderer = SSRRenderer(worker=fake_worker, dirs=[str(tmp_path)])
try:
with pytest.raises(FileNotFoundError):
renderer.render_to_string("Nope.tsx", {})
finally:
renderer.shutdown()

View File

@@ -1,71 +0,0 @@
"""Upload dispatch over FastAPI multipart — files bind into Upload fields and
the declarative `File(...)` constraints are enforced."""
from __future__ import annotations
import json
from typing import Annotated
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
from mizan_core.client.function import client
from mizan_core.registry import clear_registry, register
from mizan_fastapi import File, MizanError, Upload, mizan_exception_handler, router as mizan_router
class AvatarOut(BaseModel):
ok: bool
size: int
name: str | None = None
@pytest.fixture
def app():
clear_registry()
@client
def set_avatar(
request,
user_id: int,
avatar: Annotated[Upload, File(max_size="1MB", content_types=["image/png"])],
) -> AvatarOut:
return AvatarOut(ok=True, size=avatar.size, name=avatar.filename)
register(set_avatar, "set_avatar")
fastapi_app = FastAPI()
fastapi_app.include_router(mizan_router, prefix="/api/mizan")
fastapi_app.add_exception_handler(MizanError, mizan_exception_handler)
return fastapi_app
def _post(test_client: TestClient, args: dict, file_tuple: tuple):
return test_client.post(
"/api/mizan/call/",
data={"fn": "set_avatar", "args": json.dumps(args)},
files={"avatar": file_tuple},
)
def test_upload_binds_and_executes(app):
resp = _post(TestClient(app), {"user_id": 5}, ("a.png", b"\x89PNG" + b"x" * 100, "image/png"))
assert resp.status_code == 200, resp.text
result = resp.json()["result"]
assert result["ok"] is True
assert result["name"] == "a.png"
assert result["size"] == 104
def test_max_size_rejected(app):
resp = _post(TestClient(app), {"user_id": 5}, ("b.png", b"x" * (2 * 1024 * 1024), "image/png"))
assert resp.status_code == 400
assert "max size" in resp.text
def test_content_type_rejected(app):
resp = _post(TestClient(app), {"user_id": 5}, ("c.gif", b"GIF89a", "image/gif"))
assert resp.status_code == 400
assert "content-type" in resp.text

View File

@@ -1,145 +0,0 @@
"""
WebSocket RPC behavior — the genuine capability behind the `websocket` probe.
Proves the `/ws/` route dispatches `@client(websocket=True)` functions through
the SAME `mizan_core.dispatch` core as `POST /call/`: input validation, the
`{result, invalidate, merge}` envelope, `auth=` enforcement, and the
websocket=True gate that rejects HTTP-only functions. The frame protocol matches
mizan-django's Channels consumer (`action:"rpc"` → `{id, ok, data|error}`).
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError
from fastapi.testclient import TestClient
from pydantic import BaseModel
from mizan_core.client.function import client
from mizan_core.registry import clear_registry, register
from mizan_fastapi import (
MizanError,
mizan_exception_handler,
mizan_validation_handler,
router as mizan_router,
)
class EchoOut(BaseModel):
message: str
@pytest.fixture
def app():
clear_registry()
@client(websocket=True)
def ws_echo(request, text: str) -> EchoOut:
return EchoOut(message=f"ws: {text}")
@client(websocket=True)
def ws_add(request, a: int, b: int) -> dict:
return {"total": a + b}
@client(websocket=True, affects="user")
def ws_update(request, user_id: int) -> dict:
return {"ok": True}
@client(websocket=True, auth=True)
def ws_secret(request) -> dict:
return {"secret": True}
@client # HTTP-only — must be rejected over WS
def http_only(request) -> dict:
return {"http": True}
for fn, name in (
(ws_echo, "ws_echo"), (ws_add, "ws_add"), (ws_update, "ws_update"),
(ws_secret, "ws_secret"), (http_only, "http_only"),
):
register(fn, name)
fastapi_app = FastAPI()
fastapi_app.include_router(mizan_router, prefix="/api/mizan")
fastapi_app.add_exception_handler(MizanError, mizan_exception_handler)
fastapi_app.add_exception_handler(RequestValidationError, mizan_validation_handler)
yield fastapi_app
clear_registry()
@pytest.fixture
def http(app):
return TestClient(app)
def test_ws_rpc_dispatches_and_returns_data(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "1", "fn": "ws_echo", "args": {"text": "hi"}})
frame = ws.receive_json()
assert frame == {"id": "1", "ok": True, "data": {"message": "ws: hi"}, "invalidate": []}
def test_ws_rpc_validates_input_through_core(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "2", "fn": "ws_add", "args": {"a": "nope", "b": 3}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "VALIDATION_ERROR"
def test_ws_rpc_carries_invalidation(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "3", "fn": "ws_update", "args": {"user_id": 5}})
frame = ws.receive_json()
assert frame["ok"] is True
assert "user" in frame["invalidate"]
def test_http_only_function_is_forbidden_over_ws(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "4", "fn": "http_only", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "FORBIDDEN"
def test_unknown_function_over_ws_is_not_found(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "5", "fn": "ghost", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "NOT_FOUND"
def test_auth_required_function_rejects_anonymous_over_ws(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "6", "fn": "ws_secret", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "UNAUTHORIZED"
def test_missing_fn_field_is_bad_request(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "7"})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "BAD_REQUEST"
def test_unknown_action_errors(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "bogus"})
frame = ws.receive_json()
assert "error" in frame
def test_multiple_calls_on_one_connection(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "a", "fn": "ws_echo", "args": {"text": "1"}})
first = ws.receive_json()
ws.send_json({"action": "rpc", "id": "b", "fn": "ws_echo", "args": {"text": "2"}})
second = ws.receive_json()
assert first["data"]["message"] == "ws: 1"
assert second["data"]["message"] == "ws: 2"