The conformance board (tests/afi/test_capability_parity.py) is now fully green: 90 capability cells + 4 meta-locks + 3 codegen byte-parity = 97 passed. The gaps the prose table used to launder as "Django-only" / "out of scope" are wired, against the pinned-spec model (single-authored spec, byte-identical conformance across languages) — never per-language reimplementation. FastAPI — edge_manifest + PSR (logic single-sourced in mizan_core.manifest), WebSocket RPC (/ws/ through the shared dispatch), SSR (the framework-agnostic SSRBridge relocated to mizan_core.ssr; Django rides it from there), Shapes (SQLAlchemy projection, same declaration surface as django-readers), Forms (Pydantic schema/validate/submit). Rust (Axum + Tauri + cores/mizan-rust) — X-Mizan-Invalidate header, auth= enforcement, origin HMAC cache, edge manifest + PSR, WebSocket handler / IPC subscription channel, multipart upload, SSR bridge, Shapes, Forms; JWT/MWT mint+verify and cache-key derivation byte-pinned to the Python reference (cache_keys_pin, token_pin, invalidate_header_pin). TypeScript — a KDL IR emitter byte-identical to the Python build_ir (so a TS backend can feed the codegen — the largest gap), multipart upload, session-init, WebSocket transport, SSR bridge, JWT/MWT mint (pinned to Python), Shapes, Forms. Verified in the merged tree: core 25, fastapi 74, django 353/21-skip, mizan-rust (incl. cross-language pins) green, axum 10, tauri 8, mizan-ts 103/2-skip. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
389 lines
17 KiB
Python
389 lines
17 KiB
Python
"""
|
|
Capability probes — the adversarial half of the conformance gate.
|
|
|
|
A probe answers one question about one (capability, adapter) pair: *is this
|
|
capability actually wired into this adapter?* It answers by inspecting the
|
|
backend's own source for the concrete artifact the capability requires — a
|
|
route registration, an exported renderer, a management command, a call into the
|
|
shared core. It never asks the adapter to self-report (a self-report is the
|
|
thing that drifts), and it never reads a parity table (that would be circular).
|
|
|
|
The single invariant: **a probe goes green only when the wiring exists.** You
|
|
cannot flip a cell by editing prose, a docstring, or a README — only by adding
|
|
the route / module / binding the probe looks for.
|
|
|
|
Two facts learned by building this and watching it lie, now load-bearing:
|
|
|
|
1. Comments are the false-positive engine. A doc-comment that says "the JS side
|
|
re-wraps it into a Promise.reject" made an auth-enforcement probe pass on the
|
|
word `reject`. So source is comment-stripped per language before matching,
|
|
and patterns target code identifiers (`MizanError::Unauthorized`, a route
|
|
macro, an exported function name), never prose-able words.
|
|
|
|
2. AFI-common logic lives in the shared core; the adapter rides it. FastAPI's
|
|
JWT/MWT support *is* `mizan_core.auth.authenticate` — there is no `jwt`
|
|
literal in the FastAPI adapter, because the adapter delegates. Axum's KDL IR
|
|
lives in `cores/mizan-rust`, not the axum crate. So capabilities that are
|
|
genuinely core-provided (`jwt`, `mwt`, `ir_export`) probe the BACKEND scope
|
|
(adapter + its language core); capabilities the adapter must wire to its own
|
|
transport (the header on the response, the route, the manifest command, the
|
|
WebSocket handler) probe the ADAPTER scope only.
|
|
|
|
Depth. These are *source-surface* probes: they confirm the artifact is present,
|
|
not that it behaves correctly under load. Runtime-behavior depth (mount, drive,
|
|
assert the wire bytes) is owed per adapter and lands with each gap's closure —
|
|
the runtime probe is written alongside the wiring it verifies. Surface-presence
|
|
is the floor that makes de-scope impossible; runtime is the ceiling each closure
|
|
raises, and the gap between them is named here, not hidden.
|
|
|
|
`partial` (◑) is a real state: Axum declares `Transport::Websocket` in the IR
|
|
but routes no handler; mizan-ts decodes a JWT but mints none. It renders ◑ — and
|
|
the conformance suite still treats it as RED, because the suite asserts
|
|
`state == "pass"`. ◑ is visible honesty, never a shippable-green hiding place.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from typing import Callable, Literal
|
|
|
|
from manifest import Adapter
|
|
|
|
# Filesystem anchors. The repository root is taken to be two directories above
# the directory that contains this file; the adapter and core trees hang off it.
REPO_ROOT = Path(__file__).resolve().parents[2]
BACKENDS = REPO_ROOT.joinpath("backends")
CORES = REPO_ROOT.joinpath("cores")
|
|
|
|
# Adapter id -> that adapter's own source root. Probes scoped "adapter" read
# only the tree listed here.
ADAPTER_ROOTS: dict[str, Path] = {
    "django": BACKENDS.joinpath("mizan-django", "src", "mizan"),
    "fastapi": BACKENDS.joinpath("mizan-fastapi", "src", "mizan_fastapi"),
    "rust_axum": BACKENDS.joinpath("mizan-rust-axum", "src"),
    "tauri": BACKENDS.joinpath("mizan-tauri", "src"),
    "typescript": BACKENDS.joinpath("mizan-ts", "src"),
}
|
|
|
|
# Language -> the shared core tree(s) that language's adapters ride. Probes
# scoped "backend" read the adapter root PLUS these, because that is where
# core-provided AFI-common logic (token minting, KDL IR) actually lives.
CORE_ROOTS: dict[str, list[Path]] = {
    "python": [CORES.joinpath("mizan-python", "src", "mizan_core")],
    "rust": [
        CORES.joinpath("mizan-rust", "src"),
        CORES.joinpath("mizan-rust-macros", "src"),
    ],
    "typescript": [],  # mizan-ts is self-contained; no shared core
}
|
|
|
|
# File suffixes that count as probe-able source.
_SOURCE_EXTS = {".py", ".rs", ".ts", ".tsx"}

# Directory names whose contents are never scanned (dependencies, build output,
# bytecode caches, virtualenvs).
_SKIP_DIRS = {"node_modules", "target", "__pycache__", "dist", ".venv"}


# The three outcomes a probe can report.
State = Literal["pass", "partial", "fail"]
|
|
|
|
|
|
@dataclass(frozen=True)
class ProbeResult:
    """Immutable outcome of one probe for one (capability, adapter) pair."""

    # One of "pass", "partial" or "fail".
    state: State
    # Human-readable explanation of what was (or was not) found.
    detail: str
|
|
|
|
|
|
# ─── Source collection + comment stripping ────────────────────────────────────
|
|
|
|
|
|
def _strip_comments(text: str, language: str) -> str:
    """Return *text* with comments removed, so prose cannot satisfy a code pattern.

    For ``"rust"`` and ``"typescript"`` this drops ``/* ... */`` blocks and
    ``//`` line comments (a ``//`` directly preceded by ``:`` is kept, which
    preserves URL schemes). For ``"python"`` it drops triple-quoted strings and
    ``#`` line comments. Any other language is returned unchanged.

    The stripping is regex-based and deliberately imperfect: a comment marker
    inside a string literal may be clipped. The probe patterns are code
    identifiers that live outside strings, and a rare over-strip costs far less
    than a comment-driven false ``pass``.
    """
    if language == "python":
        # Triple-quoted strings first (both quote styles), then `#` comments.
        no_double = re.sub(r'""".*?"""', "", text, flags=re.DOTALL)
        no_triple = re.sub(r"'''.*?'''", "", no_double, flags=re.DOTALL)
        return re.sub(r"#[^\n]*", "", no_triple)

    if language in ("rust", "typescript"):
        # Block comments (including /** doc */) first, then `//` line comments.
        no_blocks = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL)
        return re.sub(r"(?<!:)//[^\n]*", "", no_blocks)

    return text
|
|
|
|
|
|
def _read_roots(roots: tuple[Path, ...], language: str) -> str:
    """Return the comment-stripped source under *roots*, joined by newlines.

    Each root that exists is walked recursively in sorted path order. A file is
    included when its suffix is in ``_SOURCE_EXTS`` and no directory *below the
    root* is named in ``_SKIP_DIRS``. Every included file is decoded as UTF-8
    (undecodable bytes are replaced) and passed through ``_strip_comments``
    with *language*. Roots that do not exist contribute nothing.

    Args:
        roots: Directories to scan.
        language: Language name forwarded to ``_strip_comments`` for every file.

    Returns:
        The stripped file contents joined with ``"\\n"``; ``""`` if nothing matched.
    """
    chunks: list[str] = []
    for root in roots:
        if not root.exists():
            continue
        for path in sorted(root.rglob("*")):
            if not (path.is_file() and path.suffix in _SOURCE_EXTS):
                continue
            # Compare only the components below `root`. Testing the absolute
            # path would drop every file whenever the checkout itself sits
            # under a directory called e.g. `dist`, `target` or `.venv`.
            below_root = path.relative_to(root).parts
            if any(part in _SKIP_DIRS for part in below_root):
                continue
            raw = path.read_text(encoding="utf-8", errors="replace")
            chunks.append(_strip_comments(raw, language))
    return "\n".join(chunks)
|
|
|
|
|
|
@lru_cache(maxsize=None)
def _adapter_text(adapter_id: str, language: str) -> str:
    """Return the stripped source of one adapter's own root (memoised per argument pair).

    Raises ``KeyError`` if *adapter_id* is not a key of ``ADAPTER_ROOTS``.
    """
    adapter_root = ADAPTER_ROOTS[adapter_id]
    return _read_roots((adapter_root,), language)
|
|
|
|
|
|
@lru_cache(maxsize=None)
def _backend_text(adapter_id: str, language: str) -> str:
    """Return the stripped source of an adapter plus its language's shared core(s).

    Memoised per argument pair. A language with no entry in ``CORE_ROOTS``
    contributes no core roots. Raises ``KeyError`` if *adapter_id* is not a key
    of ``ADAPTER_ROOTS``.
    """
    adapter_root = ADAPTER_ROOTS[adapter_id]
    core_roots = CORE_ROOTS.get(language, [])
    return _read_roots((adapter_root, *core_roots), language)
|
|
|
|
|
|
def _adapter(a: Adapter) -> str:
    """Adapter-scope source text for *a* (keyed on its ``id`` and ``language``)."""
    adapter_id, language = a.id, a.language
    return _adapter_text(adapter_id, language)
|
|
|
|
|
|
def _backend(a: Adapter) -> str:
    """Backend-scope source text for *a*: its adapter root plus its language core(s)."""
    adapter_id, language = a.id, a.language
    return _backend_text(adapter_id, language)
|
|
|
|
|
|
def _has_path(adapter_id: str, *relparts: str) -> bool:
    """Report whether ``relparts``, joined under the adapter's root, exists on disk.

    Raises ``KeyError`` if *adapter_id* is not a key of ``ADAPTER_ROOTS``.
    """
    target = ADAPTER_ROOTS[adapter_id].joinpath(*relparts)
    return target.exists()
|
|
|
|
|
|
def _hit(text: str, pattern: str) -> bool:
    """Report whether regex *pattern* matches anywhere in *text*."""
    match = re.search(pattern, text)
    return match is not None
|
|
|
|
|
|
def _wired(found: bool, what: str) -> ProbeResult:
    """Turn a boolean finding into a ``pass`` ("wired: …") or ``fail`` ("missing: …") result."""
    if found:
        return ProbeResult("pass", f"wired: {what}")
    return ProbeResult("fail", f"missing: {what}")
|
|
|
|
|
|
# ─── Per-capability detection ─────────────────────────────────────────────────
#
# Patterns are grounded in artifacts read directly from each adapter:
#   django   urls.py `path("call/", ...)`, channels/ ssr/ shapes/ forms/ dirs
#   fastapi  router.py `@router.post("/call/")`, header insert, /session/
#   axum     handlers.rs `function_call`, `compute_invalidation`, no header/cache
#   tauri    lib.rs `mizan_invoke`, Envelope::Call/Fetch, no auth/cache/ws
#   ts       dispatch.ts `handleMutationCall`, checkAuth, token.ts decode-only
#
# NOTE(review): the trailing "no …" / "decode-only" remarks appear to record the
# gaps that existed when these patterns were first written. The probes below,
# not these notes, are the source of truth — confirm and prune any that are stale.
|
|
|
|
|
|
def _probe_rpc_call(a: Adapter) -> ProbeResult:
    """Look for the RPC call-dispatch artifact in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r'path\("call/|/call/|function_call',
        "rust": r"function_call|mizan_invoke|Envelope::Call",
        "typescript": r"handleMutationCall",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "RPC call dispatch")
|
|
|
|
|
|
def _probe_context_bundle(a: Adapter) -> ProbeResult:
    """Look for the named-context bundle fetch artifact in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"context_fetch|/ctx/|ctx/<",
        "rust": r"context_fetch|handle_fetch|Envelope::Fetch",
        "typescript": r"handleContextFetch",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "named-context bundle fetch")
|
|
|
|
|
|
def _probe_invalidate_body(a: Adapter) -> ProbeResult:
    """Look for an `invalidate` key/identifier in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r'"invalidate"|invalidate=|\binvalidate\b',
        "rust": r"invalidate|InvalidationTarget",
        "typescript": r"\.invalidate\b|invalidate:",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "`invalidate` key in response body")
|
|
|
|
|
|
def _probe_invalidate_header(a: Adapter) -> ProbeResult:
    """Look for the literal `X-Mizan-Invalidate` header name in the adapter's own source."""
    found = _hit(_adapter(a), r"X-Mizan-Invalidate")
    return _wired(found, "`X-Mizan-Invalidate` header emission")
|
|
|
|
|
|
def _probe_invalidate_autoscope(a: Adapter) -> ProbeResult:
    """Look for a call into the shared invalidation resolver in the adapter's own source.

    The adapter wires auto-scoping by routing through the shared resolver, so
    the patterns are the resolver / dispatch entry-point names per language.
    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"resolve_invalidation|dispatch_call|dispatch_context",
        "rust": r"compute_invalidation",
        "typescript": r"resolveInvalidation",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "three-tier invalidation auto-scoping (shared resolver)")
|
|
|
|
|
|
def _probe_registration(a: Adapter) -> ProbeResult:
    """Look for function discovery / registration identifiers in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"\bregister\b|get_function|registry",
        "rust": r"FUNCTIONS|lookup_function|linkme",
        "typescript": r"getFunction|\bregister\b|registry",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "function discovery / registration")
|
|
|
|
|
|
def _probe_ir_export(a: Adapter) -> ProbeResult:
    """Look for a KDL IR exporter.

    Python adapters are searched in adapter scope, Rust adapters in backend
    scope (adapter + Rust core), and every other language in adapter scope with
    the TypeScript-style identifier set.
    """
    language = a.language

    if language == "python":
        found = _hit(_adapter(a), r"build_ir|export_mizan_ir")
        return _wired(found, "KDL IR export")

    if language == "rust":
        found = _hit(_backend(a), r"to_kdl|emit_kdl|fn build_ir|kdl::")
        return _wired(found, "KDL IR export (rust core)")

    # TypeScript: an edge-manifest (JSON) emitter alone does not count; only a
    # KDL emitter identifier in the adapter source turns this cell green.
    if not _hit(_adapter(a), r"to_kdl|emitKdl|buildIr|\bkdl\b"):
        return ProbeResult("fail", "missing: KDL IR emitter (manifest.ts emits edge JSON, not codegen IR)")
    return ProbeResult("pass", "wired: KDL IR emitter")
|
|
|
|
|
|
def _probe_upload(a: Adapter) -> ProbeResult:
    """Look for multipart / upload binding identifiers in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"bind_uploads|UploadedFile|multipart",
        "rust": r"Multipart|multipart|bind_uploads",
        "typescript": r"FormData|multipart",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "multipart / Upload-type dispatch binding")
|
|
|
|
|
|
def _probe_auth_enforcement(a: Adapter) -> ProbeResult:
    """Look for `auth=` enforcement in dispatch.

    Python and TypeScript adapters get a plain pass/fail on adapter scope. Every
    other language (the Rust adapters) gets three states: ``pass`` when the
    adapter rejects, ``partial`` when auth/private is only mentioned somewhere
    in backend scope, ``fail`` otherwise.
    """
    language = a.language

    if language == "python":
        found = _hit(_adapter(a), r"enforce_auth|authenticate\(|authguard")
        return _wired(found, "auth= enforcement")

    if language == "typescript":
        found = _hit(_adapter(a), r"checkAuth|authDenial|AuthDenial")
        return _wired(found, "auth= enforcement")

    # Rust adapters (axum, tauri): dispatch must reject on auth=.
    rejects_in_dispatch = _hit(_adapter(a), r"Unauthorized|Forbidden|enforce_auth")
    if rejects_in_dispatch:
        return ProbeResult("pass", "wired: auth enforcement in dispatch")

    # Backend scope is only read when the adapter itself shows no rejection.
    carried_only = _hit(_backend(a), r"\bauth\b|is_private|\bprivate\b")
    if carried_only:
        return ProbeResult("partial", "auth/private carried in FunctionSpec; dispatch does not reject")

    return ProbeResult("fail", "missing: auth= enforcement")
|
|
|
|
|
|
def _probe_origin_cache(a: Adapter) -> ProbeResult:
    """Look for origin-side cache identifiers in backend scope (adapter + language core).

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"CacheOrchestrator|derive_cache_key|cfg\.cache|cache_get|cache_put",
        "rust": r"derive_cache_key|CacheBackend|origin_cache",
        "typescript": r"cacheGet|cachePut|deriveCacheKey",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_backend(a), pattern)
    return _wired(found, "origin-side HMAC cache")
|
|
|
|
|
|
def _probe_edge_manifest(a: Adapter) -> ProbeResult:
    """Look for edge-manifest export identifiers in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"export_edge_manifest|generate_edge_manifest|edge_manifest",
        "rust": r"edge_manifest|generate_manifest",
        "typescript": r"generateManifest|EdgeManifest",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "edge manifest export")
|
|
|
|
|
|
def _probe_psr(a: Adapter) -> ProbeResult:
    """Look for a `render_strategy` / `renderStrategy` identifier in the adapter's own source."""
    found = _hit(_adapter(a), r"render_strategy|renderStrategy")
    return _wired(found, "PSR render_strategy")
|
|
|
|
|
|
def _probe_session_init(a: Adapter) -> ProbeResult:
    """Look for a session-init route or handler name in the adapter's own source.

    Raises ``KeyError`` for a language with no pattern.
    """
    patterns_by_language = {
        "python": r"session/|session_init",
        "rust": r"session_init|/session/",
        "typescript": r"sessionInit|/session/",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "session / CSRF init endpoint")
|
|
|
|
|
|
def _probe_websocket(a: Adapter) -> ProbeResult:
    """Look for the adapter's push transport, dispatching on adapter id.

    Django is checked by the presence of a ``channels`` path under its root;
    FastAPI, Tauri and the fallback case by adapter-scope patterns. Axum is
    three-state: ``pass`` for a handler in the adapter, ``partial`` when only
    backend scope mentions WebSocket, ``fail`` otherwise.
    """
    adapter_id = a.id

    if adapter_id == "django":
        return _wired(_has_path(adapter_id, "channels"), "Django Channels consumer")

    if adapter_id == "fastapi":
        found = _hit(_adapter(a), r"WebSocket|@router\.websocket|websocket_route")
        return _wired(found, "FastAPI WebSocket route")

    if adapter_id == "rust_axum":
        if _hit(_adapter(a), r"WebSocketUpgrade|on_upgrade|ws_handler"):
            return ProbeResult("pass", "wired: Axum WebSocket handler")
        # Backend scope is consulted only when the adapter has no handler.
        if _hit(_backend(a), r"Websocket|WebSocket"):
            return ProbeResult("partial", "Transport::Websocket declared in IR; no Axum handler routed")
        return ProbeResult("fail", "missing: WebSocket transport")

    if adapter_id == "tauri":
        found = _hit(_adapter(a), r"subscription|Subscribe|emit_to|Channel<")
        return _wired(found, "IPC subscription channel")

    # Any other adapter: a WebSocket identifier in its own source.
    found = _hit(_adapter(a), r"WebSocket|websocket")
    return _wired(found, "WebSocket transport")
|
|
|
|
|
|
def _probe_ssr_bridge(a: Adapter) -> ProbeResult:
    """Look for a reference to the SSR bridge / renderer in the adapter's own source.

    The check is uniform and location-independent: the SSR subprocess bridge is
    single-sourced (Python adapters ride `mizan_core.ssr.SSRBridge`), so a
    "pass" means the ADAPTER references the bridge or its renderer over its own
    surface — not that a `bridge.py` lives at some fixed path.
    """
    references_bridge = _hit(_adapter(a), r"SSRBridge|renderToString|ssr_bridge")
    return _wired(references_bridge, "SSR bridge (subprocess renderer)")
|
|
|
|
|
|
def _probe_jwt(a: Adapter) -> ProbeResult:
    """Look for JWT support.

    Python adapters are searched in backend scope (JWT is core-provided via
    `mizan_core.auth`). The `typescript` adapter is three-state on adapter
    scope: ``pass`` for a mint/sign helper, ``partial`` for a decode-only
    helper, ``fail`` otherwise. Everything else is pass/fail on backend scope.
    """
    if a.language == "python":
        found = _hit(_backend(a), r"\bjwt\b|JWT|def authenticate")
        return _wired(found, "JWT auth (rides mizan_core.auth)")

    if a.id != "typescript":
        found = _hit(_backend(a), r"\bjwt\b|JWT")
        return _wired(found, "JWT auth (access / refresh)")

    # typescript adapter: minting outranks decode-only.
    if _hit(_adapter(a), r"signJwt|mintJwt|createJwt|encodeJwt"):
        return ProbeResult("pass", "wired: JWT mint + verify")
    if _hit(_adapter(a), r"decodeJwt|decodeJwtBearer"):
        return ProbeResult("partial", "decode-only helper; mints no tokens")
    return ProbeResult("fail", "missing: JWT auth")
|
|
|
|
|
|
def _probe_mwt(a: Adapter) -> ProbeResult:
    """Look for MWT (edge identity token) support.

    Python adapters are searched in backend scope. The `typescript` adapter is
    three-state on adapter scope: ``pass`` for a mint/sign helper, ``partial``
    for a decode-only helper, ``fail`` otherwise. Everything else is pass/fail
    on backend scope.
    """
    if a.language == "python":
        found = _hit(_backend(a), r"\bmwt\b|MWT|X-Mizan-Token")
        return _wired(found, "MWT (edge identity token)")

    if a.id != "typescript":
        found = _hit(_backend(a), r"\bmwt\b|MWT")
        return _wired(found, "MWT (edge identity token)")

    # typescript adapter: minting outranks decode-only.
    if _hit(_adapter(a), r"signMwt|mintMwt|createMwt"):
        return ProbeResult("pass", "wired: MWT mint")
    if _hit(_adapter(a), r"decodeMwt|identityFromMwt"):
        return ProbeResult("partial", "decode-only helper; mints no tokens")
    return ProbeResult("fail", "missing: MWT")
|
|
|
|
|
|
def _probe_shapes(a: Adapter) -> ProbeResult:
    """Look for the typed query-projection (Shapes) binding.

    Django is checked by the presence of a ``shapes`` path under its root; all
    other adapters by a per-language pattern on adapter scope. Raises
    ``KeyError`` for a language with no pattern.
    """
    if a.id == "django":
        return _wired(_has_path(a.id, "shapes"), "django-readers Shapes binding")

    patterns_by_language = {
        "python": r"django_readers|QuerysetProjection|shapes\.",
        "rust": r"shapes::|QueryProjection",
        "typescript": r"QueryProjection|shapesBinding",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "typed query-projection binding")
|
|
|
|
|
|
def _probe_forms(a: Adapter) -> ProbeResult:
    """Look for the forms binding (schema / validate / submit).

    Django is checked by the presence of a ``forms`` path under its root. Rust
    adapters are three-state: ``pass`` for a validate/submit identifier in the
    adapter, ``partial`` when only backend scope carries form metadata,
    ``fail`` otherwise. Python and TypeScript are pass/fail on adapter scope;
    any other language raises ``KeyError``.
    """
    if a.id == "django":
        return _wired(_has_path(a.id, "forms"), "Django Forms binding (schema / validate / submit)")

    if a.language == "rust":
        if _hit(_adapter(a), r"form_submit|form_validate|FormRole::Submit"):
            return ProbeResult("pass", "wired: form validate/submit endpoint")
        # Backend scope is consulted only when the adapter has no endpoint.
        if _hit(_backend(a), r"is_form|form_role|FormRole"):
            return ProbeResult("partial", "is_form/form_role carried; no validate/submit endpoint")
        return ProbeResult("fail", "missing: forms")

    patterns_by_language = {
        "python": r"FormSchema|form_role|forms\.",
        "typescript": r"FormSchema|formRole|formSubmit",
    }
    pattern = patterns_by_language[a.language]
    found = _hit(_adapter(a), pattern)
    return _wired(found, "forms (schema / validate / submit)")
|
|
|
|
|
|
# Capability id -> the probe that decides it. `run_probe` looks capabilities up
# here and raises for any id that has no entry.
PROBES: dict[str, Callable[[Adapter], ProbeResult]] = {
    # Core RPC surface
    "rpc_call": _probe_rpc_call,
    "context_bundle": _probe_context_bundle,
    # Invalidation
    "invalidate_body": _probe_invalidate_body,
    "invalidate_header": _probe_invalidate_header,
    "invalidate_autoscope": _probe_invalidate_autoscope,
    # Discovery / codegen input
    "registration": _probe_registration,
    "ir_export": _probe_ir_export,
    # Dispatch features
    "upload": _probe_upload,
    "auth_enforcement": _probe_auth_enforcement,
    "origin_cache": _probe_origin_cache,
    # Edge / rendering
    "edge_manifest": _probe_edge_manifest,
    "psr": _probe_psr,
    "session_init": _probe_session_init,
    "websocket": _probe_websocket,
    "ssr_bridge": _probe_ssr_bridge,
    # Tokens
    "jwt": _probe_jwt,
    "mwt": _probe_mwt,
    # Data bindings
    "shapes": _probe_shapes,
    "forms": _probe_forms,
}
|
|
|
|
|
|
def run_probe(capability_id: str, adapter: Adapter) -> ProbeResult:
    """Run the probe registered for *capability_id* against *adapter*.

    Raises:
        KeyError: if ``PROBES`` has no entry for *capability_id*.
    """
    if capability_id not in PROBES:
        raise KeyError(
            f"No probe registered for capability '{capability_id}'. Every capability "
            "in manifest.CAPABILITIES owes a probe — add one to PROBES."
        )
    return PROBES[capability_id](adapter)
|