Fix critical cache issues: user_id scoping, AND purge, error handling, TTL

Fixes from 8-expert review:
- Pass user_id from request.user.pk to cache key derivation (data leak fix)
- Scoped purge uses AND (intersection) not OR (union) semantics
- All cache ops in executor wrapped in try/except with logging fallthrough
- Thread-safe cache initialization with threading.Lock
- RedisCache: 24h safety-net TTL, connection timeouts, MULTI/EXEC pipeline
- RedisCache.clear() uses pipelined UNLINK instead of per-batch DELETE
- build_index_keys now stringifies values matching derive_cache_key
- get_cache() logs warnings for partial config and connection failures
- Wire-protocol internals removed from __all__

Remaining open: purge atomicity (Lua script), cross-language str() canon,
broad purge sub-index cleanup, thundering herd protection, RedisCache tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-06 22:50:05 -04:00
parent b2f990b4e5
commit b06a65e133
5 changed files with 140 additions and 97 deletions

View File

@@ -1,88 +1,65 @@
# Cache Module — Known Issues # Cache Module — Known Issues
Issues identified by 8-domain-expert review of the initial implementation. Issues identified by 8-domain-expert review. Status tracked here.
Fix in priority order before shipping.
## Critical (Security / Data Corruption) ## Critical (Security / Data Corruption)
### 1. User-scoped content cached without user_id ### 1. ~~User-scoped content cached without user_id~~ FIXED
`context_fetch_view` never passes `user_id` to `cache_get`/`cache_put`. `context_fetch_view` now extracts `user_id` from `request.user.pk` and
Authenticated User A's response gets cached and served to User B. passes it to `cache_get`/`cache_put`.
**Fix:** Extract user_id from request (MWT `sub` claim when available,
`request.user.pk` as interim) and pass to cache operations.
### 2. Purge race condition (non-atomic index operations) ### 2. Purge race condition (non-atomic index operations)
`cache_purge` does `get_index` -> `delete_index` -> `delete_many` as `cache_purge` does index reads and deletes as separate operations.
separate operations. Concurrent `cache_put` between steps can orphan Concurrent `cache_put` between steps can orphan entries.
entries or lose purges. **Status:** Partially mitigated by AND semantics fix. Full atomicity
**Fix:** Use Redis Lua script or WATCH/MULTI for atomic purge. (Lua script or WATCH/MULTI) still needed for Redis backend.
MemoryCache should use a threading lock.
### 3. No Redis error handling ### 3. ~~No Redis error handling~~ FIXED
Any Redis failure throws `ConnectionError` into the request path -> 500. All cache operations in `executor.py` wrapped in try/except with
No try/except, no fallback, no circuit breaker. `logger.warning`. Redis failure falls through to uncached execution.
**Fix:** Wrap all Redis calls in try/except. On failure, fall through
to uncached execution (cache miss behavior).
### 4. Scoped purge uses OR semantics, should use AND ### 4. ~~Scoped purge uses OR semantics~~ FIXED
`cache_purge({user_id: 5, org_id: 3})` deletes everything with Changed to AND (intersection). `{user_id: 5, org_id: 3}` now only
`user_id=5` OR `org_id=3`. Should intersect index lookups. deletes entries matching BOTH params.
**Fix:** Change `keys_to_delete.update()` to set intersection.
## High (Correctness / Operability) ## High (Correctness / Operability)
### 5. No TTL on Redis entries ### 5. ~~No TTL on Redis entries~~ FIXED
If purge fails or `affects` is misconfigured, stale data persists forever. `RedisCache.put` now sets `ex=86400` (24h safety-net TTL) by default.
**Fix:** Add safety-net TTL to `RedisCache.put` (e.g., 24h default).
### 6. Cross-language str() vs String() divergence ### 6. Cross-language str() vs String() divergence
Python `str(True)` -> `"True"`, JS `String(true)` -> `"true"`. Python `str(True)` -> `"True"`, JS `String(true)` -> `"true"`.
Python `str(None)` -> `"None"`, JS `String(null)` -> `"null"`. **Status:** Open. Needs canonical stringification rules in protocol spec.
Cache keys will mismatch between Python and TypeScript adapters.
**Fix:** Define canonical stringification rules in the protocol spec.
Normalize booleans to "true"/"false", null to "null", numbers to
consistent format before stringification.
### 7. Broad purge doesn't clean per-param sub-indexes ### 7. Broad purge doesn't clean per-param sub-indexes
After broad purge of `mizan:idx:user`, the per-param indexes **Status:** Open. Slow memory leak in Redis.
(`mizan:idx:user:user_id=5`) remain as dangling sets. Slow memory leak.
**Fix:** On broad purge, also scan and delete `mizan:idx:{context}:*` indexes.
### 8. build_index_keys doesn't stringify values ### 8. ~~build_index_keys doesn't stringify values~~ FIXED
`derive_cache_key` calls `str(v)` but `build_index_keys` uses raw `v`. Now calls `str(v)` on all values, matching `derive_cache_key`.
Latent inconsistency for non-string types.
**Fix:** Stringify values in `build_index_keys` too.
### 9. Silent exception swallowing in get_cache() ### 9. ~~Silent exception swallowing in get_cache()~~ FIXED
Misconfigured Redis URL or missing secret produces no log, no warning. Now logs warnings for partial config and connection failures.
**Fix:** Log warning on partial config and connection failure.
### 10. _initialized flag not thread-safe ### 10. ~~_initialized flag not thread-safe~~ FIXED
Two concurrent workers calling `get_cache()` race on globals. Now uses `threading.Lock` for thread-safe initialization.
**Fix:** Use `threading.Lock` or resolve at `AppConfig.ready()`.
## Medium (Design / Performance) ## Medium (Design / Performance)
### 11. No thundering-herd protection ### 11. No thundering-herd protection
Concurrent cold misses all execute and write. Origin cache should **Status:** Open. Concurrent cold misses all execute and write.
deduplicate in-flight requests (request coalescing).
### 12. Wire-protocol internals in __all__ ### 12. ~~Wire-protocol internals in __all__~~ FIXED
`derive_cache_key` and `build_index_keys` are promoted to public API. `derive_cache_key` and `build_index_keys` removed from `__all__`.
Changing key format requires semver major bump.
**Fix:** Remove from `__all__`, prefix with `_` or move to internal module.
### 13. Inconsistent API pattern ### 13. Inconsistent API pattern
`cache_get`/`cache_put` take explicit `secret`+`backend` args but **Status:** Open. `cache_get`/`cache_put` take explicit args but executor
executor fetches these from globals. Pick one pattern. fetches from globals.
### 14. clear() uses SCAN + DELETE without pipeline ### 14. ~~clear() uses SCAN + DELETE without pipeline~~ FIXED
O(N) round trips for large caches. Now uses pipeline with UNLINK for batched async deletes.
**Fix:** Pipeline the deletes.
### 15. No Redis connection timeouts ### 15. ~~No Redis connection timeouts~~ FIXED
`from_url()` has no `socket_connect_timeout`, `socket_timeout`, or `socket_connect_timeout=5`, `socket_timeout=5`, `health_check_interval=30`.
`health_check_interval`.
### 16. No RedisCache test coverage ### 16. No RedisCache test coverage
Only MemoryCache is tested. Use `fakeredis` for RedisCache tests. **Status:** Open. Only MemoryCache is tested.

View File

@@ -20,13 +20,18 @@ Configuration (Django settings):
from __future__ import annotations from __future__ import annotations
import logging
import threading
from typing import Any from typing import Any
from .backend import CacheBackend, MemoryCache, RedisCache from .backend import CacheBackend, MemoryCache, RedisCache
from .keys import derive_cache_key, build_index_keys from .keys import derive_cache_key, build_index_keys
logger = logging.getLogger("mizan.cache")
_cache_instance: CacheBackend | None = None _cache_instance: CacheBackend | None = None
_initialized = False _initialized = False
_init_lock = threading.Lock()
def get_cache() -> CacheBackend | None: def get_cache() -> CacheBackend | None:
@@ -35,20 +40,37 @@ def get_cache() -> CacheBackend | None:
Returns RedisCache if MIZAN_CACHE_SECRET and MIZAN_CACHE_REDIS_URL are Returns RedisCache if MIZAN_CACHE_SECRET and MIZAN_CACHE_REDIS_URL are
both set. Returns None otherwise. The instance is cached for the process both set. Returns None otherwise. The instance is cached for the process
lifetime. lifetime. Thread-safe.
""" """
global _cache_instance, _initialized global _cache_instance, _initialized
if _initialized: if _initialized:
return _cache_instance return _cache_instance
_initialized = True with _init_lock:
try: if _initialized:
from mizan.setup.settings import get_settings return _cache_instance
settings = get_settings()
if settings.cache_secret and settings.cache_redis_url: _initialized = True
_cache_instance = RedisCache(settings.cache_redis_url) try:
except Exception: from mizan.setup.settings import get_settings
_cache_instance = None settings = get_settings()
if settings.cache_secret and settings.cache_redis_url:
_cache_instance = RedisCache(settings.cache_redis_url)
logger.info("Mizan cache enabled (Redis: %s)", settings.cache_redis_url)
elif settings.cache_secret and not settings.cache_redis_url:
logger.warning(
"MIZAN_CACHE_SECRET is set but MIZAN_CACHE_REDIS_URL is missing. "
"Cache is disabled."
)
elif settings.cache_redis_url and not settings.cache_secret:
logger.warning(
"MIZAN_CACHE_REDIS_URL is set but MIZAN_CACHE_SECRET is missing. "
"Cache is disabled."
)
except Exception:
logger.warning("Failed to initialize Mizan cache", exc_info=True)
_cache_instance = None
return _cache_instance return _cache_instance
@@ -120,14 +142,26 @@ def cache_purge(
Returns the number of entries purged. Returns the number of entries purged.
""" """
if params: if params:
# Scoped purge — find entries matching each param via index # Scoped purge — intersect index lookups (AND semantics)
keys_to_delete: set[str] = set() # Entry must match ALL params, not just any one.
sets_per_param: list[set[str]] = []
param_index_keys: list[str] = []
for k, v in sorted(params.items()): for k, v in sorted(params.items()):
index_key = f"mizan:idx:{context}:{k}={v}" index_key = f"mizan:idx:{context}:{k}={str(v)}"
keys_to_delete.update(backend.get_index(index_key)) param_index_keys.append(index_key)
backend.delete_index(index_key) sets_per_param.append(backend.get_index(index_key))
if sets_per_param:
keys_to_delete = sets_per_param[0]
for s in sets_per_param[1:]:
keys_to_delete = keys_to_delete & s
else:
keys_to_delete = set()
if keys_to_delete: if keys_to_delete:
# Clean up per-param indexes
for idx_key in param_index_keys:
backend.remove_from_index(idx_key, keys_to_delete)
# Remove from the broad context index too # Remove from the broad context index too
backend.remove_from_index(f"mizan:idx:{context}", keys_to_delete) backend.remove_from_index(f"mizan:idx:{context}", keys_to_delete)
return backend.delete_many(list(keys_to_delete)) return backend.delete_many(list(keys_to_delete))
@@ -153,6 +187,4 @@ __all__ = [
"cache_get", "cache_get",
"cache_put", "cache_put",
"cache_purge", "cache_purge",
"derive_cache_key",
"build_index_keys",
] ]

View File

@@ -78,7 +78,15 @@ class RedisCache:
Requires `redis-py` (pip install mizan[cache]). Requires `redis-py` (pip install mizan[cache]).
""" """
def __init__(self, redis_url: str, prefix: str = "mizan:cache:") -> None: # Safety-net TTL: entries expire even if purge fails (24 hours)
DEFAULT_TTL = 86400
def __init__(
self,
redis_url: str,
prefix: str = "mizan:cache:",
ttl: int | None = None,
) -> None:
try: try:
import redis import redis
except ImportError: except ImportError:
@@ -86,8 +94,14 @@ class RedisCache:
"Redis is required for Mizan's cache backend. " "Redis is required for Mizan's cache backend. "
"Install it with: pip install mizan[cache]" "Install it with: pip install mizan[cache]"
) )
self._client = redis.from_url(redis_url) self._client = redis.from_url(
redis_url,
socket_connect_timeout=5,
socket_timeout=5,
health_check_interval=30,
)
self._prefix = prefix self._prefix = prefix
self._ttl = ttl if ttl is not None else self.DEFAULT_TTL
def _key(self, key: str) -> str: def _key(self, key: str) -> str:
return f"{self._prefix}{key}" return f"{self._prefix}{key}"
@@ -98,8 +112,8 @@ class RedisCache:
def put(self, key: str, value: bytes, indexes: list[str]) -> None: def put(self, key: str, value: bytes, indexes: list[str]) -> None:
prefixed_key = self._key(key) prefixed_key = self._key(key)
pipe = self._client.pipeline() pipe = self._client.pipeline(transaction=True)
pipe.set(prefixed_key, value) pipe.set(prefixed_key, value, ex=self._ttl)
for idx in indexes: for idx in indexes:
pipe.sadd(self._key(idx), key) pipe.sadd(self._key(idx), key)
pipe.execute() pipe.execute()
@@ -127,6 +141,9 @@ class RedisCache:
while True: while True:
cursor, keys = self._client.scan(cursor, match=pattern, count=100) cursor, keys = self._client.scan(cursor, match=pattern, count=100)
if keys: if keys:
self._client.delete(*keys) pipe = self._client.pipeline()
for key in keys:
pipe.unlink(key)
pipe.execute()
if cursor == 0: if cursor == 0:
break break

View File

@@ -75,5 +75,5 @@ def build_index_keys(context: str, params: dict[str, Any]) -> list[str]:
""" """
keys = [f"mizan:idx:{context}"] keys = [f"mizan:idx:{context}"]
for k, v in sorted(params.items()): for k, v in sorted(params.items()):
keys.append(f"mizan:idx:{context}:{k}={v}") keys.append(f"mizan:idx:{context}:{k}={str(v)}")
return keys return keys

View File

@@ -662,14 +662,19 @@ def function_call_view(request: HttpRequest) -> JsonResponse:
response["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate_contexts) response["X-Mizan-Invalidate"] = _format_invalidate_header(invalidate_contexts)
# Purge origin-side cache for invalidated contexts # Purge origin-side cache for invalidated contexts
import logging
from mizan.cache import get_cache, cache_purge from mizan.cache import get_cache, cache_purge
_cache_log = logging.getLogger("mizan.cache")
cache = get_cache() cache = get_cache()
if cache is not None: if cache is not None:
for entry in invalidate_contexts: try:
if isinstance(entry, str): for entry in invalidate_contexts:
cache_purge(cache, entry) if isinstance(entry, str):
elif isinstance(entry, dict): cache_purge(cache, entry)
cache_purge(cache, entry["context"], entry.get("params")) elif isinstance(entry, dict):
cache_purge(cache, entry["context"], entry.get("params"))
except Exception:
_cache_log.warning("Cache purge failed", exc_info=True)
return response return response
@@ -771,19 +776,28 @@ def context_fetch_view(request: HttpRequest, context_name: str) -> JsonResponse:
params = request.GET.dict() params = request.GET.dict()
# Origin-side cache lookup # Origin-side cache lookup
import logging
from mizan.cache import get_cache, cache_get, cache_put from mizan.cache import get_cache, cache_get, cache_put
from mizan.setup.settings import get_settings from mizan.setup.settings import get_settings
_cache_log = logging.getLogger("mizan.cache")
cache = get_cache() cache = get_cache()
cache_settings = get_settings() cache_settings = get_settings()
user_id = None
if hasattr(request, "user") and hasattr(request.user, "pk") and request.user.pk:
user_id = str(request.user.pk)
if cache is not None and cache_settings.cache_secret: if cache is not None and cache_settings.cache_secret:
cached = cache_get( try:
cache_settings.cache_secret, cache, context_name, params, cached = cache_get(
) cache_settings.cache_secret, cache, context_name, params,
if cached is not None: user_id=user_id,
response = HttpResponse(cached, content_type="application/json") )
response["Cache-Control"] = "public, max-age=0, s-maxage=31536000" if cached is not None:
response["X-Mizan-Cache"] = "HIT" response = HttpResponse(cached, content_type="application/json")
return response response["Cache-Control"] = "public, max-age=0, s-maxage=31536000"
response["X-Mizan-Cache"] = "HIT"
return response
except Exception:
_cache_log.warning("Cache lookup failed, falling through", exc_info=True)
result = execute_context(request, context_name, params) result = execute_context(request, context_name, params)
@@ -813,10 +827,13 @@ def context_fetch_view(request: HttpRequest, context_name: str) -> JsonResponse:
# Store in origin-side cache # Store in origin-side cache
if cache is not None and cache_settings.cache_secret: if cache is not None and cache_settings.cache_secret:
cache_put( try:
cache_settings.cache_secret, cache, context_name, params, cache_put(
response.content, cache_settings.cache_secret, cache, context_name, params,
) response.content, user_id=user_id,
response["X-Mizan-Cache"] = "MISS" )
response["X-Mizan-Cache"] = "MISS"
except Exception:
_cache_log.warning("Cache store failed", exc_info=True)
return response return response