AFI parity: close all 35 gaps — every adapter wires every AFI-common capability

The conformance board (tests/afi/test_capability_parity.py) is now fully green:
90 capability cells + 4 meta-locks + 3 codegen byte-parity = 97 passed. The
gaps the prose table used to launder as "Django-only" / "out of scope" are
wired, against the pinned-spec model (single-authored spec, byte-identical
conformance across languages) — never per-language reimplementation.

FastAPI — edge_manifest + PSR (logic single-sourced in mizan_core.manifest),
WebSocket RPC (/ws/ through the shared dispatch), SSR (the framework-agnostic
SSRBridge relocated to mizan_core.ssr; Django rides it from there), Shapes
(SQLAlchemy projection, same declaration surface as django-readers), Forms
(Pydantic schema/validate/submit).

Rust (Axum + Tauri + cores/mizan-rust) — X-Mizan-Invalidate header, auth=
enforcement, origin HMAC cache, edge manifest + PSR, WebSocket handler / IPC
subscription channel, multipart upload, SSR bridge, Shapes, Forms; JWT/MWT
mint+verify and cache-key derivation byte-pinned to the Python reference
(cache_keys_pin, token_pin, invalidate_header_pin).

TypeScript — a KDL IR emitter byte-identical to the Python build_ir (so a TS
backend can feed the codegen — the largest gap), multipart upload, session-init,
WebSocket transport, SSR bridge, JWT/MWT mint (pinned to Python), Shapes, Forms.

Verified in the merged tree: core 25, fastapi 74, django 353/21-skip,
mizan-rust (incl. cross-language pins) green, axum 10, tauri 8, mizan-ts 103/2-skip.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-04 13:44:35 -04:00
parent 58d2cb2848
commit 6c5f6f1fba
81 changed files with 9893 additions and 463 deletions

View File

@@ -0,0 +1,167 @@
"""
Edge-manifest + PSR behavior — the genuine capability behind the
`edge_manifest` and `psr` probes.
Proves the FastAPI adapter emits the manifest the spec defines (contexts,
mutations, params, user_scoped, render_strategy, page_routes) by deriving it from
a real registry, and that `render_strategy` falls out of the user-scoped-param
rule: a context whose params overlap {user_id, user, owner_id, account_id} is
`dynamic_cached`, otherwise `psr`.
"""
from __future__ import annotations
import json
import subprocess
import sys
import pytest
from fastapi.responses import Response
import mizan_fastapi # registers the Starlette Response base for view-path detection
from mizan_core.client.function import client
from mizan_core.registry import clear_registry, register
from mizan_fastapi import edge_manifest, generate_edge_manifest
from mizan_fastapi.manifest import render_strategies
@pytest.fixture(autouse=True)
def _clean_registry():
clear_registry()
yield
clear_registry()
def _register(fn, name):
register(fn, name)
def test_user_scoped_context_is_dynamic_cached():
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
_register(user_profile, "user_profile")
manifest = edge_manifest()
ctx = manifest["contexts"]["user"]
assert ctx["user_scoped"] is True
assert ctx["render_strategy"] == "dynamic_cached"
assert ctx["params"] == ["user_id"]
assert ctx["endpoints"] == ["/api/mizan/ctx/user/"]
def test_non_user_scoped_context_is_psr():
@client(context="catalog")
def catalog_items(request, category: str) -> list[dict]:
return [{"category": category}]
_register(catalog_items, "catalog_items")
ctx = edge_manifest()["contexts"]["catalog"]
assert ctx["user_scoped"] is False
assert ctx["render_strategy"] == "psr"
def test_render_strategies_maps_each_context():
@client(context="user")
def me(request, user_id: int) -> dict:
return {"id": user_id}
@client(context="catalog")
def items(request) -> list[dict]:
return []
_register(me, "me")
_register(items, "items")
strategies = render_strategies()
assert strategies == {"user": "dynamic_cached", "catalog": "psr"}
def test_mutation_records_affects_and_auto_scope():
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
@client(affects="user")
def rename(request, user_id: int, name: str) -> dict:
return {"ok": True}
_register(user_profile, "user_profile")
_register(rename, "rename")
mutation = edge_manifest()["mutations"]["rename"]
assert mutation["affects"] == ["user"]
# user_id matches the context's param → auto-scoped
assert mutation["auto_scoped_params"] == ["user_id"]
def test_private_and_route_mutation_carried():
@client(affects="subscription", private=True, route="/webhooks/stripe/", methods=["POST"])
def stripe_webhook(request) -> Response:
return Response(status_code=200)
@client(context="subscription")
def subscription(request, user_id: int) -> dict:
return {"id": user_id}
_register(stripe_webhook, "stripe_webhook")
_register(subscription, "subscription")
mutation = edge_manifest()["mutations"]["stripe_webhook"]
assert mutation["private"] is True
assert mutation["route"] == "/webhooks/stripe/"
assert mutation["methods"] == ["POST"]
def test_view_path_function_records_route_and_page_routes():
@client(context="profile", route="/profile/<user_id>/")
def profile_page(request, user_id: int) -> Response:
return Response(status_code=200)
_register(profile_page, "profile_page")
ctx = edge_manifest()["contexts"]["profile"]
assert ctx["page_routes"] == ["/profile/<user_id>/"]
fn_entry = next(f for f in ctx["functions"] if f["name"] == "profile_page")
assert fn_entry["path"] == "view"
assert fn_entry["route"] == "/profile/<user_id>/"
def test_fastapi_manifest_matches_core_derivation():
"""The adapter callable is a thin pass-through to the shared core derivation."""
@client(context="user")
def user_profile(request, user_id: int) -> dict:
return {"id": user_id}
_register(user_profile, "user_profile")
assert edge_manifest() == generate_edge_manifest(base_url="/api/mizan")
def test_cli_entry_emits_manifest_json(tmp_path):
"""`mizan-fastapi-edge-manifest <module>` imports the module then prints JSON."""
app_module = tmp_path / "manifest_app.py"
app_module.write_text(
"from mizan_core.client.function import client\n"
"from mizan_core.registry import register\n"
"@client(context='user')\n"
"def user_profile(request, user_id: int) -> dict:\n"
" return {'id': user_id}\n"
"register(user_profile, 'user_profile')\n",
encoding="utf-8",
)
result = subprocess.run(
[sys.executable, "-m", "mizan_fastapi.manifest", "manifest_app", "--indent", "0"],
cwd=tmp_path,
capture_output=True,
text=True,
check=False,
)
assert result.returncode == 0, result.stderr
manifest = json.loads(result.stdout)
assert manifest["contexts"]["user"]["render_strategy"] == "dynamic_cached"

View File

@@ -0,0 +1,145 @@
"""
Forms behavior — the genuine capability behind the `forms` probe.
Proves the Pydantic binding exposes the same schema / validate / submit role
contract as the Django adapter: subclassing `mizanForm` auto-registers
`{name}.schema`, `{name}.validate`, `{name}.submit` with the matching
`_meta["form_role"]`, the schema role emits typed field definitions, validate
returns structured field errors, and submit validates then runs
`on_submit_success` / `on_submit_failure`.
"""
from __future__ import annotations
import pytest
from mizan_core.registry import clear_registry, get_function
from mizan_fastapi.forms import (
FormConfig,
FormSubmitFail,
FormSubmitPass,
FormValidation,
build_form_schema,
get_forms,
mizanForm,
)
@pytest.fixture(autouse=True)
def _clean():
clear_registry()
yield
clear_registry()
def _make_contact_form():
class ContactForm(mizanForm):
mizan = FormConfig(name="contact", title="Contact Us", submit_label="Send")
name: str
email: str
message: str = ""
def on_submit_success(self, request) -> dict:
return {"sent": True, "to": self.email}
return ContactForm
def test_subclassing_registers_three_role_functions():
_make_contact_form()
for role in ("schema", "validate", "submit"):
fn = get_function(f"contact.{role}")
assert fn is not None, f"contact.{role} not registered"
assert fn._meta["form"] is True
assert fn._meta["form_name"] == "contact"
assert fn._meta["form_role"] == role
def test_schema_role_emits_field_definitions():
form_cls = _make_contact_form()
SchemaFn = get_function("contact.schema")
schema = SchemaFn(request=None).call(None)
assert schema.name == "contact"
assert schema.title == "Contact Us"
assert schema.submit_label == "Send"
field_names = {f.name for f in schema.fields}
assert field_names == {"name", "email", "message"}
# `message` has a default → not required; `name`/`email` required
by_name = {f.name: f for f in schema.fields}
assert by_name["name"].required is True
assert by_name["message"].required is False
def test_build_form_schema_maps_types():
class TypedForm(mizanForm):
mizan = FormConfig(name="typed")
count: int
ratio: float
active: bool
label: str
schema = build_form_schema(TypedForm)
by_name = {f.name: f for f in schema.fields}
assert by_name["count"].type == "number"
assert by_name["ratio"].type == "number"
assert by_name["active"].type == "checkbox"
assert by_name["label"].type == "text"
def test_validate_role_passes_clean_data():
_make_contact_form()
ValidateFn = get_function("contact.validate")
ValidateInput = ValidateFn.Input
out = ValidateFn(request=None).call(ValidateInput(data={"name": "Ryth", "email": "r@x.com"}))
assert isinstance(out, FormValidation)
assert out.errors == []
def test_validate_role_reports_field_errors():
_make_contact_form()
ValidateFn = get_function("contact.validate")
ValidateInput = ValidateFn.Input
out = ValidateFn(request=None).call(ValidateInput(data={"email": "r@x.com"})) # missing 'name'
error_fields = {e.field for e in out.errors}
assert "name" in error_fields
def test_submit_role_runs_on_submit_success():
_make_contact_form()
SubmitFn = get_function("contact.submit")
SubmitInput = SubmitFn.Input
result = SubmitFn(request=None).call(
SubmitInput(data={"name": "Ryth", "email": "ryth@example.com", "message": "hi"})
)
assert isinstance(result, FormSubmitPass)
assert result.success is True
assert result.data == {"sent": True, "to": "ryth@example.com"}
def test_submit_role_returns_fail_on_invalid():
captured = {}
class GuardedForm(mizanForm):
mizan = FormConfig(name="guarded")
name: str
def on_submit_failure(self, request, errors) -> None:
captured["errors"] = errors
SubmitFn = get_function("guarded.submit")
SubmitInput = SubmitFn.Input
result = SubmitFn(request=None).call(SubmitInput(data={})) # missing required 'name'
assert isinstance(result, FormSubmitFail)
assert result.success is False
assert any(e.field == "name" for e in result.errors.errors)
# on_submit_failure hook fired with the validation
assert "errors" in captured
def test_get_forms_groups_by_form_name():
_make_contact_form()
forms = get_forms()
assert set(forms.keys()) == {"contact"}
assert len(forms["contact"]) == 3

View File

@@ -0,0 +1,269 @@
"""
Shapes behavior — the genuine capability behind the `shapes` probe.
Proves the SQLAlchemy binding has the same Shape declaration surface and
projection/diff semantics as the Django `django-readers` binding:
- `Shape[Model]` resolves the mapped model + PK from the generic arg;
- scalar annotations project columns, Shape-typed annotations project relations;
- `.query(session, *stmt_fns, **relation_stmt)` flat / nested / scoped;
- nested loads stay flat (selectinload, not N+1);
- `.diff()` / `.diff_many()` detect field changes + nested created/updated/deleted.
"""
from __future__ import annotations
import pytest
from sqlalchemy import ForeignKey, create_engine, event
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship
from mizan_fastapi.shapes import Diff, NestedDiff, Shape
# ─── Mapped models ────────────────────────────────────────────────────────────
class Base(DeclarativeBase):
pass
class Publisher(Base):
__tablename__ = "publisher"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
country: Mapped[str]
authors: Mapped[list["Author"]] = relationship(back_populates="publisher")
class Author(Base):
__tablename__ = "author"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
bio: Mapped[str] = mapped_column(default="")
publisher_id: Mapped[int] = mapped_column(ForeignKey("publisher.id"))
publisher: Mapped[Publisher] = relationship(back_populates="authors")
books: Mapped[list["Book"]] = relationship(back_populates="author")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
is_published: Mapped[bool] = mapped_column(default=True)
author_id: Mapped[int] = mapped_column(ForeignKey("author.id"))
author: Mapped[Author] = relationship(back_populates="books")
# ─── Shapes (declaration surface identical to the Django adapter) ──────────────
class FlatAuthorShape(Shape[Author]):
id: int | None = None
name: str
class FlatBookShape(Shape[Book]):
id: int | None = None
title: str
is_published: bool
class BookCardShape(Shape[Book]):
id: int | None = None
title: str
is_published: bool
author: FlatAuthorShape # single nested FK
class AuthorCardShape(Shape[Author]):
id: int | None = None
name: str
bio: str
books: list[FlatBookShape] = [] # list nested reverse-FK
class PublisherDetailShape(Shape[Publisher]):
id: int | None = None
name: str
authors: list[AuthorCardShape] = [] # 3-level nesting
# ─── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def session():
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as s:
pub = Publisher(name="Orbit", country="UK")
ann = Author(name="Ann Leckie", bio="Imperial Radch", publisher=pub)
devi = Author(name="Devi Pillai", bio="", publisher=pub)
ann.books = [
Book(title="Ancillary Justice", is_published=True),
Book(title="Provenance", is_published=False),
]
s.add_all([pub, ann, devi])
s.commit()
yield s
# ─── Declaration ────────────────────────────────────────────────────────────────
def test_shape_resolves_model_and_pk():
assert FlatAuthorShape._model is Author
assert FlatAuthorShape._pk_field == "id"
def test_flat_shape_has_no_nested():
assert FlatAuthorShape._nested == {}
assert FlatAuthorShape._field_names == ["id", "name"]
def test_single_nested_detected():
assert BookCardShape._nested == {"author": FlatAuthorShape}
def test_list_nested_detected():
assert AuthorCardShape._nested == {"books": FlatBookShape}
# ─── Query ──────────────────────────────────────────────────────────────────────
def test_flat_query_projects_fields(session):
authors = FlatAuthorShape.query(session)
assert len(authors) == 2
assert {a.name for a in authors} == {"Ann Leckie", "Devi Pillai"}
def test_query_with_stmt_fn_filters(session):
authors = FlatAuthorShape.query(session, lambda s: s.where(Author.name == "Ann Leckie"))
assert [a.name for a in authors] == ["Ann Leckie"]
def test_single_nested_fk_projected(session):
books = BookCardShape.query(session, lambda s: s.where(Book.title == "Ancillary Justice"))
assert len(books) == 1
assert books[0].author.name == "Ann Leckie"
def test_list_nested_reverse_fk_projected(session):
authors = AuthorCardShape.query(session, lambda s: s.where(Author.name == "Ann Leckie"))
assert len(authors) == 1
assert {b.title for b in authors[0].books} == {"Ancillary Justice", "Provenance"}
def test_empty_nested_list(session):
authors = AuthorCardShape.query(session, lambda s: s.where(Author.name == "Devi Pillai"))
assert authors[0].books == []
def test_three_level_nesting(session):
pubs = PublisherDetailShape.query(session)
assert len(pubs) == 1
leckie = next(a for a in pubs[0].authors if a.name == "Ann Leckie")
assert len(leckie.books) == 2
def test_relation_stmt_scopes_nested_load(session):
authors = AuthorCardShape.query(
session,
lambda s: s.where(Author.name == "Ann Leckie"),
books=lambda s: s.where(Book.is_published.is_(True)),
)
assert [b.title for b in authors[0].books] == ["Ancillary Justice"]
assert all(b.is_published for b in authors[0].books)
def test_nested_query_stays_flat(session):
"""selectinload keeps the projection at O(depth) queries, not N+1."""
counter = {"n": 0}
@event.listens_for(session.bind, "after_cursor_execute")
def _count(*args):
counter["n"] += 1
AuthorCardShape.query(session)
# one query for authors + one selectin for books
assert counter["n"] == 2
# ─── Diff ─────────────────────────────────────────────────────────────────────
def test_diff_no_changes(session):
book = session.query(Book).filter_by(title="Ancillary Justice").one()
shape = FlatBookShape(id=book.id, title="Ancillary Justice", is_published=True)
d = shape.diff(session)
assert d.is_new is False
assert d.changed == {}
def test_diff_detects_field_change(session):
book = session.query(Book).filter_by(title="Ancillary Justice").one()
shape = FlatBookShape(id=book.id, title="Ancillary Justice (rev)", is_published=True)
d = shape.diff(session)
assert d.changed["title"] == "Ancillary Justice (rev)"
def test_diff_new_item(session):
shape = FlatBookShape(id=None, title="Elantris", is_published=True)
d = shape.diff(session)
assert d.is_new is True
assert "title" in d.changed
def test_diff_nonexistent_pk_raises(session):
shape = FlatBookShape(id=999999, title="Ghost", is_published=False)
with pytest.raises(LookupError):
shape.diff(session)
def test_nested_diff_created_updated_deleted(session):
author = session.query(Author).filter_by(name="Ann Leckie").one()
books = sorted(author.books, key=lambda b: b.title)
# keep one (updated), drop one (deleted), add one (created)
shape = AuthorCardShape(
id=author.id,
name="Ann Leckie",
bio="Imperial Radch",
books=[
FlatBookShape(id=books[0].id, title="Ancillary Justice REWRITTEN", is_published=True),
FlatBookShape(id=None, title="Ancillary Sword", is_published=True),
],
)
d = shape.diff(session)
assert len(d.books.updated) == 1
assert len(d.books.created) == 1
assert len(d.books.deleted) == 1
def test_diff_strict_nested_access_raises_on_typo(session):
author = session.query(Author).filter_by(name="Ann Leckie").one()
shape = FlatAuthorShape(id=author.id, name="Ann Leckie")
d = shape.diff(session)
with pytest.raises(AttributeError):
_ = d.bookz
with pytest.raises(KeyError):
d.nested("bookz")
def test_diff_many_batches(session):
books = session.query(Book).all()
items = [FlatBookShape(id=b.id, title=b.title + "!", is_published=b.is_published) for b in books]
results = FlatBookShape.diff_many(session, items)
assert len(results) == len(books)
assert all("title" in d.changed for _, d in results)
def test_diff_many_mixed_new_and_existing(session):
book = session.query(Book).first()
items = [
FlatBookShape(id=book.id, title=book.title, is_published=book.is_published),
FlatBookShape(id=None, title="Brand New", is_published=False),
]
results = FlatBookShape.diff_many(session, items)
assert sum(1 for _, d in results if d.is_new) == 1
assert sum(1 for _, d in results if not d.is_new) == 1

View File

@@ -0,0 +1,138 @@
"""
SSR behavior — the genuine capability behind the `ssr_bridge` probe.
The SSR subprocess lifecycle + JSON-RPC protocol live in the shared
`mizan_core.ssr.SSRBridge`; the FastAPI `SSRRenderer` resolves a component path
against `dirs`, drives the bridge, and wraps the result with the hydration script
the client reads on mount.
Bun is not assumed present in CI, so the bridge is driven against a stand-in
worker that speaks the SAME newline-delimited JSON-RPC protocol (ready signal +
`render` → `{id, html}`). That exercises the real bridge code path (spawn,
message-ID correlation, threaded reader) — only the renderer binary is swapped.
The path-resolution and hydration-wrapping are tested directly.
"""
from __future__ import annotations
import sys
import textwrap
import pytest
from mizan_core.ssr import SSRBridge
from mizan_fastapi.ssr import SSRRenderer
# A Python stand-in for the Bun worker: emits the ready signal, then for each
# render request echoes a deterministic HTML fragment built from the props.
_FAKE_WORKER = textwrap.dedent(
"""
import json, sys
sys.stdout.write(json.dumps({"id": 0, "ready": True}) + "\\n"); sys.stdout.flush()
for line in sys.stdin:
line = line.strip()
if not line:
continue
msg = json.loads(line)
if msg.get("method") == "render":
props = msg["params"]["props"]
html = "<p>" + props.get("name", "") + "</p>"
sys.stdout.write(json.dumps({"id": msg["id"], "html": html}) + "\\n")
sys.stdout.flush()
"""
)
@pytest.fixture
def fake_worker(tmp_path):
worker = tmp_path / "fake_worker.py"
worker.write_text(_FAKE_WORKER, encoding="utf-8")
return str(worker)
@pytest.fixture
def python_bridge(fake_worker, monkeypatch):
"""An `SSRBridge` whose subprocess is python (not bun), driving the fake worker."""
import subprocess
real_popen = subprocess.Popen
def fake_popen(cmd, *args, **kwargs):
# Swap the `bun run <worker>` invocation for `python <worker>`.
if cmd[:2] == ["bun", "run"]:
cmd = [sys.executable, cmd[2]]
return real_popen(cmd, *args, **kwargs)
monkeypatch.setattr(subprocess, "Popen", fake_popen)
bridge = SSRBridge(worker_path=fake_worker, timeout=5.0)
yield bridge
bridge.shutdown()
def test_bridge_round_trips_render(python_bridge):
result = python_bridge.render("/abs/Hello.tsx", {"name": "World"})
assert result.html == "<p>World</p>"
def test_bridge_correlates_concurrent_renders(python_bridge):
# Two renders on the persistent subprocess return their own results.
a = python_bridge.render("/abs/A.tsx", {"name": "A"})
b = python_bridge.render("/abs/B.tsx", {"name": "B"})
assert (a.html, b.html) == ("<p>A</p>", "<p>B</p>")
def test_renderer_resolves_against_dirs_and_wraps_hydration(fake_worker, monkeypatch, tmp_path):
import subprocess
real_popen = subprocess.Popen
monkeypatch.setattr(
subprocess, "Popen",
lambda cmd, *a, **k: real_popen([sys.executable, cmd[2]] if cmd[:2] == ["bun", "run"] else cmd, *a, **k),
)
components = tmp_path / "frontend"
components.mkdir()
(components / "Hello.tsx").write_text("export default () => null", encoding="utf-8")
renderer = SSRRenderer(worker=fake_worker, dirs=[str(components)])
try:
html = renderer.render_to_string("Hello.tsx", {"name": "Mizan"})
finally:
renderer.shutdown()
assert '<div id="mizan-root"><p>Mizan</p></div>' in html
assert 'window.__MIZAN_SSR_DATA__={"name": "Mizan"}' in html
def test_renderer_returns_html_response(fake_worker, monkeypatch, tmp_path):
import subprocess
from fastapi.responses import HTMLResponse
real_popen = subprocess.Popen
monkeypatch.setattr(
subprocess, "Popen",
lambda cmd, *a, **k: real_popen([sys.executable, cmd[2]] if cmd[:2] == ["bun", "run"] else cmd, *a, **k),
)
components = tmp_path / "frontend"
components.mkdir()
(components / "Card.tsx").write_text("export default () => null", encoding="utf-8")
renderer = SSRRenderer(worker=fake_worker, dirs=[str(components)])
try:
response = renderer.render("Card.tsx", {"name": "x"})
finally:
renderer.shutdown()
assert isinstance(response, HTMLResponse)
assert response.status_code == 200
def test_renderer_raises_on_missing_component(fake_worker, tmp_path):
renderer = SSRRenderer(worker=fake_worker, dirs=[str(tmp_path)])
try:
with pytest.raises(FileNotFoundError):
renderer.render_to_string("Nope.tsx", {})
finally:
renderer.shutdown()

View File

@@ -0,0 +1,145 @@
"""
WebSocket RPC behavior — the genuine capability behind the `websocket` probe.
Proves the `/ws/` route dispatches `@client(websocket=True)` functions through
the SAME `mizan_core.dispatch` core as `POST /call/`: input validation, the
`{result, invalidate, merge}` envelope, `auth=` enforcement, and the
websocket=True gate that rejects HTTP-only functions. The frame protocol matches
mizan-django's Channels consumer (`action:"rpc"` → `{id, ok, data|error}`).
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError
from fastapi.testclient import TestClient
from pydantic import BaseModel
from mizan_core.client.function import client
from mizan_core.registry import clear_registry, register
from mizan_fastapi import (
MizanError,
mizan_exception_handler,
mizan_validation_handler,
router as mizan_router,
)
class EchoOut(BaseModel):
message: str
@pytest.fixture
def app():
clear_registry()
@client(websocket=True)
def ws_echo(request, text: str) -> EchoOut:
return EchoOut(message=f"ws: {text}")
@client(websocket=True)
def ws_add(request, a: int, b: int) -> dict:
return {"total": a + b}
@client(websocket=True, affects="user")
def ws_update(request, user_id: int) -> dict:
return {"ok": True}
@client(websocket=True, auth=True)
def ws_secret(request) -> dict:
return {"secret": True}
@client # HTTP-only — must be rejected over WS
def http_only(request) -> dict:
return {"http": True}
for fn, name in (
(ws_echo, "ws_echo"), (ws_add, "ws_add"), (ws_update, "ws_update"),
(ws_secret, "ws_secret"), (http_only, "http_only"),
):
register(fn, name)
fastapi_app = FastAPI()
fastapi_app.include_router(mizan_router, prefix="/api/mizan")
fastapi_app.add_exception_handler(MizanError, mizan_exception_handler)
fastapi_app.add_exception_handler(RequestValidationError, mizan_validation_handler)
yield fastapi_app
clear_registry()
@pytest.fixture
def http(app):
return TestClient(app)
def test_ws_rpc_dispatches_and_returns_data(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "1", "fn": "ws_echo", "args": {"text": "hi"}})
frame = ws.receive_json()
assert frame == {"id": "1", "ok": True, "data": {"message": "ws: hi"}, "invalidate": []}
def test_ws_rpc_validates_input_through_core(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "2", "fn": "ws_add", "args": {"a": "nope", "b": 3}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "VALIDATION_ERROR"
def test_ws_rpc_carries_invalidation(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "3", "fn": "ws_update", "args": {"user_id": 5}})
frame = ws.receive_json()
assert frame["ok"] is True
assert "user" in frame["invalidate"]
def test_http_only_function_is_forbidden_over_ws(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "4", "fn": "http_only", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "FORBIDDEN"
def test_unknown_function_over_ws_is_not_found(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "5", "fn": "ghost", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "NOT_FOUND"
def test_auth_required_function_rejects_anonymous_over_ws(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "6", "fn": "ws_secret", "args": {}})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "UNAUTHORIZED"
def test_missing_fn_field_is_bad_request(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "7"})
frame = ws.receive_json()
assert frame["ok"] is False
assert frame["error"]["code"] == "BAD_REQUEST"
def test_unknown_action_errors(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "bogus"})
frame = ws.receive_json()
assert "error" in frame
def test_multiple_calls_on_one_connection(http):
with http.websocket_connect("/api/mizan/ws/") as ws:
ws.send_json({"action": "rpc", "id": "a", "fn": "ws_echo", "args": {"text": "1"}})
first = ws.receive_json()
ws.send_json({"action": "rpc", "id": "b", "fn": "ws_echo", "args": {"text": "2"}})
second = ws.receive_json()
assert first["data"]["message"] == "ws: 1"
assert second["data"]["message"] == "ws: 2"