Extract Layer 1 to cores/mizan-python (mizan-core)

Pull cache/keys.py (HMAC cache key derivation) and mwt.py (Mizan Web Token)
out of backends/mizan-django and into a new cores/mizan-python package.
mizan-django re-imports them via the new mizan_core module.

Naming: directory cores/mizan-python/, distribution mizan-core, importable
module mizan_core. mizan-django keeps its existing 'mizan' distribution slot
on PyPI; the two coexist as distinct packages.

Wiring:
- backends/mizan-django/pyproject.toml gains a 'mizan-core' dep with a
  [tool.uv.sources] path entry (editable install from ../../cores/mizan-python).
- Makefile install target prepends 'cd cores/mizan-python && uv pip install -e .'
- 3 import sites in mizan-django updated: cache/__init__.py, jwt/functions.py,
  client/executor.py — all now import from mizan_core.

Test split:
- 3 unit-test classes (CacheKeyDerivationTests, MWTCreationTests,
  PermissionKeyTests) move to cores/mizan-python/tests/, rewritten against
  unittest.TestCase (no Django dep). The cross-language pin test (pinned
  HMAC hex digests against mizan-ts) moves with CacheKeyDerivationTests.
- Integration tests stay in mizan-django (CacheBackendTests, CachePurgeTests,
  CacheIntegrationTests, RevParameterTests, MWTAuthIntegrationTests) — they
  need the Django request flow.

Verified:
- mizan-core: 15/15 pass (incl. cross-language pin)
- mizan-django: 348 pass, 21 skip, 0 fail
- mizan-ts: edge-compat 34/34 pass — protocol invariant holds, the moved
  Python derive_cache_key still produces the exact hex digests TS pins against.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-06 01:32:54 -04:00
parent 9d2781b52c
commit 37e61c646b
14 changed files with 202 additions and 192 deletions

View File

@@ -0,0 +1,26 @@
[project]
name = "mizan-core"
version = "0.1.0"
description = "Mizan Python core — HMAC cache keys, MWT identity. Framework-agnostic primitives shared by every Python backend adapter."
requires-python = ">=3.10"
dependencies = [
"PyJWT>=2.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mizan_core"]
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]
python_classes = ["*Tests", "*Test", "Test*"]
python_functions = ["test_*"]

View File

View File

@@ -0,0 +1,59 @@
"""
Cache key derivation — HMAC-SHA256 over JSON-canonical form.
Protocol-critical: every Mizan adapter must produce identical output
for identical inputs. Cross-language conformance verified by pin tests.
Scoped purge recomputes the key directly — no reverse index needed.
Broad purge uses a context prefix scan.
"""
from __future__ import annotations
import hashlib
import hmac
import json
from typing import Any
# Context prefix for broad purge (SCAN pattern)
CONTEXT_KEY_PREFIX = "ctx:"
def derive_cache_key(
secret: str,
context: str,
params: dict[str, Any],
user_id: str | None = None,
rev: int = 0,
) -> str:
"""
Derive a deterministic HMAC-SHA256 cache key.
Returns a prefixed key: "ctx:{context}:{hmac_hex}" so that
broad purge can SCAN by prefix "ctx:{context}:*".
"""
def _normalize(v: Any) -> str:
"""Normalize values for cross-language HMAC consistency.
Python str(True)="True" but JS String(true)="true". Use JSON-native forms."""
if v is True:
return "true"
if v is False:
return "false"
if v is None:
return "null"
return str(v)
sorted_params = {k: _normalize(v) for k, v in sorted(params.items())}
key_data: dict[str, Any] = {"c": context, "p": sorted_params, "r": rev}
if user_id is not None:
key_data["u"] = str(user_id)
message = json.dumps(key_data, sort_keys=True, separators=(",", ":"))
hmac_hex = hmac.new(
secret.encode("utf-8"),
message.encode("utf-8"),
hashlib.sha256,
).hexdigest()
return f"{CONTEXT_KEY_PREFIX}{context}:{hmac_hex}"

View File

@@ -0,0 +1,169 @@
"""
MWT (Mizan Web Token) — Protocol-owned identity layer.
MWT is a standard JWT (RFC 7519, HMAC-SHA256) with Mizan-specific claims,
traveling on the `X-Mizan-Token` header. It provides:
- `sub`: user_id for HMAC cache key derivation
- `pkey`: permission state hash for staleness detection
- `kid`: key ID in the JOSE header (per RFC 7515) for secret rotation
- `aud`: audience binding to prevent cross-tenant replay
- `nbf`: not-before to handle clock skew
MWT is issued from an authenticated Django session. The app handles
authentication (session, social auth, etc.); Mizan issues MWT from
the authenticated identity. Edge Workers and the origin-side cache
validate MWT to extract user identity for cache operations.
Usage:
from mizan.mwt import create_mwt, decode_mwt, MWTUser
Configuration:
MIZAN_MWT_SECRET: MWT signing key (separate from MIZAN_CACHE_SECRET)
MIZAN_MWT_TTL: token lifetime in seconds (default: 300)
"""
from __future__ import annotations
import hashlib
import logging
import time
from dataclasses import dataclass
from typing import Any
import jwt
logger = logging.getLogger("mizan.mwt")
@dataclass
class MWTPayload:
"""Decoded MWT claims."""
sub: str # user_id
staff: bool # is_staff
super: bool # is_superuser
pkey: str # permission state hash (full SHA-256 hex)
kid: str # key ID (from JOSE header)
aud: str # audience
iat: int # issued at
exp: int # expiration
class MWTUser:
"""
Minimal user object created from MWT claims.
Used as request.user for MWT-authenticated requests.
No database query required — all data comes from the token.
"""
def __init__(self, payload: MWTPayload):
self.id = int(payload.sub)
self.pk = self.id
self.is_staff = payload.staff
self.is_superuser = payload.super
self.is_authenticated = True
self.is_anonymous = False
self.is_active = True
self.pkey = payload.pkey
def __str__(self) -> str:
return f"MWTUser(id={self.id})"
def __repr__(self) -> str:
return f"MWTUser(id={self.id}, pkey={self.pkey[:8]}...)"
def compute_permission_key(user: Any) -> str:
"""
Compute a deterministic hash of the user's permission state.
Includes is_staff, is_superuser, and all Django permissions.
When the MWT expires and is refreshed, the new pkey reflects
any permission changes. The short TTL controls the staleness window.
Returns the full 64-character SHA-256 hex digest.
"""
perms = sorted(user.get_all_permissions()) if hasattr(user, "get_all_permissions") else []
staff = "1" if getattr(user, "is_staff", False) else "0"
superuser = "1" if getattr(user, "is_superuser", False) else "0"
blob = f"{staff}:{superuser}:{','.join(perms)}"
return hashlib.sha256(blob.encode("utf-8")).hexdigest()
def create_mwt(
user: Any,
secret: str,
ttl: int = 300,
audience: str = "mizan",
kid: str = "v1",
) -> str:
"""
Create an MWT from an authenticated Django user.
Args:
user: Django user object (must have pk, is_staff, is_superuser).
secret: MIZAN_MWT_SECRET signing key.
ttl: Token lifetime in seconds (default: 300 = 5 minutes).
audience: Audience claim for cross-tenant protection.
kid: Key ID placed in JOSE header (per RFC 7515) for rotation.
Returns:
Encoded JWT string.
"""
now = int(time.time())
payload = {
"sub": str(user.pk),
"staff": getattr(user, "is_staff", False),
"super": getattr(user, "is_superuser", False),
"pkey": compute_permission_key(user),
"aud": audience,
"iat": now,
"nbf": now,
"exp": now + ttl,
}
# kid goes in the JOSE header per RFC 7515, not the payload
headers = {"kid": kid}
return jwt.encode(payload, secret, algorithm="HS256", headers=headers)
def decode_mwt(
token: str,
secret: str,
audience: str = "mizan",
) -> MWTPayload | None:
"""
Decode and validate an MWT.
Returns MWTPayload on success, None on any failure (expired, invalid
signature, wrong audience, not-yet-valid, malformed).
"""
try:
# Decode header first to extract kid
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid", "v1")
data = jwt.decode(
token,
secret,
algorithms=["HS256"],
audience=audience,
)
except jwt.PyJWTError:
logger.debug("MWT decode failed", exc_info=True)
return None
try:
return MWTPayload(
sub=data["sub"],
staff=data.get("staff", False),
super=data.get("super", False),
pkey=data.get("pkey", ""),
kid=kid,
aud=audience,
iat=data["iat"],
exp=data["exp"],
)
except (KeyError, TypeError):
logger.debug("MWT payload missing required claims", exc_info=True)
return None

View File

View File

@@ -0,0 +1,68 @@
"""Unit tests for cache key derivation. Includes the cross-language pin against mizan-ts."""
from unittest import TestCase
from mizan_core.cache.keys import derive_cache_key
class CacheKeyDerivationTests(TestCase):
"""Tests that HMAC cache key derivation is deterministic and correct."""
SECRET = "test-cache-secret"
def test_deterministic_output(self):
"""Same inputs always produce the same key."""
key1 = derive_cache_key(self.SECRET, "user", {"user_id": "5"})
key2 = derive_cache_key(self.SECRET, "user", {"user_id": "5"})
self.assertEqual(key1, key2)
self.assertTrue(key1.startswith("ctx:user:"))
self.assertEqual(len(key1), len("ctx:user:") + 64) # prefix + SHA-256 hex
def test_param_order_irrelevant(self):
"""Parameter ordering does not affect the key."""
key1 = derive_cache_key(self.SECRET, "ctx", {"a": "1", "b": "2"})
key2 = derive_cache_key(self.SECRET, "ctx", {"b": "2", "a": "1"})
self.assertEqual(key1, key2)
def test_different_user_ids_different_keys(self):
"""Different user_ids produce different cache keys."""
key1 = derive_cache_key(self.SECRET, "user", {"user_id": "5"}, user_id="5")
key2 = derive_cache_key(self.SECRET, "user", {"user_id": "5"}, user_id="6")
self.assertNotEqual(key1, key2)
def test_rev_changes_key(self):
"""Different rev values produce different cache keys."""
key1 = derive_cache_key(self.SECRET, "user", {"user_id": "5"}, rev=0)
key2 = derive_cache_key(self.SECRET, "user", {"user_id": "5"}, rev=1)
self.assertNotEqual(key1, key2)
def test_no_delimiter_collision(self):
"""JSON-canonical form prevents delimiter-free concatenation collisions."""
# "user" + user_id="12" + params="3" vs "user1" + user_id="2" + params="3"
key1 = derive_cache_key(self.SECRET, "user", {"id": "3"}, user_id="12")
key2 = derive_cache_key(self.SECRET, "user1", {"id": "3"}, user_id="2")
self.assertNotEqual(key1, key2)
def test_public_vs_user_scoped(self):
"""Public (no user_id) and user-scoped produce different keys."""
public = derive_cache_key(self.SECRET, "products", {"id": "1"})
scoped = derive_cache_key(self.SECRET, "products", {"id": "1"}, user_id="5")
self.assertNotEqual(public, scoped)
def test_cross_language_pin(self):
"""Pinned HMAC values — must match TypeScript adapter exactly."""
pin_secret = "test-pin-secret-that-is-32bytes!"
public_key = derive_cache_key(pin_secret, "user", {"user_id": "5"}, rev=0)
self.assertEqual(
public_key,
"ctx:user:605a1ca5ad5994e9b765c8d1b330474c2a0d51a7b8fbbdc402f992da7ba902f6",
)
user_scoped_key = derive_cache_key(
pin_secret, "user", {"user_id": "5"}, user_id="5", rev=0,
)
self.assertEqual(
user_scoped_key,
"ctx:user:30fc08eb46ee4ff2cf7d317e97dca90fd616511e0587304416f71dc863338dc2",
)

View File

@@ -0,0 +1,97 @@
"""Unit tests for MWT creation, decoding, and permission key derivation."""
from unittest import TestCase
from unittest.mock import MagicMock
from mizan_core.mwt import (
MWTUser,
compute_permission_key,
create_mwt,
decode_mwt,
)
def _make_user(**kwargs):
user = MagicMock()
user.pk = kwargs.get("pk", 1)
user.is_staff = kwargs.get("is_staff", False)
user.is_superuser = kwargs.get("is_superuser", False)
user.get_all_permissions = MagicMock(return_value=kwargs.get("perms", set()))
return user
class MWTCreationTests(TestCase):
"""Tests for MWT creation and decoding."""
SECRET = "test-mwt-secret-that-is-32bytes!"
def test_create_and_decode(self):
"""Create an MWT and decode it successfully."""
user = _make_user(pk=42, is_staff=True)
token = create_mwt(user, self.SECRET, ttl=300)
payload = decode_mwt(token, self.SECRET)
self.assertIsNotNone(payload)
self.assertEqual(payload.sub, "42")
self.assertTrue(payload.staff)
self.assertFalse(payload.super)
self.assertEqual(payload.kid, "v1")
self.assertEqual(len(payload.pkey), 64)
def test_decode_expired(self):
"""Expired MWT returns None."""
user = _make_user()
token = create_mwt(user, self.SECRET, ttl=-1)
payload = decode_mwt(token, self.SECRET)
self.assertIsNone(payload)
def test_decode_wrong_secret(self):
"""MWT signed with wrong secret returns None."""
user = _make_user()
token = create_mwt(user, self.SECRET)
payload = decode_mwt(token, "wrong-secret")
self.assertIsNone(payload)
def test_decode_wrong_audience(self):
"""MWT with wrong audience returns None."""
user = _make_user()
token = create_mwt(user, self.SECRET, audience="app1")
payload = decode_mwt(token, self.SECRET, audience="app2")
self.assertIsNone(payload)
def test_mwt_user_has_pkey(self):
"""MWTUser carries the permission key."""
user = _make_user(pk=5, perms={"app.view_thing"})
token = create_mwt(user, self.SECRET)
payload = decode_mwt(token, self.SECRET)
mwt_user = MWTUser(payload)
self.assertEqual(mwt_user.pk, 5)
self.assertTrue(mwt_user.is_authenticated)
self.assertEqual(len(mwt_user.pkey), 64)
class PermissionKeyTests(TestCase):
"""Tests for pkey determinism and sensitivity."""
def test_deterministic(self):
"""Same permissions produce same pkey."""
user = _make_user(perms={"app.view_thing", "app.add_thing"})
pkey1 = compute_permission_key(user)
pkey2 = compute_permission_key(user)
self.assertEqual(pkey1, pkey2)
def test_changes_on_permission_change(self):
"""Different permissions produce different pkey."""
user1 = _make_user(perms={"app.view_thing"})
user2 = _make_user(perms={"app.view_thing", "app.add_thing"})
self.assertNotEqual(compute_permission_key(user1), compute_permission_key(user2))
def test_changes_on_staff_change(self):
"""Staff status change produces different pkey."""
user_normal = _make_user(is_staff=False)
user_staff = _make_user(is_staff=True)
self.assertNotEqual(
compute_permission_key(user_normal),
compute_permission_key(user_staff),
)