"""End-to-end dispatch tests against a real FastAPI app + TestClient.""" from __future__ import annotations import asyncio import pytest from fastapi import FastAPI from fastapi.exceptions import RequestValidationError from fastapi.testclient import TestClient from pydantic import BaseModel from mizan_core.client.function import client from mizan_core.registry import clear_registry, register from mizan_fastapi import ( MizanError, mizan_exception_handler, mizan_validation_handler, router as mizan_router, ) # ─── Fixtures ─────────────────────────────────────────────────────────────── class EchoOutput(BaseModel): message: str class SumOutput(BaseModel): total: int class UserOutput(BaseModel): email: str authenticated: bool class ItemOutput(BaseModel): id: int name: str @pytest.fixture def app(): """Build a fresh FastAPI app + Mizan router with a few @client functions.""" clear_registry() @client def echo(request, text: str) -> EchoOutput: return EchoOutput(message=f"echo: {text}") @client def add(request, a: int, b: int) -> SumOutput: return SumOutput(total=a + b) @client(context="user") def current_user(request) -> UserOutput: return UserOutput(email="anon@example.com", authenticated=False) @client(context="user") def user_count(request) -> SumOutput: return SumOutput(total=42) @client(affects="user") def update_email(request, email: str) -> EchoOutput: return EchoOutput(message=f"updated: {email}") @client(auth=True) def whoami(request) -> UserOutput: return UserOutput(email="real@example.com", authenticated=True) @client def list_items(request) -> list[ItemOutput]: return [ItemOutput(id=1, name="a"), ItemOutput(id=2, name="b")] @client def find_item(request, item_id: int) -> ItemOutput | None: return ItemOutput(id=item_id, name="found") if item_id > 0 else None @client(merge="items") def set_item_name(request, id: int, name: str) -> ItemOutput: return ItemOutput(id=id, name=name) @client(context="items") def items_list(request) -> list[ItemOutput]: return [ItemOutput(id=1, name="orig")] @client async def async_echo(request, text: str) -> EchoOutput: # await something on the loop to prove we're really running async await asyncio.sleep(0) return EchoOutput(message=f"async: {text}") register(echo, "echo") register(add, "add") register(current_user, "current_user") register(user_count, "user_count") register(update_email, "update_email") register(whoami, "whoami") register(list_items, "list_items") register(find_item, "find_item") register(set_item_name, "set_item_name") register(items_list, "items_list") register(async_echo, "async_echo") fastapi_app = FastAPI() fastapi_app.include_router(mizan_router, prefix="/api/mizan") fastapi_app.add_exception_handler(MizanError, mizan_exception_handler) fastapi_app.add_exception_handler(RequestValidationError, mizan_validation_handler) yield fastapi_app clear_registry() @pytest.fixture def http(app): return TestClient(app) # ─── RPC dispatch ─────────────────────────────────────────────────────────── class FunctionCallTests: def test_simple_call_returns_result(self, http): r = http.post("/api/mizan/call/", json={"fn": "echo", "args": {"text": "hi"}}) assert r.status_code == 200 body = r.json() assert body["result"]["message"] == "echo: hi" assert body["invalidate"] == [] def test_call_with_typed_input(self, http): r = http.post("/api/mizan/call/", json={"fn": "add", "args": {"a": 2, "b": 3}}) assert r.status_code == 200 assert r.json()["result"]["total"] == 5 def test_unknown_function_returns_not_found(self, http): r = http.post("/api/mizan/call/", json={"fn": "ghost"}) assert r.status_code == 404 assert r.json()["error"]["code"] == "NOT_FOUND" def test_validation_error_returns_422(self, http): r = http.post("/api/mizan/call/", json={"fn": "add", "args": {"a": "not-int", "b": 3}}) assert r.status_code == 422 assert r.json()["error"]["code"] == "VALIDATION_ERROR" def test_missing_required_input_returns_validation_error(self, http): r = http.post("/api/mizan/call/", json={"fn": "add", "args": {}}) assert r.status_code == 422 assert r.json()["error"]["code"] == "VALIDATION_ERROR" def test_missing_fn_field_returns_400(self, http): r = http.post("/api/mizan/call/", json={}) assert r.status_code == 400 assert r.json()["error"]["code"] == "BAD_REQUEST" def test_invalid_json_returns_400(self, http): r = http.post("/api/mizan/call/", content=b"not json", headers={"content-type": "application/json"}) assert r.status_code == 400 def test_response_carries_no_store(self, http): r = http.post("/api/mizan/call/", json={"fn": "echo", "args": {"text": "x"}}) assert r.headers.get("cache-control") == "no-store" # ─── Context bundling ─────────────────────────────────────────────────────── class ContextFetchTests: def test_context_returns_bundled_results(self, http): r = http.get("/api/mizan/ctx/user/") assert r.status_code == 200 body = r.json() assert "current_user" in body assert "user_count" in body assert body["current_user"]["email"] == "anon@example.com" assert body["user_count"]["total"] == 42 def test_unknown_context_returns_not_found(self, http): r = http.get("/api/mizan/ctx/ghost/") assert r.status_code == 404 assert r.json()["error"]["code"] == "NOT_FOUND" # ─── Invalidation ─────────────────────────────────────────────────────────── class AuthTests: """The decorator normalizes auth=True → meta['auth']='required'; executor must match both.""" def test_anonymous_request_to_auth_required_returns_401(self, http): r = http.post("/api/mizan/call/", json={"fn": "whoami", "args": {}}) assert r.status_code == 401 assert r.json()["error"]["code"] == "UNAUTHORIZED" class InvalidationTests: def test_mutation_emits_invalidate_list(self, http): r = http.post( "/api/mizan/call/", json={"fn": "update_email", "args": {"email": "new@example.com"}}, ) assert r.status_code == 200 body = r.json() # affects='user' is a context-name string → invalidate list contains 'user' assert "user" in body["invalidate"] # ─── Structured-output shapes ─────────────────────────────────────────────── class StructuredOutputTests: """list[BaseModel] and Optional[BaseModel] should reach the wire as bare values, not {result: ...}.""" def test_list_of_basemodel_returns_bare_array(self, http): r = http.post("/api/mizan/call/", json={"fn": "list_items", "args": {}}) assert r.status_code == 200 body = r.json() assert body["result"] == [ {"id": 1, "name": "a"}, {"id": 2, "name": "b"}, ] def test_optional_basemodel_returns_inner_or_none(self, http): r_found = http.post("/api/mizan/call/", json={"fn": "find_item", "args": {"item_id": 5}}) assert r_found.status_code == 200 assert r_found.json()["result"] == {"id": 5, "name": "found"} r_missing = http.post("/api/mizan/call/", json={"fn": "find_item", "args": {"item_id": 0}}) assert r_missing.status_code == 200 assert r_missing.json()["result"] is None # ─── Merge protocol ───────────────────────────────────────────────────────── class AsyncHandlerTests: """`async def` handlers dispatch on the loop via view.acall.""" def test_async_handler_returns_awaited_result(self, http): r = http.post("/api/mizan/call/", json={"fn": "async_echo", "args": {"text": "hello"}}) assert r.status_code == 200 assert r.json()["result"] == {"message": "async: hello"} class MergeTests: """@client(merge=...) emits a `merge` field in the response so the kernel can splice without refetch.""" def test_merge_target_emits_merge_entry(self, http): r = http.post( "/api/mizan/call/", json={"fn": "set_item_name", "args": {"id": 42, "name": "renamed"}}, ) assert r.status_code == 200 body = r.json() # Server resolves slot — items_list returns list[ItemOutput], mutation returns ItemOutput assert body["merge"] == [ {"context": "items", "slot": "items_list", "value": {"id": 42, "name": "renamed"}} ] # invalidate stays empty when only merge is declared assert body["invalidate"] == []