Mizan codegen substrate: Rust kernel + Rust codegen binary, JS generator deleted

The Mizan codegen substrate moves off JavaScript template-literal emission
onto a compiled Rust binary that consumes the same OpenAPI + x-mizan-* IR
the JS substrate consumed. Three structural wins fall out of one move:

1. Moat closes. The codegen logic (how `affects` becomes auto-invalidation,
   how named contexts collapse onto bundled fetches, how the registry-to-
   Provider mapping is shaped) ships compiled instead of as source bytes
   in every consumer's node_modules.

2. Pattern F (lines.push append-walls) becomes structurally unauthorable.
   The emit substrate is askama templates in templates/<target>/*.j2 —
   actual target-language files with {{ ... }} substitution markers,
   syntax-highlighted natively, type-checked against the render context
   structs at compile time. The Rust emit modules build typed render
   contexts and call .render(); no string-builder surface exists.

3. OpenAPI `default`-bearing fields now emit as non-optional in TS / Python
   / Rust — the server always populates them, so consumer code reads them
   without nullable checks. Surfaced by Blazr's typecheck on regeneration.

Layout:
  frontends/mizan-rust/        — Rust port of @mizan/base; #[cfg(feature="pyo3")]
                                 exposes PyMizanClient for the Python target.
  protocol/mizan-codegen/      — codegen binary source + askama templates.
  protocol/mizan-generate/     — npm-package shim. bin/launcher.mjs dispatches
                                 to the platform-appropriate prebuilt binary.
                                 Old generator/ JS tree deleted.
  tests/rust/                  — wire-parity drivers. drive_kernel exercises
                                 raw client.call() / fetch_context(); drive_emitted
                                 exercises the typed crate the codegen emits.
  tests/afi/afi_codegen_app.py — codegen entrypoint module (imports + registers).
  backends/mizan-fastapi/.../schema.py — adds outputNullable so the Rust
                                 codegen can wrap T | None responses in Option<T>.

Verification:
  - 20 mizan-codegen tests green (IR deserialization, byte-equivalent
    parity vs JS baseline for stage1/rust/python/react/vue/svelte,
    structural test for channels).
  - tests/rust/run_wire_parity.py — 12/12 probes green via the Rust binary
    driving the FastAPI fixture end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-17 18:26:32 -04:00
parent c15c6f3e14
commit 43bcf3f26f
114 changed files with 11090 additions and 2342 deletions

View File

@@ -0,0 +1,195 @@
//! `MizanClient` — the kernel entry point.
//!
//! Mirrors the `configure(opts)` + module-level state in
//! `frontends/mizan-base/src/index.ts`, but as an owned struct because
//! Rust lacks module-level mutable state. Consumers hold an
//! `Arc<MizanClient>` and pass it everywhere the TS code would have
//! used the module-level `config`.
//!
//! Public surface:
//! - `MizanClient::new(config)` — build with reqwest cookie jar.
//! - `client.fetch_context(name, params)` — async, returns parsed JSON bundle.
//! - `client.call(fn_name, args)` — async, applies merge + invalidation
//! from the response then returns `result`.
//! - `client.register_context(name, params, fetch_fn)` — register an
//! instance; returns a `ContextHandle`.
//! - `client.invalidate(name)` / `client.invalidate_scoped(name, params)`
//! — schedule invalidation via the kernel queue.
//! - `client.merge(context, params, slot, value)` — splice a value into
//! a context bundle slot.
use std::sync::Arc;
use std::time::Duration;
use reqwest::cookie::CookieStore;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT};
use reqwest::Url;
use serde_json::Value;
use tokio::sync::OnceCell;
use crate::context::{ContextHandle, ContextRegistry, FetchFn};
use crate::error::MizanError;
use crate::invalidation::InvalidationQueue;
use crate::transport;
pub struct MizanConfig {
pub base_url: String,
pub session: bool,
pub csrf_cookie_name: String,
pub csrf_header_name: String,
pub extra_headers: Vec<(String, String)>,
}
impl Default for MizanConfig {
fn default() -> Self {
Self {
base_url: "/api/mizan".to_string(),
session: true,
csrf_cookie_name: "csrftoken".to_string(),
csrf_header_name: "X-CSRFToken".to_string(),
extra_headers: Vec::new(),
}
}
}
pub struct MizanClient {
config: Arc<MizanConfig>,
http: reqwest::Client,
cookie_jar: Arc<reqwest::cookie::Jar>,
registry: Arc<ContextRegistry>,
queue: Arc<InvalidationQueue>,
session_ready: OnceCell<()>,
}
impl MizanClient {
pub fn new(config: MizanConfig) -> Arc<Self> {
let cookie_jar = Arc::new(reqwest::cookie::Jar::default());
let http = reqwest::Client::builder()
.cookie_provider(Arc::clone(&cookie_jar))
.build()
.expect("reqwest client construction");
let registry = Arc::new(ContextRegistry::new());
let queue = InvalidationQueue::new(Arc::clone(&registry));
Arc::new(Self {
config: Arc::new(config),
http,
cookie_jar,
registry,
queue,
session_ready: OnceCell::new(),
})
}
pub fn config(&self) -> &MizanConfig {
&self.config
}
pub fn http(&self) -> &reqwest::Client {
&self.http
}
pub fn context_registry(&self) -> &Arc<ContextRegistry> {
&self.registry
}
pub fn invalidation_queue(&self) -> &Arc<InvalidationQueue> {
&self.queue
}
/// Hit `/session/` once on first call to bootstrap the CSRF cookie.
/// No-op when `config.session == false`. Three attempts with 100ms
/// × attempt backoff.
pub async fn ensure_session_ready(&self) -> Result<(), MizanError> {
if !self.config.session {
return Ok(());
}
self.session_ready
.get_or_try_init(|| async {
if self.read_csrf_cookie().is_some() {
return Ok(());
}
let url = Url::parse(&format!("{}/session/", self.config.base_url.trim_end_matches('/')))
.map_err(|e| MizanError::transport(format!("invalid base_url: {e}")))?;
for attempt in 0..3 {
let res = self.http.get(url.clone()).send().await;
if res.is_ok() && self.read_csrf_cookie().is_some() {
return Ok(());
}
if attempt < 2 {
tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
}
}
// Mirror TS: failing to bootstrap is non-fatal — subsequent
// calls proceed without CSRF and may still succeed (e.g.,
// FastAPI configs that don't require it).
Ok(())
})
.await
.copied()
}
pub(crate) async fn resolve_headers(&self) -> HeaderMap {
let mut headers = HeaderMap::new();
for (name, value) in &self.config.extra_headers {
if let (Ok(n), Ok(v)) = (HeaderName::try_from(name.as_str()), HeaderValue::try_from(value.as_str())) {
headers.insert(n, v);
}
}
if let Some(token) = self.read_csrf_cookie() {
if let (Ok(n), Ok(v)) = (
HeaderName::try_from(self.config.csrf_header_name.as_str()),
HeaderValue::try_from(token.as_str()),
) {
headers.insert(n, v);
}
}
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
headers
}
fn read_csrf_cookie(&self) -> Option<String> {
let url = Url::parse(&self.config.base_url).ok()?;
let header = self.cookie_jar.cookies(&url)?;
let raw = header.to_str().ok()?;
let needle = format!("{}=", self.config.csrf_cookie_name);
raw.split(';')
.map(|p| p.trim())
.find_map(|p| p.strip_prefix(&needle))
.map(|v| v.trim_matches('"').to_string())
}
// ── High-level API ─────────────────────────────────────────────────
pub async fn fetch_context(&self, context: &str, params: &Value) -> Result<Value, MizanError> {
transport::mizan_fetch(self, context, params).await
}
pub async fn call(&self, fn_name: &str, args: Value) -> Result<Value, MizanError> {
transport::mizan_call(self, fn_name, args).await
}
pub async fn register_context(
self: &Arc<Self>,
name: impl Into<String>,
params: Value,
fetch_fn: FetchFn,
) -> ContextHandle {
self.registry.register(name, params, fetch_fn, None).await
}
pub async fn invalidate(self: &Arc<Self>, name: impl Into<String>) {
self.queue.invalidate(name).await;
}
pub async fn invalidate_scoped(self: &Arc<Self>, name: impl Into<String>, params: Value) {
self.queue.invalidate_scoped(name, params).await;
}
pub async fn merge(&self, context: &str, params: Option<&Value>, slot: &str, value: &Value) {
self.registry.merge(context, params, slot, value).await;
}
}

View File

@@ -0,0 +1,365 @@
//! Context registry.
//!
//! Mirrors the `contexts: Map<string, Map<ParamKey, ContextEntry>>`
//! shape in `frontends/mizan-base/src/index.ts`. Each entry holds the
//! latest `ContextState`, a `tokio::sync::watch::Sender` for notifying
//! subscribers, and a fetch function the registry invokes on demand.
//!
//! Subscribers receive a `ContextHandle` whose `rx: watch::Receiver`
//! they read from in their own loop. Watch channels overwrite the
//! previous value if the receiver hasn't consumed it yet — the render
//! loop sees only the latest state on each tick, never an intermediate
//! one. The TS kernel achieves the same effect via React's external
//! store re-render coalescing.
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use serde_json::Value;
use tokio::sync::{Mutex, RwLock, mpsc, watch};
use tokio_util::sync::CancellationToken;
use crate::error::MizanError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContextStatus {
Idle,
Loading,
Success,
Error,
}
#[derive(Debug, Clone)]
pub struct ContextState<T> {
pub data: Option<T>,
pub status: ContextStatus,
pub error: Option<Arc<MizanError>>,
}
pub type ContextStateRaw = ContextState<Value>;
impl ContextStateRaw {
pub fn idle() -> Self {
Self { data: None, status: ContextStatus::Idle, error: None }
}
}
pub type FetchFn = Arc<
dyn Fn() -> Pin<Box<dyn Future<Output = Result<Value, MizanError>> + Send + 'static>>
+ Send
+ Sync,
>;
struct ContextEntry {
#[allow(dead_code)]
params: Value,
tx: watch::Sender<ContextStateRaw>,
fetch_fn: FetchFn,
refetch_tx: mpsc::UnboundedSender<()>,
/// Cancel signal for the entry's spawned refetch loop. Set when the
/// last handle on the entry unregisters.
cancel: CancellationToken,
}
pub struct ContextRegistry {
/// Outer key: context name. Inner key: `stable_key(params)`.
entries: RwLock<HashMap<String, HashMap<String, Arc<Mutex<ContextEntry>>>>>,
}
impl Default for ContextRegistry {
fn default() -> Self {
Self::new()
}
}
impl ContextRegistry {
pub fn new() -> Self {
Self { entries: RwLock::new(HashMap::new()) }
}
/// Register an instance of `(context_name, params)`. Idempotent —
/// re-registering the same key returns a handle on the existing
/// entry (the fetch_fn closure is replaced so the latest binding
/// wins).
pub async fn register(
self: &Arc<Self>,
name: impl Into<String>,
params: Value,
fetch_fn: FetchFn,
initial_data: Option<Value>,
) -> ContextHandle {
let name = name.into();
let key = stable_key(&params);
let mut outer = self.entries.write().await;
let inner = outer.entry(name.clone()).or_default();
if let Some(existing) = inner.get(&key).cloned() {
// Update the fetch closure so the latest registration's
// closure wins (matches the TS Strict-Mode behavior).
{
let mut entry = existing.lock().await;
entry.fetch_fn = fetch_fn;
}
let entry = existing.lock().await;
return ContextHandle {
rx: entry.tx.subscribe(),
refetch_tx: entry.refetch_tx.clone(),
cancel: entry.cancel.clone(),
registry: Arc::clone(self),
name,
key,
};
}
let initial = match initial_data {
Some(data) => ContextState { data: Some(data), status: ContextStatus::Success, error: None },
None => ContextStateRaw::idle(),
};
let (tx, _rx) = watch::channel(initial);
let (refetch_tx, mut refetch_rx) = mpsc::unbounded_channel::<()>();
let cancel = CancellationToken::new();
let entry = Arc::new(Mutex::new(ContextEntry {
params: params.clone(),
tx: tx.clone(),
fetch_fn: fetch_fn.clone(),
refetch_tx: refetch_tx.clone(),
cancel: cancel.clone(),
}));
inner.insert(key.clone(), Arc::clone(&entry));
drop(outer);
// Spawn the entry's refetch loop. The loop owns its own fetch
// closure handle resolution via the entry mutex — each tick
// reads the latest closure, so updates via re-register apply.
let entry_for_task = Arc::clone(&entry);
let cancel_for_task = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel_for_task.cancelled() => break,
msg = refetch_rx.recv() => {
if msg.is_none() { break; }
let (fetch_fn, tx) = {
let entry = entry_for_task.lock().await;
(entry.fetch_fn.clone(), entry.tx.clone())
};
// Loading state
let cur = tx.borrow().clone();
let loading = ContextState { data: cur.data, status: ContextStatus::Loading, error: None };
let _ = tx.send(loading);
// Drive the fetch
match fetch_fn().await {
Ok(data) => {
let _ = tx.send(ContextState { data: Some(data), status: ContextStatus::Success, error: None });
}
Err(err) => {
let cur = tx.borrow().clone();
let _ = tx.send(ContextState {
data: cur.data,
status: ContextStatus::Error,
error: Some(Arc::new(err)),
});
}
}
}
}
}
});
ContextHandle {
rx: tx.subscribe(),
refetch_tx,
cancel,
registry: Arc::clone(self),
name,
key,
}
}
/// Merge a value into a context entry's bundle slot. Mirrors the
/// TS kernel `merge(context, params, slot, value)` call.
pub async fn merge(
&self,
name: &str,
params: Option<&Value>,
slot: &str,
value: &Value,
) {
let key = match params {
Some(p) => stable_key(p),
None => stable_key(&Value::Object(Default::default())),
};
let entry_handle = {
let outer = self.entries.read().await;
outer.get(name).and_then(|inner| inner.get(&key)).cloned()
};
let Some(entry_arc) = entry_handle else { return };
let entry = entry_arc.lock().await;
let cur = entry.tx.borrow().clone();
let Some(bundle) = cur.data.as_ref() else { return };
let Some(merged) = crate::merge::merge_into_bundle(bundle, slot, value) else { return };
let _ = entry.tx.send(ContextState {
data: Some(merged),
status: ContextStatus::Success,
error: None,
});
}
/// Trigger refetch on every entry of `name`.
pub async fn invalidate_broad(&self, name: &str) {
let entries = {
let outer = self.entries.read().await;
outer.get(name).map(|inner| inner.values().cloned().collect::<Vec<_>>())
};
let Some(entries) = entries else { return };
for entry in entries {
let tx = {
let e = entry.lock().await;
e.refetch_tx.clone()
};
let _ = tx.send(());
}
}
/// Trigger refetch on the single entry matching `(name, params)`.
pub async fn invalidate_scoped(&self, name: &str, params: &Value) {
let key = stable_key(params);
let entry_arc = {
let outer = self.entries.read().await;
outer.get(name).and_then(|inner| inner.get(&key)).cloned()
};
let Some(entry_arc) = entry_arc else { return };
let tx = {
let entry = entry_arc.lock().await;
entry.refetch_tx.clone()
};
let _ = tx.send(());
}
async fn unregister(&self, name: &str, key: &str) {
let mut outer = self.entries.write().await;
if let Some(inner) = outer.get_mut(name) {
if let Some(entry) = inner.remove(key) {
let entry = entry.lock().await;
entry.cancel.cancel();
}
if inner.is_empty() {
outer.remove(name);
}
}
}
}
pub struct ContextHandle {
pub rx: watch::Receiver<ContextStateRaw>,
refetch_tx: mpsc::UnboundedSender<()>,
cancel: CancellationToken,
registry: Arc<ContextRegistry>,
name: String,
key: String,
}
impl ContextHandle {
/// Drive a refetch. Returns immediately; the new state lands on
/// `rx` once the kernel's refetch task finishes the fetch.
pub fn refetch(&self) {
let _ = self.refetch_tx.send(());
}
pub fn state(&self) -> ContextStateRaw {
self.rx.borrow().clone()
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel.clone()
}
pub async fn unregister(self) {
self.registry.unregister(&self.name, &self.key).await;
}
}
/// Byte-identical to TS `JSON.stringify(params, Object.keys(params).sort())`.
///
/// Uses `BTreeMap` for deterministic key ordering and serializes via
/// `serde_json::to_string` (compact, no whitespace) — matches the TS
/// default. Non-object / non-string params (numbers, booleans) pass
/// through serde_json's standard JSON representation.
pub fn stable_key(params: &Value) -> String {
match params {
Value::Object(map) => {
let sorted: BTreeMap<&String, &Value> = map.iter().collect();
serde_json::to_string(&sorted).unwrap_or_default()
}
other => serde_json::to_string(other).unwrap_or_default(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn stable_key_sorts_object_keys() {
let a = stable_key(&json!({"b": 1, "a": 2}));
let b = stable_key(&json!({"a": 2, "b": 1}));
assert_eq!(a, b);
assert_eq!(a, r#"{"a":2,"b":1}"#);
}
#[test]
fn stable_key_handles_empty_object() {
assert_eq!(stable_key(&json!({})), "{}");
}
#[tokio::test]
async fn register_and_refetch() {
let registry = Arc::new(ContextRegistry::new());
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let fetch_fn: FetchFn = Arc::new(move || {
let counter = Arc::clone(&counter_clone);
Box::pin(async move {
let n = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
Ok(json!({ "count": n }))
})
});
let mut handle = registry.register("test", json!({}), fetch_fn, None).await;
handle.refetch();
// Poll until success — watch::Receiver::changed() returns once
// per "newest value seen" advance, so back-to-back sends from the
// refetch task can coalesce into a single notification. The loop
// ignores intermediate Loading states and waits for Success.
loop {
tokio::time::timeout(std::time::Duration::from_secs(2), handle.rx.changed())
.await
.expect("changed timed out")
.unwrap();
if handle.state().status == ContextStatus::Success {
break;
}
}
let state = handle.state();
assert_eq!(state.data.unwrap()["count"], 1);
}
}

View File

@@ -0,0 +1,121 @@
//! Wire error envelope. Mirrors `MizanError` in `frontends/mizan-base/src/index.ts`.
//!
//! Two envelope shapes are tolerated:
//!
//! - FastAPI: `{"error": {"code": "...", "message": "...", "details": ...}}`
//! - Django: `{"error": true, "code": "...", "message": "...", "details": ...}`
//!
//! When neither shape parses, `code` falls back to `HTTP_<status>` and the
//! raw response body is the message.
use serde::Deserialize;
use serde_json::Value;
#[derive(Debug, Clone)]
pub struct MizanError {
pub status: u16,
pub code: String,
pub message: String,
pub details: Option<Value>,
pub raw_body: String,
}
impl MizanError {
pub fn from_response(status: u16, body: String) -> Self {
let parsed = serde_json::from_str::<Envelope>(&body).ok();
let (code, message, details) = match parsed {
Some(Envelope::Fastapi { error }) => (
error.code.unwrap_or_else(|| format!("HTTP_{status}")),
error.message.unwrap_or_else(|| format!("Mizan call failed ({status})")),
error.details,
),
Some(Envelope::Django { code, message, details, .. }) => (
code.unwrap_or_else(|| format!("HTTP_{status}")),
message.unwrap_or_else(|| format!("Mizan call failed ({status})")),
details,
),
None => (
format!("HTTP_{status}"),
format!("Mizan call failed ({status})"),
None,
),
};
Self { status, code, message, details, raw_body: body }
}
pub fn transport(message: impl Into<String>) -> Self {
Self {
status: 0,
code: "TRANSPORT".to_string(),
message: message.into(),
details: None,
raw_body: String::new(),
}
}
}
impl std::fmt::Display for MizanError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Mizan {} ({}): {}", self.status, self.code, self.message)
}
}
impl std::error::Error for MizanError {}
#[derive(Deserialize)]
#[serde(untagged)]
enum Envelope {
Fastapi { error: NestedError },
Django {
// Django form is `{"error": true, "code": ..., "message": ..., "details": ...}`.
// `error` is a bool sentinel; the actual fields are siblings.
#[allow(dead_code)]
error: bool,
code: Option<String>,
message: Option<String>,
details: Option<Value>,
},
}
#[derive(Deserialize)]
struct NestedError {
code: Option<String>,
message: Option<String>,
details: Option<Value>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_fastapi_envelope() {
let body = r#"{"error":{"code":"BAD_REQUEST","message":"oops","details":{"k":1}}}"#;
let e = MizanError::from_response(400, body.to_string());
assert_eq!(e.code, "BAD_REQUEST");
assert_eq!(e.message, "oops");
assert_eq!(e.details, Some(serde_json::json!({"k": 1})));
}
#[test]
fn parses_django_envelope() {
let body = r#"{"error":true,"code":"NOT_FOUND","message":"missing","details":null}"#;
let e = MizanError::from_response(404, body.to_string());
assert_eq!(e.code, "NOT_FOUND");
assert_eq!(e.message, "missing");
}
#[test]
fn falls_back_on_unparseable_body() {
let e = MizanError::from_response(500, "Internal Server Error".to_string());
assert_eq!(e.code, "HTTP_500");
assert!(e.message.contains("500"));
}
}

View File

@@ -0,0 +1,148 @@
//! Invalidation queue.
//!
//! Mirrors the TS kernel's `pending` / `pendingScoped` / `flush()` pair
//! at `frontends/mizan-base/src/index.ts`. Mutations accumulate
//! invalidation targets; the queue batches them and triggers refetches
//! on the matching context entries.
//!
//! The TS kernel uses `queueMicrotask(flush)` to batch within a single
//! event-loop tick. The Rust equivalent is a `tokio::task::yield_now()`
//! debounce: when `invalidate()` is called, push to the queue, and if
//! no flush is scheduled spawn a task that yields once then flushes.
//! That gives the same "batch within a single async tick" semantics.
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use serde_json::Value;
use tokio::sync::Mutex;
use crate::context::ContextRegistry;
#[derive(Debug, Clone)]
pub struct ScopedTarget {
pub context: String,
pub params: Value,
}
#[derive(Default)]
struct Pending {
broad: HashSet<String>,
scoped: Vec<ScopedTarget>,
}
pub struct InvalidationQueue {
pending: Mutex<Pending>,
scheduled: AtomicBool,
registry: Arc<ContextRegistry>,
}
impl InvalidationQueue {
pub fn new(registry: Arc<ContextRegistry>) -> Arc<Self> {
Arc::new(Self {
pending: Mutex::new(Pending::default()),
scheduled: AtomicBool::new(false),
registry,
})
}
/// Schedule a broad invalidation (every entry of `name` refetches).
pub async fn invalidate(self: &Arc<Self>, name: impl Into<String>) {
{
let mut pending = self.pending.lock().await;
pending.broad.insert(name.into());
}
self.schedule_flush();
}
/// Schedule a scoped invalidation (the entry matching `(name,
/// params)` refetches).
pub async fn invalidate_scoped(self: &Arc<Self>, name: impl Into<String>, params: Value) {
{
let mut pending = self.pending.lock().await;
pending.scoped.push(ScopedTarget { context: name.into(), params });
}
self.schedule_flush();
}
fn schedule_flush(self: &Arc<Self>) {
if self.scheduled.swap(true, Ordering::SeqCst) {
return;
}
let this = Arc::clone(self);
tokio::spawn(async move {
// Yield once to batch invalidations queued in the same
// async tick — equivalent to TS `queueMicrotask`.
tokio::task::yield_now().await;
this.flush().await;
this.scheduled.store(false, Ordering::SeqCst);
});
}
async fn flush(&self) {
let snapshot = {
let mut pending = self.pending.lock().await;
let broad = std::mem::take(&mut pending.broad);
let scoped = std::mem::take(&mut pending.scoped);
(broad, scoped)
};
let (broad, scoped) = snapshot;
// Broad first — they cover all scoped variants of the same name.
for name in &broad {
self.registry.invalidate_broad(name).await;
}
for target in &scoped {
if broad.contains(&target.context) {
continue;
}
self.registry.invalidate_scoped(&target.context, &target.params).await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::{ContextHandle, ContextRegistry, ContextStatus, FetchFn};
use serde_json::json;
fn counted_fetch(counter: Arc<std::sync::atomic::AtomicU32>) -> FetchFn {
Arc::new(move || {
let counter = Arc::clone(&counter);
Box::pin(async move {
let n = counter.fetch_add(1, Ordering::SeqCst) + 1;
Ok(json!({ "count": n }))
})
})
}
async fn wait_for_success(handle: &mut ContextHandle) {
loop {
handle.rx.changed().await.unwrap();
if handle.state().status == ContextStatus::Success {
return;
}
}
}
#[tokio::test]
async fn broad_invalidate_triggers_refetch() {
let registry = Arc::new(ContextRegistry::new());
let queue = InvalidationQueue::new(Arc::clone(&registry));
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
let mut handle = registry.register("user", json!({}), counted_fetch(Arc::clone(&counter)), None).await;
handle.refetch();
wait_for_success(&mut handle).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
queue.invalidate("user").await;
wait_for_success(&mut handle).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
}

View File

@@ -0,0 +1,28 @@
//! Mizan client kernel.
//!
//! Rust port of `@mizan/base` (frontends/mizan-base/src/index.ts). Same
//! public surface, same protocol, same wire shape. Consumers — generated
//! per-app crates, the GPU worker, the Python `PyMizanClient` — depend
//! on this kernel and never construct HTTP requests directly.
//!
//! Modules:
//! - [`client`] — `MizanClient`, `MizanConfig`, session init
//! - [`context`] — registry, `ContextState`, `ContextHandle`, `stable_key`
//! - [`error`] — `MizanError`, envelope parsing
//! - [`transport`] — `mizan_fetch`, `mizan_call`, retry, header resolution
//! - [`merge`] — `splice_slot`
//! - [`invalidation`] — `InvalidationQueue`, debounced flush
pub mod client;
pub mod context;
pub mod error;
pub mod invalidation;
pub mod merge;
pub mod transport;
#[cfg(feature = "pyo3")]
pub mod pyo3_bridge;
pub use client::{MizanClient, MizanConfig};
pub use context::{ContextHandle, ContextState, ContextStateRaw, ContextStatus, stable_key};
pub use error::MizanError;

View File

@@ -0,0 +1,107 @@
//! Mutation-driven merge of a value into a context's bundle slot.
//!
//! Mirrors `spliceSlot` in `frontends/mizan-base/src/index.ts`. The server
//! has already resolved which slot the value lands in (by matching the
//! mutation's return type against each context function's return type),
//! so the kernel does no inference — it writes directly to `bundle[slot]`.
//!
//! Rules:
//! - If the existing slot is an array and the new value is also an array,
//! the array replaces the slot wholesale.
//! - If the existing slot is an array and the new value is an object with
//! an `id` field, upsert by `id` — replace the matching entry in place
//! or append.
//! - Otherwise the slot is replaced with the new value.
use serde_json::Value;
pub fn splice_slot(slot: &Value, value: &Value) -> Value {
if let Value::Array(slot_arr) = slot {
if let Value::Array(_) = value {
return value.clone();
}
if let Some(id) = value.get("id") {
let mut next = slot_arr.clone();
let idx = next.iter().position(|item| item.get("id") == Some(id));
match idx {
Some(i) => next[i] = value.clone(),
None => next.push(value.clone()),
}
return Value::Array(next);
}
}
value.clone()
}
/// Apply a merge entry to the bundle of a context entry. Returns the new
/// bundle, or `None` if the slot wasn't present in the bundle (caller
/// should treat that as a no-op so server-driven merges into stale
/// caches don't fabricate slots).
pub fn merge_into_bundle(bundle: &Value, slot_name: &str, value: &Value) -> Option<Value> {
let obj = bundle.as_object()?;
if !obj.contains_key(slot_name) {
return None;
}
let mut next = obj.clone();
let spliced = splice_slot(obj.get(slot_name)?, value);
next.insert(slot_name.to_string(), spliced);
Some(Value::Object(next))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn replaces_scalar_slot() {
let slot = json!(1);
let value = json!(2);
assert_eq!(splice_slot(&slot, &value), json!(2));
}
#[test]
fn upserts_array_by_id() {
let slot = json!([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]);
let value = json!({"id": 1, "name": "A"});
assert_eq!(
splice_slot(&slot, &value),
json!([{"id": 1, "name": "A"}, {"id": 2, "name": "b"}]),
);
}
#[test]
fn appends_when_id_not_in_array() {
let slot = json!([{"id": 1, "name": "a"}]);
let value = json!({"id": 9, "name": "z"});
assert_eq!(
splice_slot(&slot, &value),
json!([{"id": 1, "name": "a"}, {"id": 9, "name": "z"}]),
);
}
#[test]
fn array_replaces_array() {
let slot = json!([1, 2, 3]);
let value = json!([7, 8]);
assert_eq!(splice_slot(&slot, &value), json!([7, 8]));
}
#[test]
fn merge_into_bundle_skips_missing_slot() {
let bundle = json!({"existing": 1});
let value = json!(42);
assert!(merge_into_bundle(&bundle, "missing", &value).is_none());
}
#[test]
fn merge_into_bundle_updates_present_slot() {
let bundle = json!({"user_profile": {"id": 1, "name": "old"}});
let value = json!({"id": 1, "name": "new"});
let merged = merge_into_bundle(&bundle, "user_profile", &value).unwrap();
assert_eq!(merged["user_profile"]["name"], "new");
}
}

View File

@@ -0,0 +1,252 @@
//! PyO3 façade — exposes `MizanClient` to Python as `PyMizanClient`.
//!
//! Same kernel, same wire. The Python wrapper that the codegen emits
//! adds typed methods on top of this client (Pydantic in / Pydantic
//! out); this module's job is the GIL boundary plus the async-to-sync
//! bridge.
//!
//! Architecture:
//! - One tokio multi-thread runtime owned by the `PyMizanClient`.
//! - `call` / `fetch_context` use `py.allow_threads(|| rt.block_on(...))`
//! so the GIL is released across the network round-trip.
//! - `subscribe_context` spawns a tokio task that owns a watch
//! receiver; on each change the task acquires the GIL via
//! `Python::with_gil` and fires the Python callback. The returned
//! `CancellationToken` (wrapped as `PyContextSubscription`) lets
//! Python cancel the watcher.
use std::sync::Arc;
use pyo3::prelude::*;
use pyo3::types::{PyDict};
use pythonize::{depythonize, pythonize};
use serde_json::Value;
use tokio::runtime::Runtime;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use crate::client::{MizanClient, MizanConfig};
use crate::context::{ContextStateRaw, ContextStatus};
#[pyclass]
pub struct PyMizanClient {
inner: Arc<MizanClient>,
rt: Arc<Runtime>,
}
#[pyclass]
pub struct PyContextSubscription {
cancel: CancellationToken,
}
#[pymethods]
impl PyContextSubscription {
fn cancel(&self) {
self.cancel.cancel();
}
}
#[pymethods]
impl PyMizanClient {
#[new]
#[pyo3(signature = (base_url, *, session = false, csrf_cookie_name = String::from("csrftoken"), csrf_header_name = String::from("X-CSRFToken")))]
fn new(
base_url: String,
session: bool,
csrf_cookie_name: String,
csrf_header_name: String,
) -> PyResult<Self> {
let rt = Runtime::new()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("tokio runtime: {e}")))?;
let config = MizanConfig {
base_url,
session,
csrf_cookie_name,
csrf_header_name,
extra_headers: Vec::new(),
};
Ok(Self {
inner: MizanClient::new(config),
rt: Arc::new(rt),
})
}
/// Invoke a mutation or plain function. `args` is a Python dict (or
/// any pythonize-compatible object). Returns the unwrapped `result`
/// from the server response as a Python object.
fn call(&self, py: Python<'_>, fn_name: String, args: &Bound<'_, PyDict>) -> PyResult<PyObject> {
let args_value: Value = depythonize(args.as_any())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("args: {e}")))?;
let inner = Arc::clone(&self.inner);
let result: Value = py.allow_threads(|| {
self.rt.block_on(async move { inner.call(&fn_name, args_value).await })
})
.map_err(mizan_err_to_py)?;
pythonize(py, &result)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("encode result: {e}")))
.map(|bound| bound.unbind())
}
/// One-shot context fetch (does not register a subscription).
fn fetch_context(&self, py: Python<'_>, name: String, params: &Bound<'_, PyDict>) -> PyResult<PyObject> {
let params_value: Value = depythonize(params.as_any())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("params: {e}")))?;
let inner = Arc::clone(&self.inner);
let result: Value = py.allow_threads(|| {
self.rt.block_on(async move { inner.fetch_context(&name, &params_value).await })
})
.map_err(mizan_err_to_py)?;
pythonize(py, &result)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("encode result: {e}")))
.map(|bound| bound.unbind())
}
/// Register a subscription. The kernel owns the fetch lifecycle;
/// `callback` is invoked from the watcher task once per state
/// change. Returns a handle whose `.cancel()` ends the subscription.
///
/// The callback receives a dict with keys: `data`, `status`,
/// `error`. Status is one of `"idle"`, `"loading"`, `"success"`,
/// `"error"`. Error is a dict with `code`, `message`, `status`,
/// `details` (or None).
#[pyo3(signature = (name, params, callback))]
fn subscribe_context(
&self,
py: Python<'_>,
name: String,
params: &Bound<'_, PyDict>,
callback: PyObject,
) -> PyResult<PyContextSubscription> {
let params_value: Value = depythonize(params.as_any())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("params: {e}")))?;
// Build a serde-friendly fetch closure that delegates to the
// kernel's `fetch_context` (which itself runs the typed HTTP
// pipeline). The subscription's refetches go through this.
let inner_for_fetch = Arc::clone(&self.inner);
let name_for_fetch = name.clone();
let params_for_fetch = params_value.clone();
let fetch_fn = Arc::new(move || {
let inner = Arc::clone(&inner_for_fetch);
let name = name_for_fetch.clone();
let params = params_for_fetch.clone();
Box::pin(async move { inner.fetch_context(&name, &params).await })
as std::pin::Pin<Box<dyn std::future::Future<Output = _> + Send + 'static>>
});
let inner = Arc::clone(&self.inner);
let handle = py.allow_threads(|| {
self.rt.block_on(async move {
inner.register_context(name.clone(), params_value, fetch_fn).await
})
});
let cancel = handle.cancel_token();
let cancel_for_task = cancel.clone();
let callback = Arc::new(callback);
let callback_for_task = Arc::clone(&callback);
// Drive an initial refetch before destructuring so the first
// state lands without requiring the caller to invalidate.
handle.refetch();
let rx: watch::Receiver<ContextStateRaw> = handle.rx;
self.rt.spawn(async move {
let mut rx = rx;
loop {
tokio::select! {
_ = cancel_for_task.cancelled() => break,
res = rx.changed() => {
if res.is_err() { break; }
let snapshot = rx.borrow_and_update().clone();
Python::with_gil(|py| {
let dict = match state_to_pydict(py, &snapshot) {
Ok(d) => d,
Err(e) => { eprintln!("[pyo3_bridge] encode state: {e}"); return; }
};
if let Err(e) = callback_for_task.call1(py, (dict,)) {
eprintln!("[pyo3_bridge] callback raised: {e}");
}
});
}
}
}
});
Ok(PyContextSubscription { cancel })
}
/// Schedule a broad invalidation.
fn invalidate(&self, py: Python<'_>, name: String) {
let inner = Arc::clone(&self.inner);
py.allow_threads(|| {
self.rt.block_on(async move { inner.invalidate(name).await })
});
}
/// Schedule a scoped invalidation.
fn invalidate_scoped(&self, py: Python<'_>, name: String, params: &Bound<'_, PyDict>) -> PyResult<()> {
let params_value: Value = depythonize(params.as_any())
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("params: {e}")))?;
let inner = Arc::clone(&self.inner);
py.allow_threads(|| {
self.rt.block_on(async move { inner.invalidate_scoped(name, params_value).await })
});
Ok(())
}
}
fn state_to_pydict<'py>(py: Python<'py>, state: &ContextStateRaw) -> PyResult<Bound<'py, PyDict>> {
let dict = PyDict::new_bound(py);
let status = match state.status {
ContextStatus::Idle => "idle",
ContextStatus::Loading => "loading",
ContextStatus::Success => "success",
ContextStatus::Error => "error",
};
dict.set_item("status", status)?;
match &state.data {
Some(v) => {
let obj = pythonize(py, v)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("encode state.data: {e}")))?;
dict.set_item("data", obj)?;
}
None => dict.set_item("data", py.None())?,
}
match &state.error {
Some(err) => {
let err_dict = PyDict::new_bound(py);
err_dict.set_item("status", err.status)?;
err_dict.set_item("code", &err.code)?;
err_dict.set_item("message", &err.message)?;
if let Some(details) = &err.details {
let obj = pythonize(py, details)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(format!("encode error.details: {e}")))?;
err_dict.set_item("details", obj)?;
} else {
err_dict.set_item("details", py.None())?;
}
dict.set_item("error", err_dict)?;
}
None => dict.set_item("error", py.None())?,
}
Ok(dict)
}
fn mizan_err_to_py(err: crate::MizanError) -> PyErr {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{err}"))
}
/// Python extension module entry point. Wheels built via `maturin
/// develop --features pyo3` import the module as `mizan_rust`.
#[pymodule]
fn mizan_rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyMizanClient>()?;
m.add_class::<PyContextSubscription>()?;
Ok(())
}

View File

@@ -0,0 +1,153 @@
//! HTTP transport. Mirrors `mizanFetch` and `mizanCall` in
//! `frontends/mizan-base/src/index.ts`.
//!
//! - `mizan_fetch(client, context, params)` → `GET /api/mizan/ctx/<name>/?params`
//! - `mizan_call(client, fn_name, args)` → `POST /api/mizan/call/` with
//! `{fn, args}` body. On response, applies any `merge` entries first,
//! then `invalidate` entries, then returns the `result` field.
//!
//! Retries: 3 attempts total, 200ms × attempt linear backoff. Retries
//! on network errors and 5xx; surfaces 4xx immediately (matches TS).
//!
//! CSRF: the reqwest cookie jar stores the CSRF cookie from the
//! `/session/` bootstrap; on every call we read it via
//! `reqwest::cookie::Jar::cookies(&url)` and add it as the configured
//! header. Both names come from `MizanConfig`.
use std::time::Duration;
use reqwest::{Method, Url};
use serde::Deserialize;
use serde_json::Value;
use crate::client::MizanClient;
use crate::error::MizanError;
const MAX_ATTEMPTS: u32 = 3;
const BACKOFF_BASE: Duration = Duration::from_millis(200);
/// `GET /api/mizan/ctx/<context>/?params`.
pub async fn mizan_fetch(client: &MizanClient, context: &str, params: &Value) -> Result<Value, MizanError> {
let mut url = Url::parse(&format!("{}/ctx/{}/", client.config().base_url.trim_end_matches('/'), context))
.map_err(|e| MizanError::transport(format!("invalid base_url: {e}")))?;
if let Value::Object(map) = params {
let mut qp = url.query_pairs_mut();
for (k, v) in map {
let s = match v {
Value::String(s) => s.clone(),
other => other.to_string(),
};
qp.append_pair(k, &s);
}
}
let body = request_with_retry(client, Method::GET, url, None).await?;
serde_json::from_str(&body).map_err(|e| MizanError::transport(format!("decode: {e}")))
}
/// `POST /api/mizan/call/` with `{fn, args}` body. Applies merge +
/// invalidation entries from the response before returning `result`.
pub async fn mizan_call(client: &MizanClient, fn_name: &str, args: Value) -> Result<Value, MizanError> {
let url = Url::parse(&format!("{}/call/", client.config().base_url.trim_end_matches('/')))
.map_err(|e| MizanError::transport(format!("invalid base_url: {e}")))?;
let payload = serde_json::json!({ "fn": fn_name, "args": args });
let body_bytes = serde_json::to_vec(&payload)
.map_err(|e| MizanError::transport(format!("encode: {e}")))?;
let body = request_with_retry(client, Method::POST, url, Some(body_bytes)).await?;
let response: CallResponse = serde_json::from_str(&body)
.map_err(|e| MizanError::transport(format!("decode: {e}")))?;
if let Some(merges) = response.merge {
for entry in &merges {
client.context_registry()
.merge(&entry.context, entry.params.as_ref(), &entry.slot, &entry.value)
.await;
}
}
if let Some(invalidations) = response.invalidate {
for entry in invalidations {
match entry {
InvalidateEntry::Broad(name) => {
client.invalidation_queue().invalidate(name).await;
}
InvalidateEntry::Scoped { context, params } => {
client.invalidation_queue().invalidate_scoped(context, params).await;
}
}
}
}
Ok(response.result.unwrap_or(Value::Null))
}
async fn request_with_retry(
client: &MizanClient,
method: Method,
url: Url,
body: Option<Vec<u8>>,
) -> Result<String, MizanError> {
client.ensure_session_ready().await?;
let mut last_err: Option<MizanError> = None;
for attempt in 0..MAX_ATTEMPTS {
let headers = client.resolve_headers().await;
let mut req = client.http().request(method.clone(), url.clone()).headers(headers);
if let Some(bytes) = &body {
req = req.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(bytes.clone());
}
match req.send().await {
Ok(res) => {
let status = res.status().as_u16();
let text = res.text().await.unwrap_or_default();
if status < 400 {
return Ok(text);
}
if (400..500).contains(&status) {
return Err(MizanError::from_response(status, text));
}
last_err = Some(MizanError::from_response(status, text));
}
Err(e) => {
last_err = Some(MizanError::transport(e.to_string()));
}
}
if attempt + 1 < MAX_ATTEMPTS {
tokio::time::sleep(BACKOFF_BASE.saturating_mul(attempt + 1)).await;
}
}
Err(last_err.unwrap_or_else(|| MizanError::transport("retry budget exhausted")))
}
#[derive(Deserialize)]
struct CallResponse {
result: Option<Value>,
#[serde(default)]
merge: Option<Vec<MergeEntry>>,
#[serde(default)]
invalidate: Option<Vec<InvalidateEntry>>,
}
#[derive(Deserialize)]
struct MergeEntry {
context: String,
#[serde(default)]
params: Option<Value>,
slot: String,
value: Value,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum InvalidateEntry {
Broad(String),
Scoped { context: String, params: Value },
}