Fix critical issues C1-C5, C7, H1, H2, H4, H10

C1+C7: Cache purge now passes user_id and works for view-path
mutations. Extracted _purge_cache_for_invalidation() shared helper
used by both RPC and view-path branches.

C2: initSession retries 3x with backoff. Resets on total failure
so next call tries again instead of permanently broken CSRF.

C3: SSR template backend injects __MIZAN_SSR_DATA__ script tag
with serialized props for client-side hydration.

C4: SSR bridge uses _write_lock to serialize stdin writes from
concurrent Django threads. Prevents JSON interleaving.

C5: SSR bridge registers atexit handler for process cleanup.
No more orphaned Bun processes on Django reload/shutdown.

H1: pendingScoped changed from Map to Array — multiple scoped
invalidations for the same context no longer overwrite.

H2: registerContext uses stableKey() (sorted JSON) instead of
bare JSON.stringify. Property order no longer matters.

H4: Named context providers skip refetch if SSR data exists
(matches global context behavior).

H10: _meta always assigned as fresh dict, preventing shared-dict
mutation across ServerFunction subclasses.

373 Django + 33 React tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-07 12:24:41 -04:00
parent 499aa0e038
commit cdd15b3810
6 changed files with 89 additions and 71 deletions

View File

@@ -106,7 +106,7 @@ export function generateReactAdapter(schema) {
const deps = paramEntries.map(([pName]) => `params.${pName}`) const deps = paramEntries.map(([pName]) => `params.${pName}`)
lines.push(` }, [${deps.join(', ')}])`) lines.push(` }, [${deps.join(', ')}])`)
lines.push('') lines.push('')
lines.push(` useEffect(() => { refetch() }, [refetch])`) lines.push(` useEffect(() => { if (!data) refetch() }, [data, refetch])`)
lines.push(` useEffect(() => registerContext('${ctxName}', params, refetch), [${deps.join(', ')}, refetch])`) lines.push(` useEffect(() => registerContext('${ctxName}', params, refetch), [${deps.join(', ')}, refetch])`)
lines.push('') lines.push('')
lines.push(` return <${p}Ctx.Provider value={data}>{children}</${p}Ctx.Provider>`) lines.push(` return <${p}Ctx.Provider value={data}>{children}</${p}Ctx.Provider>`)

View File

@@ -161,6 +161,42 @@ def _check_auth_requirement(
return None return None
_cache_log = logging.getLogger("mizan.cache")
def _purge_cache_for_invalidation(
invalidate: list,
request: HttpRequest | None = None,
) -> None:
"""Purge origin-side cache for invalidation targets. Includes user_id if available."""
cache = get_cache()
if cache is None:
return
settings = get_settings()
if not settings.cache_secret:
return
user_id = None
if request and hasattr(request, 'user') and hasattr(request.user, 'pk'):
uid = getattr(request.user, 'pk', None)
if uid is not None:
user_id = str(uid)
try:
for entry in invalidate:
if isinstance(entry, str):
cache_purge(cache, entry)
elif isinstance(entry, dict):
cache_purge(
cache, entry["context"], entry.get("params"),
secret=settings.cache_secret,
user_id=user_id,
)
except Exception:
_cache_log.warning("Cache purge failed", exc_info=True)
def _resolve_affects_target(target_name: str) -> tuple[str, str, str | None]: def _resolve_affects_target(target_name: str) -> tuple[str, str, str | None]:
""" """
Determine whether an affects target is a context name or function name. Determine whether an affects target is a context name or function name.
@@ -444,10 +480,11 @@ def execute_function(
from django.http import HttpResponseBase from django.http import HttpResponseBase
if isinstance(output, HttpResponseBase): if isinstance(output, HttpResponseBase):
# View path — add invalidation header, pass through the response # View path — add invalidation header + purge origin cache
invalidate = _resolve_invalidation(view_class, input_data) invalidate = _resolve_invalidation(view_class, input_data)
if invalidate: if invalidate:
output["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate) output["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate)
_purge_cache_for_invalidation(invalidate, request)
output["Cache-Control"] = "no-store" output["Cache-Control"] = "no-store"
return output return output
@@ -701,28 +738,9 @@ def function_call_view(request: HttpRequest) -> JsonResponse:
response = JsonResponse(response_data) response = JsonResponse(response_data)
response["Cache-Control"] = "no-store" response["Cache-Control"] = "no-store"
# Always set the header transport too (Edge reads this)
if invalidate_contexts: if invalidate_contexts:
response["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate_contexts) response["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate_contexts)
_purge_cache_for_invalidation(invalidate_contexts, request)
# Purge origin-side cache for invalidated contexts
_cache_log = logging.getLogger("mizan.cache")
cache = get_cache()
cache_settings = get_settings()
if cache is not None:
try:
for entry in invalidate_contexts:
if isinstance(entry, str):
# Broad purge (no params) — prefix scan
cache_purge(cache, entry)
elif isinstance(entry, dict):
# Scoped purge — recompute key and delete directly
cache_purge(
cache, entry["context"], entry.get("params"),
secret=cache_settings.cache_secret,
)
except Exception:
_cache_log.warning("Cache purge failed", exc_info=True)
return response return response

View File

@@ -536,8 +536,8 @@ def _create_server_function(
if cache is not True: if cache is not True:
meta["cache"] = cache meta["cache"] = cache
if meta: # Always assign a fresh dict to prevent shared-dict mutation across classes
FunctionWrapper._meta = {**FunctionWrapper._meta, **meta} FunctionWrapper._meta = {**meta}
# Note: Registration happens via discovery (mizan_clients), not here. # Note: Registration happens via discovery (mizan_clients), not here.
# This allows the decorator to be used without import-time side effects. # This allows the decorator to be used without import-time side effects.

View File

@@ -35,13 +35,21 @@ class MizanTemplate:
self._bridge = bridge self._bridge = bridge
def render(self, context: dict[str, Any] | None = None, request: Any = None) -> str: def render(self, context: dict[str, Any] | None = None, request: Any = None) -> str:
import json as _json
props = dict(context) if context else {} props = dict(context) if context else {}
props.pop("request", None) props.pop("request", None)
props.pop("csrf_token", None) props.pop("csrf_token", None)
result = self._bridge.render(self.file_path, props) result = self._bridge.render(self.file_path, props)
return mark_safe(f'<div id="mizan-root">{result.html}</div>') # Serialize props as hydration data for client-side React
hydration_json = _json.dumps(props, sort_keys=True, default=str)
return mark_safe(
f'<div id="mizan-root">{result.html}</div>'
f'<script>window.__MIZAN_SSR_DATA__={hydration_json}</script>'
)
class MizanTemplates(BaseEngine): class MizanTemplates(BaseEngine):

View File

@@ -3,7 +3,7 @@ SSR Bridge — Manages a persistent Bun subprocess for React rendering.
Protocol: newline-delimited JSON-RPC over stdin/stdout. Protocol: newline-delimited JSON-RPC over stdin/stdout.
Request: {"id": 1, "method": "render", "params": {"component": "ProfilePage", "props": {...}}} Request: {"id": 1, "method": "render", "params": {"file": "/abs/path/Hello.tsx", "props": {...}}}
Response: {"id": 1, "html": "<div>...</div>"} Response: {"id": 1, "html": "<div>...</div>"}
The subprocess stays alive across requests. It is started on first use The subprocess stays alive across requests. It is started on first use
@@ -12,6 +12,7 @@ and restarted automatically if it crashes.
from __future__ import annotations from __future__ import annotations
import atexit
import json import json
import logging import logging
import subprocess import subprocess
@@ -41,12 +42,16 @@ class SSRBridge:
self._timeout = timeout self._timeout = timeout
self._proc: subprocess.Popen | None = None self._proc: subprocess.Popen | None = None
self._lock = threading.Lock() self._lock = threading.Lock()
self._write_lock = threading.Lock() # Serializes stdin writes
self._counter = 0 self._counter = 0
self._pending: dict[int, threading.Event] = {} self._pending: dict[int, threading.Event] = {}
self._results: dict[int, dict] = {} self._results: dict[int, dict] = {}
self._reader_thread: threading.Thread | None = None self._reader_thread: threading.Thread | None = None
self._ready = threading.Event() self._ready = threading.Event()
# Ensure cleanup on process exit
atexit.register(self.shutdown)
def _ensure_running(self) -> None: def _ensure_running(self) -> None:
"""Start the Bun subprocess if it's not running.""" """Start the Bun subprocess if it's not running."""
if self._proc is not None and self._proc.poll() is None: if self._proc is not None and self._proc.poll() is None:
@@ -134,12 +139,14 @@ class SSRBridge:
"params": {"file": file, "props": props or {}}, "params": {"file": file, "props": props or {}},
}) + "\n" }) + "\n"
try: # Serialize stdin writes to prevent interleaving from concurrent threads
self._proc.stdin.write(request.encode("utf-8")) with self._write_lock:
self._proc.stdin.flush() try:
except (BrokenPipeError, OSError) as e: self._proc.stdin.write(request.encode("utf-8"))
del self._pending[msg_id] self._proc.stdin.flush()
raise RuntimeError(f"SSR worker pipe broken: {e}") except (BrokenPipeError, OSError) as e:
self._pending.pop(msg_id, None)
raise RuntimeError(f"SSR worker pipe broken: {e}")
if not event.wait(self._timeout): if not event.wait(self._timeout):
self._pending.pop(msg_id, None) self._pending.pop(msg_id, None)
@@ -155,33 +162,6 @@ class SSRBridge:
return RenderResult(html=result["html"]) return RenderResult(html=result["html"])
def ping(self) -> bool:
"""Health check. Returns True if the worker responds."""
with self._lock:
self._ensure_running()
self._counter += 1
msg_id = self._counter
event = threading.Event()
self._pending[msg_id] = event
request = json.dumps({"id": msg_id, "method": "ping"}) + "\n"
try:
self._proc.stdin.write(request.encode("utf-8"))
self._proc.stdin.flush()
except (BrokenPipeError, OSError):
del self._pending[msg_id]
return False
if not event.wait(self._timeout):
self._pending.pop(msg_id, None)
return False
self._pending.pop(msg_id, None)
result = self._results.pop(msg_id)
return result.get("pong", False)
def shutdown(self) -> None: def shutdown(self) -> None:
"""Stop the Bun subprocess.""" """Stop the Bun subprocess."""
if self._proc is not None: if self._proc is not None:

View File

@@ -58,19 +58,26 @@ let _sessionReady: Promise<void> | null = null
* Initialize a session (fetches CSRF cookie from GET /session/). * Initialize a session (fetches CSRF cookie from GET /session/).
* Called automatically on first fetch if not called explicitly. * Called automatically on first fetch if not called explicitly.
* No-op if a CSRF cookie already exists. * No-op if a CSRF cookie already exists.
* Retries on failure — resets so next call tries again.
*/ */
export function initSession(): Promise<void> { export function initSession(): Promise<void> {
if (_sessionReady) return _sessionReady if (_sessionReady) return _sessionReady
_sessionReady = (async () => { _sessionReady = (async () => {
// If we already have a CSRF token, skip
if (getCSRFToken()) return if (getCSRFToken()) return
try { for (let attempt = 0; attempt < 3; attempt++) {
await fetch(`${config.baseUrl}/session/`, { credentials: 'include' }) try {
} catch (e) { await fetch(`${config.baseUrl}/session/`, { credentials: 'include' })
console.error('[mizan] Session init failed:', e) if (getCSRFToken()) return
} catch (e) {
console.warn(`[mizan] Session init attempt ${attempt + 1} failed:`, e)
}
if (attempt < 2) await new Promise(r => setTimeout(r, (attempt + 1) * 100))
} }
// All retries failed — reset so next call tries again
_sessionReady = null
})() })()
return _sessionReady return _sessionReady
@@ -88,26 +95,31 @@ interface ContextEntry {
const contexts: Map<string, Map<ParamKey, ContextEntry>> = new Map() const contexts: Map<string, Map<ParamKey, ContextEntry>> = new Map()
/** Deterministic JSON key for params — sorted to avoid order-dependency */
function stableKey(params: Record<string, any>): string {
return JSON.stringify(params, Object.keys(params).sort())
}
export function registerContext( export function registerContext(
name: string, name: string,
params: Record<string, any>, params: Record<string, any>,
refetch: RefetchFn, refetch: RefetchFn,
): () => void { ): () => void {
if (!contexts.has(name)) contexts.set(name, new Map()) if (!contexts.has(name)) contexts.set(name, new Map())
const key = JSON.stringify(params) const key = stableKey(params)
contexts.get(name)!.set(key, { params, refetch }) contexts.get(name)!.set(key, { params, refetch })
return () => contexts.get(name)!.delete(key) return () => contexts.get(name)?.delete(key)
} }
// === Invalidation === // === Invalidation ===
const pending: Set<string> = new Set() const pending: Set<string> = new Set()
const pendingScoped: Map<string, Record<string, any>> = new Map() const pendingScoped: Array<{ context: string; params: Record<string, any> }> = []
let scheduled = false let scheduled = false
export function invalidate(context: string, params?: Record<string, any>): void { export function invalidate(context: string, params?: Record<string, any>): void {
if (params) { if (params) {
pendingScoped.set(context, params) pendingScoped.push({ context, params })
} else { } else {
pending.add(context) pending.add(context)
} }
@@ -123,17 +135,17 @@ function flush(): void {
if (entries) entries.forEach(entry => entry.refetch()) if (entries) entries.forEach(entry => entry.refetch())
} }
for (const [name, params] of pendingScoped) { for (const { context: name, params } of pendingScoped) {
if (pending.has(name)) continue if (pending.has(name)) continue
const entries = contexts.get(name) const entries = contexts.get(name)
if (!entries) continue if (!entries) continue
const key = JSON.stringify(params) const key = stableKey(params)
const entry = entries.get(key) const entry = entries.get(key)
if (entry) entry.refetch() if (entry) entry.refetch()
} }
pending.clear() pending.clear()
pendingScoped.clear() pendingScoped.length = 0
scheduled = false scheduled = false
} }