Add Redis backend tests against real Redis instance

13 tests hitting Redis on localhost:6399 (docker run redis:alpine):
- get/put/delete, index tracking, remove_from_index, delete_by_prefix
- TTL verification on cache entries AND index sets
- Pipeline atomicity (value + indexes written together)
- Scoped purge (AND semantics) against real Redis
- Broad purge with sub-index cleanup
- Tests skip gracefully if Redis is not available

No mocks, no fakes. Real Redis or skip.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-07 01:10:35 -04:00
parent 4744ff052e
commit dbbb269696

View File

@@ -3470,3 +3470,169 @@ class MWTAuthIntegrationTests(TestCase):
HTTP_AUTHORIZATION=f"Bearer {tokens.access_token}",
)
self.assertEqual(response.status_code, 200)
# ── Redis backend tests (requires running Redis on port 6399) ──────────
import os
REDIS_URL = os.environ.get("MIZAN_TEST_REDIS_URL", "redis://localhost:6399/0")
def _redis_available() -> bool:
"""Check if a test Redis instance is reachable."""
try:
import redis
client = redis.from_url(REDIS_URL, socket_connect_timeout=1)
client.ping()
return True
except Exception:
return False
_SKIP_REDIS = not _redis_available()
_SKIP_MSG = f"Redis not available at {REDIS_URL}"
class RedisCacheBackendTests(TestCase):
"""Tests for RedisCache against a real Redis instance."""
SECRET = "test-cache-secret-for-redis-32b!"
def setUp(self):
if _SKIP_REDIS:
self.skipTest(_SKIP_MSG)
from mizan.cache.backend import RedisCache
self.cache = RedisCache(REDIS_URL, prefix="mizan:test:")
self.cache.clear()
def tearDown(self):
if not _SKIP_REDIS:
self.cache.clear()
def test_get_miss(self):
"""Empty cache returns None."""
self.assertIsNone(self.cache.get("nonexistent"))
def test_put_then_get(self):
"""Store and retrieve a value."""
self.cache.put("key1", b'{"data": true}', ["mizan:idx:ctx"])
result = self.cache.get("key1")
self.assertEqual(result, b'{"data": true}')
def test_index_tracking(self):
"""Put adds the key to specified indexes."""
self.cache.put("key1", b"v1", ["mizan:idx:user", "mizan:idx:user:user_id=5"])
self.assertIn("key1", self.cache.get_index("mizan:idx:user"))
self.assertIn("key1", self.cache.get_index("mizan:idx:user:user_id=5"))
def test_delete_many(self):
"""Delete multiple keys at once."""
self.cache.put("k1", b"v1", [])
self.cache.put("k2", b"v2", [])
count = self.cache.delete_many(["k1", "k2"])
self.assertEqual(count, 2)
self.assertIsNone(self.cache.get("k1"))
self.assertIsNone(self.cache.get("k2"))
def test_remove_from_index(self):
"""Remove specific members from an index."""
self.cache.put("k1", b"v1", ["mizan:idx:ctx"])
self.cache.put("k2", b"v2", ["mizan:idx:ctx"])
self.cache.remove_from_index("mizan:idx:ctx", {"k1"})
idx = self.cache.get_index("mizan:idx:ctx")
self.assertNotIn("k1", idx)
self.assertIn("k2", idx)
def test_delete_indexes_by_prefix(self):
"""Delete all indexes matching a prefix."""
self.cache.put("k1", b"v1", ["mizan:idx:user:user_id=5"])
self.cache.put("k2", b"v2", ["mizan:idx:user:user_id=6"])
self.cache.delete_indexes_by_prefix("mizan:idx:user:")
self.assertEqual(self.cache.get_index("mizan:idx:user:user_id=5"), set())
self.assertEqual(self.cache.get_index("mizan:idx:user:user_id=6"), set())
def test_ttl_is_set(self):
"""Put sets a TTL on the cache key."""
import redis
self.cache.put("ttl_key", b"value", [])
client = redis.from_url(REDIS_URL)
ttl = client.ttl(f"mizan:test:ttl_key")
self.assertGreater(ttl, 0)
self.assertLessEqual(ttl, self.cache._ttl)
def test_index_ttl_is_set(self):
"""Put sets a TTL on the index keys too."""
import redis
self.cache.put("k1", b"v1", ["mizan:idx:ctx"])
client = redis.from_url(REDIS_URL)
ttl = client.ttl(f"mizan:test:mizan:idx:ctx")
self.assertGreater(ttl, 0)
def test_clear(self):
"""Clear removes all mizan keys."""
self.cache.put("k1", b"v1", ["mizan:idx:ctx"])
self.cache.put("k2", b"v2", ["mizan:idx:ctx2"])
self.cache.clear()
self.assertIsNone(self.cache.get("k1"))
self.assertIsNone(self.cache.get("k2"))
self.assertEqual(self.cache.get_index("mizan:idx:ctx"), set())
def test_pipeline_atomicity(self):
"""Put writes value and indexes in a single pipeline."""
self.cache.put("atomic_key", b"atomic_val", [
"mizan:idx:ctx",
"mizan:idx:ctx:id=1",
])
# All should be present (no partial writes)
self.assertEqual(self.cache.get("atomic_key"), b"atomic_val")
self.assertIn("atomic_key", self.cache.get_index("mizan:idx:ctx"))
self.assertIn("atomic_key", self.cache.get_index("mizan:idx:ctx:id=1"))
class RedisCachePurgeTests(TestCase):
"""Tests for cache_purge against real Redis."""
SECRET = "test-cache-secret-for-redis-32b!"
def setUp(self):
if _SKIP_REDIS:
self.skipTest(_SKIP_MSG)
from mizan.cache.backend import RedisCache
from mizan.cache import cache_put
self.cache = RedisCache(REDIS_URL, prefix="mizan:test:")
self.cache.clear()
cache_put(self.SECRET, self.cache, "user", {"user_id": "5"}, b'{"u5":true}')
cache_put(self.SECRET, self.cache, "user", {"user_id": "6"}, b'{"u6":true}')
def tearDown(self):
if not _SKIP_REDIS:
self.cache.clear()
def test_scoped_purge(self):
"""Scoped purge removes only matching entries."""
from mizan.cache import cache_purge, cache_get
count = cache_purge(self.cache, "user", {"user_id": "5"})
self.assertEqual(count, 1)
self.assertIsNone(cache_get(self.SECRET, self.cache, "user", {"user_id": "5"}))
self.assertIsNotNone(cache_get(self.SECRET, self.cache, "user", {"user_id": "6"}))
def test_broad_purge(self):
"""Broad purge removes all entries in context."""
from mizan.cache import cache_purge, cache_get
count = cache_purge(self.cache, "user")
self.assertEqual(count, 2)
self.assertIsNone(cache_get(self.SECRET, self.cache, "user", {"user_id": "5"}))
self.assertIsNone(cache_get(self.SECRET, self.cache, "user", {"user_id": "6"}))
def test_broad_purge_cleans_sub_indexes(self):
"""Broad purge removes per-param sub-indexes."""
from mizan.cache import cache_purge
cache_purge(self.cache, "user")
self.assertEqual(self.cache.get_index("mizan:idx:user:user_id=5"), set())
self.assertEqual(self.cache.get_index("mizan:idx:user:user_id=6"), set())