Mizan IR: cut over to KDL, delete OpenAPI envelope
Replaces the transitional OpenAPI 3.0 + `x-mizan-*` extensions
substrate with the canonical Mizan IR as KDL, per docs/AFI_ARCHITECTURE.md:
"KDL is the contract; everything else (REST envelopes, OpenAPI
documents, framework idioms) is sediment around it."
End-to-end cutover. No transitional path left on main.
Forward direction:
cores/mizan-python/src/mizan_core/ir.py
build_ir() walks mizan_core.registry, introspects Pydantic
models directly (no JSON-Schema indirection), and emits the
Mizan IR document. The KDL grammar is locked in this file's
module docstring.
Backends emit KDL:
backends/mizan-fastapi/src/mizan_fastapi/ir.py
`python -m mizan_fastapi.ir <module>` — CLI entry point.
backends/mizan-django/.../management/commands/export_mizan_ir.py
`manage.py export_mizan_ir` — Django mgmt command.
Codegen consumes KDL:
protocol/mizan-codegen/Cargo.toml: + kdl = "6"
protocol/mizan-codegen/src/ir.rs: NamedType { Struct/List/Enum/Alias }
+ TypeShape { Primitive/Ref/List/Optional/Enum/Union } sum types,
replacing the JsonSchema sprawl. KDL parser walks the
`kdl::KdlDocument` tree into typed Rust structs.
protocol/mizan-codegen/src/fetch.rs: subprocess command switches
to the new IR-export entry points.
All emit modules (stage1 / react / python / rust / vue / svelte /
channels) port their type-walkers from JsonSchema to the new
sum types — case analysis collapses substantially.
Substrate-honesty wins beyond the moat closure:
- `int | bool` multi-arm unions land as `TypeShape::Union` (was
silently coerced to "string" before).
- `<CamelName>Output = list[T]` returns emit as named alias
types instead of struct-shaped wrappers, so consumer code
`.map()` works directly on the type.
- Pydantic field defaults flow through to `default` properties
in KDL, then back to non-optional shape in every target.
Deleted:
- backends/mizan-fastapi/src/mizan_fastapi/{cli,schema}.py
- backends/mizan-django/.../export_mizan_schema.py
- openapi-bearing half of mizan/export/__init__.py (edge
manifest generator preserved — separate concern).
- tests/afi/schema_normalizer.py
- tests/fixtures/{afi_schema.json, channels_schema.json}
- tests/fixtures/js_* baseline directories.
Verification:
- 20 mizan-codegen unit tests green (IR deserialization,
byte-equivalence parity across stage1/rust/python/react/vue/svelte
against fresh KDL-driven baselines, channels structural).
- tests/rust/run_wire_parity.py: 12/12 probes green driving
the binary end-to-end through KDL.
- Blazr studio-ui typechecks against the regenerated React client.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
575
cores/mizan-python/src/mizan_core/ir.py
Normal file
575
cores/mizan-python/src/mizan_core/ir.py
Normal file
@@ -0,0 +1,575 @@
|
||||
"""
|
||||
Mizan IR — KDL emission from the live `mizan_core.registry`.
|
||||
|
||||
`build_ir()` walks every registered function class, introspects its
|
||||
Pydantic Input/Output models directly (not via JSON-Schema), and emits
|
||||
KDL — the canonical Mizan protocol IR. Every backend adapter exposes
|
||||
this via a backend-specific entry point (Django management command,
|
||||
FastAPI CLI, mizan-ts equivalent); every codegen target consumes this.
|
||||
|
||||
KDL grammar — locked contract:
|
||||
|
||||
type "<Name>" {
|
||||
struct {
|
||||
field "<name>" required=#true|#false default=<lit> {
|
||||
primitive "integer|number|boolean|string"
|
||||
| ref "<TypeName>"
|
||||
| list { <type-child> }
|
||||
| optional { <type-child> }
|
||||
| enum "<v1>" "<v2>" ...
|
||||
}
|
||||
...
|
||||
}
|
||||
| list { <type-child> }
|
||||
| enum "<v1>" "<v2>" ...
|
||||
| alias { <type-child> }
|
||||
}
|
||||
|
||||
function "<wire_name>" {
|
||||
camel "<camelCase>"
|
||||
has-input #true|#false
|
||||
input "<TypeName>" // omitted if has-input=#false
|
||||
output "<TypeName>"
|
||||
output-nullable #true|#false // omitted when #false (default)
|
||||
transport "http"|"websocket"|"both"
|
||||
context "<ctx_name>" // omitted unless context-grouped
|
||||
affects "<ctx_name>" // 0..N occurrences
|
||||
merge "<ctx_name>" // 0..N occurrences
|
||||
is-form #true // omitted when #false (default)
|
||||
form-name "<name>"
|
||||
form-role "<role>"
|
||||
}
|
||||
|
||||
context "<name>" {
|
||||
function "<fn_name>"
|
||||
...
|
||||
param "<param_name>" {
|
||||
type "integer|number|boolean|string"
|
||||
required #true|#false
|
||||
shared-by "<fn_name>"
|
||||
...
|
||||
}
|
||||
}
|
||||
|
||||
channel "<name>" {
|
||||
pascal-name "<PascalCase>"
|
||||
params "<TypeName>" // omitted if no params
|
||||
react-message "<TypeName>" // omitted if no react message
|
||||
django-message "<TypeName>" // omitted if no django message
|
||||
}
|
||||
|
||||
Nothing else lives in the IR. OpenAPI envelope, JSON-Schema $ref dance,
|
||||
the Pydantic→json-schema converter — all gone.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import types
|
||||
from typing import Any, Literal, Union, get_args, get_origin
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic_core import PydanticUndefined
|
||||
|
||||
from mizan_core.registry import get_all_functions, get_context_groups, get_function
|
||||
from mizan_core.type_utils import extract_list_element, extract_optional
|
||||
|
||||
|
||||
__all__ = ["build_ir"]
|
||||
|
||||
|
||||
# Common user-identity param names; mirrors the equivalent in mizan-django /
|
||||
# mizan-fastapi schema-export logic.
|
||||
_USER_SCOPED_PARAMS = {"user_id", "user", "owner_id", "account_id"}
|
||||
|
||||
|
||||
# ─── KDL value formatting ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _kdl_string(s: str) -> str:
|
||||
"""KDL-escape a string and wrap in quotes."""
|
||||
escaped = (
|
||||
s.replace("\\", "\\\\")
|
||||
.replace("\"", "\\\"")
|
||||
.replace("\n", "\\n")
|
||||
.replace("\r", "\\r")
|
||||
.replace("\t", "\\t")
|
||||
)
|
||||
return f'"{escaped}"'
|
||||
|
||||
|
||||
def _kdl_bool(b: bool) -> str:
|
||||
return "#true" if b else "#false"
|
||||
|
||||
|
||||
def _kdl_value(v: Any) -> str:
|
||||
"""Render a JSON-shape Python value as a KDL literal."""
|
||||
if v is None:
|
||||
return "#null"
|
||||
if v is True or v is False:
|
||||
return _kdl_bool(v)
|
||||
if isinstance(v, (int, float)):
|
||||
return repr(v)
|
||||
if isinstance(v, str):
|
||||
return _kdl_string(v)
|
||||
# Fallback for compound values — defaults aren't typed in our IR.
|
||||
import json
|
||||
return _kdl_string(json.dumps(v))
|
||||
|
||||
|
||||
# ─── KDL Builder ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class _Block:
|
||||
"""Open-children context for a KDL node. Tracks indent level."""
|
||||
|
||||
__slots__ = ("lines", "indent")
|
||||
|
||||
def __init__(self, lines: list[str], indent: int):
|
||||
self.lines = lines
|
||||
self.indent = indent
|
||||
|
||||
def _prefix(self) -> str:
|
||||
return " " * self.indent
|
||||
|
||||
def node(self, name: str, *args: str, **props: str) -> "_OpenNode":
|
||||
"""Open a node. `args` are positional KDL args; `props` are key=value pairs."""
|
||||
return _OpenNode(self.lines, self.indent, name, list(args), dict(props))
|
||||
|
||||
def leaf(self, name: str, *args: str, **props: str) -> None:
|
||||
"""Emit a leaf node — no children block."""
|
||||
parts = [name]
|
||||
parts.extend(args)
|
||||
for k, v in props.items():
|
||||
parts.append(f"{k}={v}")
|
||||
self.lines.append(f"{self._prefix()}{' '.join(parts)}")
|
||||
|
||||
|
||||
class _OpenNode:
|
||||
"""A KDL node whose children are being built."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
lines: list[str],
|
||||
indent: int,
|
||||
name: str,
|
||||
args: list[str],
|
||||
props: dict[str, str],
|
||||
):
|
||||
self.lines = lines
|
||||
self.indent = indent
|
||||
self.name = name
|
||||
self.args = args
|
||||
self.props = props
|
||||
self._children_emitted = False
|
||||
|
||||
def __enter__(self) -> _Block:
|
||||
parts = [self.name]
|
||||
parts.extend(self.args)
|
||||
for k, v in self.props.items():
|
||||
parts.append(f"{k}={v}")
|
||||
self.lines.append(f"{' ' * self.indent}{' '.join(parts)} {{")
|
||||
self._children_emitted = True
|
||||
return _Block(self.lines, self.indent + 1)
|
||||
|
||||
def __exit__(self, *_exc: Any) -> None:
|
||||
if self._children_emitted:
|
||||
self.lines.append(f"{' ' * self.indent}}}")
|
||||
|
||||
|
||||
# ─── Type emission ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _emit_type_child(block: _Block, annotation: Any, named_types: dict[str, Any]) -> None:
|
||||
"""Emit the type-shape KDL for a Python annotation, recursing as needed."""
|
||||
# Strip Optional[T] → emit `optional` wrapper.
|
||||
inner, is_opt = extract_optional(annotation)
|
||||
if is_opt:
|
||||
with block.node("optional") as inner_block:
|
||||
_emit_type_child(inner_block, inner, named_types)
|
||||
return
|
||||
|
||||
# Multi-arm union (T | U) — emit `union { <each-branch> }`.
|
||||
origin = get_origin(annotation)
|
||||
if origin is Union or isinstance(annotation, types.UnionType):
|
||||
branches = [a for a in get_args(annotation) if a is not type(None)]
|
||||
if len(branches) > 1:
|
||||
with block.node("union") as inner_block:
|
||||
for branch in branches:
|
||||
_emit_type_child(inner_block, branch, named_types)
|
||||
return
|
||||
|
||||
# list[T] / tuple[T, ...] / set[T] / frozenset[T] → `list { ... }`
|
||||
elem = extract_list_element(annotation)
|
||||
if elem is not None:
|
||||
with block.node("list") as inner_block:
|
||||
_emit_type_child(inner_block, elem, named_types)
|
||||
return
|
||||
|
||||
# Literal[a, b, c] → enum
|
||||
if origin is Literal:
|
||||
args = get_args(annotation)
|
||||
if all(isinstance(a, str) for a in args):
|
||||
quoted = " ".join(_kdl_string(a) for a in args)
|
||||
block.lines.append(f"{block._prefix()}enum {quoted}")
|
||||
return
|
||||
|
||||
# Pydantic model → reference by name.
|
||||
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
|
||||
type_name = annotation.__name__
|
||||
named_types.setdefault(type_name, _StructShape(annotation))
|
||||
block.leaf("ref", _kdl_string(type_name))
|
||||
return
|
||||
|
||||
# Primitives
|
||||
if annotation is int:
|
||||
block.leaf("primitive", _kdl_string("integer"))
|
||||
return
|
||||
if annotation is float:
|
||||
block.leaf("primitive", _kdl_string("number"))
|
||||
return
|
||||
if annotation is bool:
|
||||
block.leaf("primitive", _kdl_string("boolean"))
|
||||
return
|
||||
if annotation is str:
|
||||
block.leaf("primitive", _kdl_string("string"))
|
||||
return
|
||||
|
||||
# Open-shape fallback (dict / Any / etc).
|
||||
block.leaf("primitive", _kdl_string("string"))
|
||||
|
||||
|
||||
def _emit_alias_type(block: _Block, annotation: Any, named_types: dict[str, Any]) -> None:
|
||||
"""Emit `type "X" { alias { <type-child> } }` for a non-struct wrapper."""
|
||||
with block.node("alias") as alias_block:
|
||||
_emit_type_child(alias_block, annotation, named_types)
|
||||
|
||||
|
||||
def _emit_struct_type(block: _Block, model: type[BaseModel], named_types: dict[str, Any]) -> None:
|
||||
"""Emit a `struct { field ... }` block for a Pydantic model."""
|
||||
with block.node("struct") as struct_block:
|
||||
for field_name, field_info in model.model_fields.items():
|
||||
props: dict[str, str] = {}
|
||||
# `field_info.is_required()` checks both the explicit Required
|
||||
# marker and the presence of a default.
|
||||
required = field_info.is_required()
|
||||
if not required:
|
||||
props["required"] = _kdl_bool(False)
|
||||
default = field_info.default
|
||||
if default is not None and default is not PydanticUndefined and default is not ...:
|
||||
props["default"] = _kdl_value(default)
|
||||
|
||||
with struct_block.node("field", _kdl_string(field_name), **props) as field_block:
|
||||
_emit_type_child(field_block, field_info.annotation, named_types)
|
||||
|
||||
|
||||
class _StructShape:
|
||||
"""A Pydantic BaseModel that emits as `type "X" { struct { ... } }`."""
|
||||
__slots__ = ("model",)
|
||||
def __init__(self, model: type[BaseModel]):
|
||||
self.model = model
|
||||
|
||||
|
||||
class _AliasShape:
|
||||
"""A named alias wrapper — e.g. `<CamelName>Output = list[<Inner>]`."""
|
||||
__slots__ = ("annotation",)
|
||||
def __init__(self, annotation: Any):
|
||||
self.annotation = annotation
|
||||
|
||||
|
||||
def _collect_named_types(functions: dict[str, Any]) -> dict[str, Any]:
|
||||
"""First pass: collect every named type the IR's `function` section references.
|
||||
|
||||
Two kinds:
|
||||
- Pydantic BaseModels seen anywhere in Input/Output traversal — emit
|
||||
as `type "X" { struct { ... } }`.
|
||||
- Function-output wrapper aliases (`<CamelName>Output = list[T]` /
|
||||
`<CamelName>Output = T | None`) — emit as `type "X" { alias { ... } }`
|
||||
so the consumer has a single named type to reference.
|
||||
"""
|
||||
seen: dict[str, Any] = {}
|
||||
|
||||
def visit_model(model: type[BaseModel]) -> None:
|
||||
if model.__name__ in seen:
|
||||
return
|
||||
seen[model.__name__] = _StructShape(model)
|
||||
for field_info in model.model_fields.values():
|
||||
for nested in _nested_models(field_info.annotation):
|
||||
visit_model(nested)
|
||||
|
||||
def visit_annotation(ann: Any) -> None:
|
||||
for nested in _nested_models(ann):
|
||||
visit_model(nested)
|
||||
|
||||
for fn_class in functions.values():
|
||||
input_cls = getattr(fn_class, "Input", None)
|
||||
if _has_input(input_cls):
|
||||
input_named = _name_input_model(fn_class)
|
||||
visit_model(input_named)
|
||||
|
||||
output_cls = getattr(fn_class, "Output", None)
|
||||
if output_cls is None:
|
||||
continue
|
||||
camel = _snake_to_camel(fn_class.name)
|
||||
output_name = f"{camel}Output"
|
||||
|
||||
inner, _ = extract_optional(output_cls)
|
||||
elem = extract_list_element(inner)
|
||||
|
||||
if elem is not None:
|
||||
# `list[T]` (possibly wrapped in Optional) — emit a list alias.
|
||||
# Visit the element type so its struct shape gets emitted too.
|
||||
visit_annotation(output_cls)
|
||||
if output_name not in seen:
|
||||
seen[output_name] = _AliasShape(output_cls)
|
||||
elif isinstance(inner, type) and issubclass(inner, BaseModel):
|
||||
# `<Model>` or `Optional[<Model>]` — emit the model under the
|
||||
# canonical name (rename if necessary).
|
||||
output_named = _name_output_model(fn_class, inner)
|
||||
visit_model(output_named)
|
||||
# If the Optional wrapper differs from the bare model, emit an
|
||||
# alias under the canonical output name too.
|
||||
if output_named.__name__ != output_name:
|
||||
seen.setdefault(output_name, _AliasShape(output_cls))
|
||||
else:
|
||||
# Primitive-wrapped output (`result: int`) — emit as alias.
|
||||
seen.setdefault(output_name, _AliasShape(output_cls))
|
||||
|
||||
return seen
|
||||
|
||||
|
||||
def _nested_models(annotation: Any) -> list[type[BaseModel]]:
|
||||
"""All Pydantic models that appear anywhere inside `annotation`."""
|
||||
out: list[type[BaseModel]] = []
|
||||
inner, _ = extract_optional(annotation)
|
||||
elem = extract_list_element(inner)
|
||||
if elem is not None:
|
||||
out.extend(_nested_models(elem))
|
||||
return out
|
||||
if isinstance(inner, type) and issubclass(inner, BaseModel):
|
||||
out.append(inner)
|
||||
return out
|
||||
|
||||
|
||||
def _has_input(input_cls: Any) -> bool:
|
||||
return (
|
||||
input_cls is not None
|
||||
and input_cls is not BaseModel
|
||||
and hasattr(input_cls, "model_fields")
|
||||
and bool(input_cls.model_fields)
|
||||
)
|
||||
|
||||
|
||||
def _snake_to_camel(name: str) -> str:
|
||||
parts = name.replace(".", "_").replace("-", "_").split("_")
|
||||
return parts[0] + "".join(p.title() for p in parts[1:] if p)
|
||||
|
||||
|
||||
def _name_input_model(fn_class: Any) -> type[BaseModel]:
|
||||
"""Return a copy of the function's Input model named `<CamelName>Input`."""
|
||||
from pydantic import create_model
|
||||
|
||||
camel = _snake_to_camel(fn_class.name)
|
||||
canonical = f"{camel}Input"
|
||||
src = fn_class.Input
|
||||
if src.__name__ == canonical:
|
||||
return src
|
||||
# Re-derive under the canonical name so codegen consumers see a stable name.
|
||||
return create_model(canonical, __base__=src)
|
||||
|
||||
|
||||
def _name_output_model(fn_class: Any, base: type[BaseModel]) -> type[BaseModel]:
|
||||
"""Return a copy of the model named `<CamelName>Output`."""
|
||||
from pydantic import create_model
|
||||
|
||||
camel = _snake_to_camel(fn_class.name)
|
||||
canonical = f"{camel}Output"
|
||||
if base.__name__ == canonical:
|
||||
return base
|
||||
return create_model(canonical, __base__=base)
|
||||
|
||||
|
||||
# ─── Function / context / channel emission ──────────────────────────────────
|
||||
|
||||
|
||||
def _function_props(fn_class: Any, output_type_name: str, output_nullable: bool) -> dict[str, Any]:
|
||||
"""Collect every value that goes inside a `function` block."""
|
||||
meta = getattr(fn_class, "_meta", {})
|
||||
name = fn_class.name
|
||||
camel = _snake_to_camel(name)
|
||||
input_cls = getattr(fn_class, "Input", None)
|
||||
has_input = _has_input(input_cls)
|
||||
is_context = meta.get("context")
|
||||
is_form = meta.get("form", False)
|
||||
|
||||
return {
|
||||
"name": name,
|
||||
"camel": camel,
|
||||
"has_input": has_input,
|
||||
"input_type": f"{camel}Input" if has_input else None,
|
||||
"output_type": output_type_name,
|
||||
"output_nullable": output_nullable,
|
||||
"transport": "websocket" if meta.get("websocket") else "http",
|
||||
"context": is_context if isinstance(is_context, str) else None,
|
||||
"affects": [a["name"] for a in meta.get("affects") or [] if a.get("type") == "context"],
|
||||
"merge": list(meta.get("merge") or []),
|
||||
"is_form": bool(is_form),
|
||||
"form_name": meta.get("form_name"),
|
||||
"form_role": meta.get("form_role"),
|
||||
}
|
||||
|
||||
|
||||
def _resolve_output(fn_class: Any) -> tuple[str, bool]:
|
||||
"""Return `(output_type_name, output_nullable)` for an emitted function block."""
|
||||
camel = _snake_to_camel(fn_class.name)
|
||||
canonical = f"{camel}Output"
|
||||
output_cls = getattr(fn_class, "Output", None)
|
||||
if output_cls is None:
|
||||
return canonical, False
|
||||
_, nullable = extract_optional(output_cls)
|
||||
return canonical, nullable
|
||||
|
||||
|
||||
def _collect_channels() -> list[dict[str, Any]]:
|
||||
"""Pull channel registrations from the optional `channels` registry extension."""
|
||||
from mizan_core.registry import _extensions # type: ignore[attr-defined]
|
||||
|
||||
ext = _extensions.get("channels")
|
||||
if ext is None:
|
||||
return []
|
||||
schema = ext.schema()
|
||||
return list(schema or [])
|
||||
|
||||
|
||||
# ─── Top-level builder ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def build_ir() -> str:
|
||||
"""Build the Mizan IR for every registered function. Returns KDL source."""
|
||||
functions = get_all_functions()
|
||||
context_groups = get_context_groups()
|
||||
channels = _collect_channels()
|
||||
|
||||
named_types = _collect_named_types(functions)
|
||||
|
||||
lines: list[str] = []
|
||||
root = _Block(lines, indent=0)
|
||||
|
||||
# ── Type definitions ──
|
||||
for type_name in sorted(named_types):
|
||||
shape = named_types[type_name]
|
||||
with root.node("type", _kdl_string(type_name)) as type_block:
|
||||
if isinstance(shape, _StructShape):
|
||||
_emit_struct_type(type_block, shape.model, named_types)
|
||||
elif isinstance(shape, _AliasShape):
|
||||
_emit_alias_type(type_block, shape.annotation, named_types)
|
||||
else:
|
||||
raise TypeError(f"unknown named-type shape: {type(shape).__name__}")
|
||||
|
||||
if named_types:
|
||||
lines.append("")
|
||||
|
||||
# ── Functions ──
|
||||
for fn_name, fn_class in functions.items():
|
||||
meta = getattr(fn_class, "_meta", {})
|
||||
if meta.get("private") or meta.get("view_path"):
|
||||
continue
|
||||
output_type_name, output_nullable = _resolve_output(fn_class)
|
||||
props = _function_props(fn_class, output_type_name, output_nullable)
|
||||
_emit_function(root, props)
|
||||
|
||||
if functions:
|
||||
lines.append("")
|
||||
|
||||
# ── Contexts ──
|
||||
for ctx_name, fn_names in context_groups.items():
|
||||
_emit_context(root, ctx_name, fn_names)
|
||||
|
||||
if context_groups:
|
||||
lines.append("")
|
||||
|
||||
# ── Channels ──
|
||||
for channel in channels:
|
||||
_emit_channel(root, channel)
|
||||
|
||||
# Trim trailing blanks then add a single terminating newline.
|
||||
while lines and not lines[-1]:
|
||||
lines.pop()
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def _emit_function(root: _Block, props: dict[str, Any]) -> None:
|
||||
with root.node("function", _kdl_string(props["name"])) as block:
|
||||
block.leaf("camel", _kdl_string(props["camel"]))
|
||||
block.leaf("has-input", _kdl_bool(props["has_input"]))
|
||||
if props["input_type"]:
|
||||
block.leaf("input", _kdl_string(props["input_type"]))
|
||||
block.leaf("output", _kdl_string(props["output_type"]))
|
||||
if props["output_nullable"]:
|
||||
block.leaf("output-nullable", _kdl_bool(True))
|
||||
block.leaf("transport", _kdl_string(props["transport"]))
|
||||
if props["context"]:
|
||||
block.leaf("context", _kdl_string(props["context"]))
|
||||
for affect_name in props["affects"]:
|
||||
block.leaf("affects", _kdl_string(affect_name))
|
||||
for merge_name in props["merge"]:
|
||||
block.leaf("merge", _kdl_string(merge_name))
|
||||
if props["is_form"]:
|
||||
block.leaf("is-form", _kdl_bool(True))
|
||||
if props["form_name"]:
|
||||
block.leaf("form-name", _kdl_string(props["form_name"]))
|
||||
if props["form_role"]:
|
||||
block.leaf("form-role", _kdl_string(props["form_role"]))
|
||||
|
||||
|
||||
def _emit_context(root: _Block, ctx_name: str, fn_names: list[str]) -> None:
|
||||
# First pass: collect param info across every function in the context.
|
||||
param_info: dict[str, dict[str, Any]] = {}
|
||||
for fn_name in fn_names:
|
||||
fn_class = get_function(fn_name)
|
||||
if fn_class is None:
|
||||
continue
|
||||
input_cls = getattr(fn_class, "Input", None)
|
||||
if not _has_input(input_cls):
|
||||
continue
|
||||
for param_name, field_info in input_cls.model_fields.items():
|
||||
slot = param_info.setdefault(param_name, {"type": None, "shared_by": []})
|
||||
slot["type"] = _annotation_to_primitive(field_info.annotation)
|
||||
slot["shared_by"].append(fn_name)
|
||||
|
||||
# A param is required iff every function in the context declares it.
|
||||
for slot in param_info.values():
|
||||
slot["required"] = len(slot["shared_by"]) == len(fn_names)
|
||||
|
||||
with root.node("context", _kdl_string(ctx_name)) as block:
|
||||
for fn_name in fn_names:
|
||||
block.leaf("function", _kdl_string(fn_name))
|
||||
for param_name in sorted(param_info):
|
||||
slot = param_info[param_name]
|
||||
with block.node("param", _kdl_string(param_name)) as param_block:
|
||||
param_block.leaf("type", _kdl_string(slot["type"]))
|
||||
param_block.leaf("required", _kdl_bool(slot["required"]))
|
||||
for sharer in slot["shared_by"]:
|
||||
param_block.leaf("shared-by", _kdl_string(sharer))
|
||||
|
||||
|
||||
def _annotation_to_primitive(annotation: Any) -> str:
|
||||
inner, _ = extract_optional(annotation)
|
||||
if inner is int:
|
||||
return "integer"
|
||||
if inner is float:
|
||||
return "number"
|
||||
if inner is bool:
|
||||
return "boolean"
|
||||
return "string"
|
||||
|
||||
|
||||
def _emit_channel(root: _Block, channel: dict[str, Any]) -> None:
|
||||
name = channel["name"]
|
||||
with root.node("channel", _kdl_string(name)) as block:
|
||||
block.leaf("pascal-name", _kdl_string(channel["pascalName"]))
|
||||
if channel.get("hasParams") and channel.get("paramsType"):
|
||||
block.leaf("params", _kdl_string(channel["paramsType"]))
|
||||
if channel.get("hasReactMessage") and channel.get("reactMessageType"):
|
||||
block.leaf("react-message", _kdl_string(channel["reactMessageType"]))
|
||||
if channel.get("hasDjangoMessage") and channel.get("djangoMessageType"):
|
||||
block.leaf("django-message", _kdl_string(channel["djangoMessageType"]))
|
||||
Reference in New Issue
Block a user