""" Capability probes — the adversarial half of the conformance gate. A probe answers one question about one (capability, adapter) pair: *is this capability actually wired into this adapter?* It answers by inspecting the backend's own source for the concrete artifact the capability requires — a route registration, an exported renderer, a management command, a call into the shared core. It never asks the adapter to self-report (a self-report is the thing that drifts), and it never reads a parity table (that would be circular). The single invariant: **a probe goes green only when the wiring exists.** You cannot flip a cell by editing prose, a docstring, or a README — only by adding the route / module / binding the probe looks for. Two facts learned by building this and watching it lie, now load-bearing: 1. Comments are the false-positive engine. A doc-comment that says "the JS side re-wraps it into a Promise.reject" made an auth-enforcement probe pass on the word `reject`. So source is comment-stripped per language before matching, and patterns target code identifiers (`MizanError::Unauthorized`, a route macro, an exported function name), never prose-able words. 2. AFI-common logic lives in the shared core; the adapter rides it. FastAPI's JWT/MWT support *is* `mizan_core.auth.authenticate` — there is no `jwt` literal in the FastAPI adapter, because the adapter delegates. Axum's KDL IR lives in `cores/mizan-rust`, not the axum crate. So capabilities that are genuinely core-provided (`jwt`, `mwt`, `ir_export`) probe the BACKEND scope (adapter + its language core); capabilities the adapter must wire to its own transport (the header on the response, the route, the manifest command, the WebSocket handler) probe the ADAPTER scope only. Depth. These are *source-surface* probes: they confirm the artifact is present, not that it behaves correctly under load. Runtime-behavior depth (mount, drive, assert the wire bytes) is owed per adapter and lands with each gap's closure — the runtime probe is written alongside the wiring it verifies. Surface-presence is the floor that makes de-scope impossible; runtime is the ceiling each closure raises, and the gap between them is named here, not hidden. `partial` (◑) is a real state: Axum declares `Transport::Websocket` in the IR but routes no handler; mizan-ts decodes a JWT but mints none. It renders ◑ — and the conformance suite still treats it as RED, because the suite asserts `state == "pass"`. ◑ is visible honesty, never a shippable-green hiding place. """ from __future__ import annotations import re from dataclasses import dataclass from functools import lru_cache from pathlib import Path from typing import Callable, Literal from manifest import Adapter REPO_ROOT = Path(__file__).resolve().parents[2] BACKENDS = REPO_ROOT / "backends" CORES = REPO_ROOT / "cores" # Each adapter's own source root. Probes scoped "adapter" read only here. ADAPTER_ROOTS: dict[str, Path] = { "django": BACKENDS / "mizan-django" / "src" / "mizan", "fastapi": BACKENDS / "mizan-fastapi" / "src" / "mizan_fastapi", "rust_axum": BACKENDS / "mizan-rust-axum" / "src", "tauri": BACKENDS / "mizan-tauri" / "src", "typescript": BACKENDS / "mizan-ts" / "src", } # The shared core each language's adapters ride. Probes scoped "backend" read # the adapter root PLUS these — that is where core-provided AFI-common logic # (token minting, KDL IR) actually lives. CORE_ROOTS: dict[str, list[Path]] = { "python": [CORES / "mizan-python" / "src" / "mizan_core"], "rust": [CORES / "mizan-rust" / "src", CORES / "mizan-rust-macros" / "src"], "typescript": [], # mizan-ts is self-contained; no shared core } _SOURCE_EXTS = {".py", ".rs", ".ts", ".tsx"} _SKIP_DIRS = {"node_modules", "target", "__pycache__", "dist", ".venv"} State = Literal["pass", "partial", "fail"] @dataclass(frozen=True) class ProbeResult: state: State detail: str # ─── Source collection + comment stripping ──────────────────────────────────── def _strip_comments(text: str, language: str) -> str: """Remove comments so prose can't satisfy a code-artifact pattern. Imperfect by design (a `#` inside a string may be clipped); the patterns are code identifiers that live outside strings, and the cost of a rare false strip is far below the cost of a comment-driven false `pass`. """ if language in ("rust", "typescript"): text = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL) # block /* */ (incl. /** */) text = re.sub(r"(? str: chunks: list[str] = [] for root in roots: if not root.exists(): continue for path in sorted(root.rglob("*")): if not (path.is_file() and path.suffix in _SOURCE_EXTS): continue if any(part in _SKIP_DIRS for part in path.parts): continue chunks.append(_strip_comments(path.read_text(encoding="utf-8", errors="replace"), language)) return "\n".join(chunks) @lru_cache(maxsize=None) def _adapter_text(adapter_id: str, language: str) -> str: return _read_roots((ADAPTER_ROOTS[adapter_id],), language) @lru_cache(maxsize=None) def _backend_text(adapter_id: str, language: str) -> str: roots = (ADAPTER_ROOTS[adapter_id], *CORE_ROOTS.get(language, [])) return _read_roots(tuple(roots), language) def _adapter(a: Adapter) -> str: return _adapter_text(a.id, a.language) def _backend(a: Adapter) -> str: return _backend_text(a.id, a.language) def _has_path(adapter_id: str, *relparts: str) -> bool: return ADAPTER_ROOTS[adapter_id].joinpath(*relparts).exists() def _hit(text: str, pattern: str) -> bool: return re.search(pattern, text) is not None def _wired(found: bool, what: str) -> ProbeResult: return ProbeResult("pass", f"wired: {what}") if found else ProbeResult("fail", f"missing: {what}") # ─── Per-capability detection ───────────────────────────────────────────────── # # Patterns are grounded in artifacts read directly from each adapter: # django urls.py `path("call/", ...)`, channels/ ssr/ shapes/ forms/ dirs # fastapi router.py `@router.post("/call/")`, header insert, /session/ # axum handlers.rs `function_call`, `compute_invalidation`, no header/cache # tauri lib.rs `mizan_invoke`, Envelope::Call/Fetch, no auth/cache/ws # ts dispatch.ts `handleMutationCall`, checkAuth, token.ts decode-only def _probe_rpc_call(a: Adapter) -> ProbeResult: pat = { "python": r'path\("call/|/call/|function_call', "rust": r"function_call|mizan_invoke|Envelope::Call", "typescript": r"handleMutationCall", }[a.language] return _wired(_hit(_adapter(a), pat), "RPC call dispatch") def _probe_context_bundle(a: Adapter) -> ProbeResult: pat = { "python": r"context_fetch|/ctx/|ctx/<", "rust": r"context_fetch|handle_fetch|Envelope::Fetch", "typescript": r"handleContextFetch", }[a.language] return _wired(_hit(_adapter(a), pat), "named-context bundle fetch") def _probe_invalidate_body(a: Adapter) -> ProbeResult: pat = { "python": r'"invalidate"|invalidate=|\binvalidate\b', "rust": r"invalidate|InvalidationTarget", "typescript": r"\.invalidate\b|invalidate:", }[a.language] return _wired(_hit(_adapter(a), pat), "`invalidate` key in response body") def _probe_invalidate_header(a: Adapter) -> ProbeResult: return _wired(_hit(_adapter(a), r"X-Mizan-Invalidate"), "`X-Mizan-Invalidate` header emission") def _probe_invalidate_autoscope(a: Adapter) -> ProbeResult: # The adapter wires auto-scoping by routing through the shared resolver. pat = { "python": r"resolve_invalidation|dispatch_call|dispatch_context", "rust": r"compute_invalidation", "typescript": r"resolveInvalidation", }[a.language] return _wired(_hit(_adapter(a), pat), "three-tier invalidation auto-scoping (shared resolver)") def _probe_registration(a: Adapter) -> ProbeResult: pat = { "python": r"\bregister\b|get_function|registry", "rust": r"FUNCTIONS|lookup_function|linkme", "typescript": r"getFunction|\bregister\b|registry", }[a.language] return _wired(_hit(_adapter(a), pat), "function discovery / registration") def _probe_ir_export(a: Adapter) -> ProbeResult: # Core-provided: python build_ir/export cmd (adapter), rust kdl.rs/ir.rs (core). if a.language == "python": return _wired(_hit(_adapter(a), r"build_ir|export_mizan_ir"), "KDL IR export") if a.language == "rust": return _wired(_hit(_backend(a), r"to_kdl|emit_kdl|fn build_ir|kdl::"), "KDL IR export (rust core)") # TypeScript: emits the edge manifest (JSON), but no codegen KDL IR (note 8). if _hit(_adapter(a), r"to_kdl|emitKdl|buildIr|\bkdl\b"): return ProbeResult("pass", "wired: KDL IR emitter") return ProbeResult("fail", "missing: KDL IR emitter (manifest.ts emits edge JSON, not codegen IR)") def _probe_upload(a: Adapter) -> ProbeResult: pat = { "python": r"bind_uploads|UploadedFile|multipart", "rust": r"Multipart|multipart|bind_uploads", "typescript": r"FormData|multipart", }[a.language] return _wired(_hit(_adapter(a), pat), "multipart / Upload-type dispatch binding") def _probe_auth_enforcement(a: Adapter) -> ProbeResult: if a.language == "python": return _wired(_hit(_adapter(a), r"enforce_auth|authenticate\(|authguard"), "auth= enforcement") if a.language == "typescript": return _wired(_hit(_adapter(a), r"checkAuth|authDenial|AuthDenial"), "auth= enforcement") # Rust adapters (axum, tauri): dispatch must reject on auth=. if _hit(_adapter(a), r"Unauthorized|Forbidden|enforce_auth"): return ProbeResult("pass", "wired: auth enforcement in dispatch") if _hit(_backend(a), r"\bauth\b|is_private|\bprivate\b"): return ProbeResult("partial", "auth/private carried in FunctionSpec; dispatch does not reject") return ProbeResult("fail", "missing: auth= enforcement") def _probe_origin_cache(a: Adapter) -> ProbeResult: pat = { "python": r"CacheOrchestrator|derive_cache_key|cfg\.cache|cache_get|cache_put", "rust": r"derive_cache_key|CacheBackend|origin_cache", "typescript": r"cacheGet|cachePut|deriveCacheKey", }[a.language] return _wired(_hit(_backend(a), pat), "origin-side HMAC cache") def _probe_edge_manifest(a: Adapter) -> ProbeResult: pat = { "python": r"export_edge_manifest|generate_edge_manifest|edge_manifest", "rust": r"edge_manifest|generate_manifest", "typescript": r"generateManifest|EdgeManifest", }[a.language] return _wired(_hit(_adapter(a), pat), "edge manifest export") def _probe_psr(a: Adapter) -> ProbeResult: return _wired(_hit(_adapter(a), r"render_strategy|renderStrategy"), "PSR render_strategy") def _probe_session_init(a: Adapter) -> ProbeResult: pat = { "python": r"session/|session_init", "rust": r"session_init|/session/", "typescript": r"sessionInit|/session/", }[a.language] return _wired(_hit(_adapter(a), pat), "session / CSRF init endpoint") def _probe_websocket(a: Adapter) -> ProbeResult: if a.id == "django": return _wired(_has_path(a.id, "channels"), "Django Channels consumer") if a.id == "fastapi": return _wired(_hit(_adapter(a), r"WebSocket|@router\.websocket|websocket_route"), "FastAPI WebSocket route") if a.id == "rust_axum": if _hit(_adapter(a), r"WebSocketUpgrade|on_upgrade|ws_handler"): return ProbeResult("pass", "wired: Axum WebSocket handler") if _hit(_backend(a), r"Websocket|WebSocket"): return ProbeResult("partial", "Transport::Websocket declared in IR; no Axum handler routed") return ProbeResult("fail", "missing: WebSocket transport") if a.id == "tauri": return _wired(_hit(_adapter(a), r"subscription|Subscribe|emit_to|Channel<"), "IPC subscription channel") return _wired(_hit(_adapter(a), r"WebSocket|websocket"), "WebSocket transport") def _probe_ssr_bridge(a: Adapter) -> ProbeResult: if a.id == "django": return _wired(_has_path(a.id, "ssr", "bridge.py"), "Bun SSR subprocess bridge") return _wired(_hit(_adapter(a), r"SSRBridge|renderToString|ssr_bridge"), "SSR bridge (subprocess renderer)") def _probe_jwt(a: Adapter) -> ProbeResult: # Core-provided for Python (mizan_core.auth); TS has decode-only helpers. if a.language == "python": return _wired(_hit(_backend(a), r"\bjwt\b|JWT|def authenticate"), "JWT auth (rides mizan_core.auth)") if a.id == "typescript": if _hit(_adapter(a), r"signJwt|mintJwt|createJwt|encodeJwt"): return ProbeResult("pass", "wired: JWT mint + verify") if _hit(_adapter(a), r"decodeJwt|decodeJwtBearer"): return ProbeResult("partial", "decode-only helper; mints no tokens") return ProbeResult("fail", "missing: JWT auth") return _wired(_hit(_backend(a), r"\bjwt\b|JWT"), "JWT auth (access / refresh)") def _probe_mwt(a: Adapter) -> ProbeResult: if a.language == "python": return _wired(_hit(_backend(a), r"\bmwt\b|MWT|X-Mizan-Token"), "MWT (edge identity token)") if a.id == "typescript": if _hit(_adapter(a), r"signMwt|mintMwt|createMwt"): return ProbeResult("pass", "wired: MWT mint") if _hit(_adapter(a), r"decodeMwt|identityFromMwt"): return ProbeResult("partial", "decode-only helper; mints no tokens") return ProbeResult("fail", "missing: MWT") return _wired(_hit(_backend(a), r"\bmwt\b|MWT"), "MWT (edge identity token)") def _probe_shapes(a: Adapter) -> ProbeResult: if a.id == "django": return _wired(_has_path(a.id, "shapes"), "django-readers Shapes binding") pat = { "python": r"django_readers|QuerysetProjection|shapes\.", "rust": r"shapes::|QueryProjection", "typescript": r"QueryProjection|shapesBinding", }[a.language] return _wired(_hit(_adapter(a), pat), "typed query-projection binding") def _probe_forms(a: Adapter) -> ProbeResult: if a.id == "django": return _wired(_has_path(a.id, "forms"), "Django Forms binding (schema / validate / submit)") if a.language == "rust": if _hit(_adapter(a), r"form_submit|form_validate|FormRole::Submit"): return ProbeResult("pass", "wired: form validate/submit endpoint") if _hit(_backend(a), r"is_form|form_role|FormRole"): return ProbeResult("partial", "is_form/form_role carried; no validate/submit endpoint") return ProbeResult("fail", "missing: forms") pat = { "python": r"FormSchema|form_role|forms\.", "typescript": r"FormSchema|formRole|formSubmit", }[a.language] return _wired(_hit(_adapter(a), pat), "forms (schema / validate / submit)") PROBES: dict[str, Callable[[Adapter], ProbeResult]] = { "rpc_call": _probe_rpc_call, "context_bundle": _probe_context_bundle, "invalidate_body": _probe_invalidate_body, "invalidate_header": _probe_invalidate_header, "invalidate_autoscope": _probe_invalidate_autoscope, "registration": _probe_registration, "ir_export": _probe_ir_export, "upload": _probe_upload, "auth_enforcement": _probe_auth_enforcement, "origin_cache": _probe_origin_cache, "edge_manifest": _probe_edge_manifest, "psr": _probe_psr, "session_init": _probe_session_init, "websocket": _probe_websocket, "ssr_bridge": _probe_ssr_bridge, "jwt": _probe_jwt, "mwt": _probe_mwt, "shapes": _probe_shapes, "forms": _probe_forms, } def run_probe(capability_id: str, adapter: Adapter) -> ProbeResult: """Run the probe for one (capability, adapter) pair.""" probe = PROBES.get(capability_id) if probe is None: raise KeyError( f"No probe registered for capability '{capability_id}'. Every capability " f"in manifest.CAPABILITIES owes a probe — add one to PROBES." ) return probe(adapter)