Add MWT (Mizan Web Token) — protocol-owned identity layer
MWT is a standard JWT with Mizan-specific claims on X-Mizan-Token header: - sub: user_id for HMAC cache key derivation - pkey: deterministic hash of user's permission state (staff + superuser + perms) - kid: key ID for future secret rotation - aud: audience binding for cross-tenant protection Executor checks X-Mizan-Token first, falls back to Authorization: Bearer for legacy JWT compat. Invalid tokens return 401 (no session fallback). New: mizan/mwt.py (create_mwt, decode_mwt, MWTUser, compute_permission_key) New: mwt_obtain server function for session-to-MWT issuance New: MIZAN_MWT_TTL setting (default 300s = 5 min permission staleness window) 11 new tests covering creation, decode, pkey determinism, auth integration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -457,6 +457,40 @@ def execute_function(
|
||||
return FunctionResult(data=output.model_dump())
|
||||
|
||||
|
||||
def _try_mwt_auth(request: HttpRequest) -> bool:
|
||||
"""
|
||||
Attempt to authenticate the request using MWT (Mizan Web Token).
|
||||
|
||||
Checks the X-Mizan-Token header. If present and valid, sets request.user
|
||||
to an MWTUser. Returns True on success, False if no MWT header or invalid.
|
||||
"""
|
||||
token = request.META.get("HTTP_X_MIZAN_TOKEN", "")
|
||||
if not token:
|
||||
return False
|
||||
|
||||
try:
|
||||
settings = get_settings()
|
||||
if not settings.cache_secret:
|
||||
return False
|
||||
|
||||
from mizan.mwt import decode_mwt, MWTUser
|
||||
|
||||
payload = decode_mwt(token, settings.cache_secret)
|
||||
if payload is None:
|
||||
return False
|
||||
|
||||
request.user = MWTUser(payload)
|
||||
request._mizan_mwt_authenticated = True
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _has_mwt_header(request: HttpRequest) -> bool:
|
||||
"""Check if request has an X-Mizan-Token header."""
|
||||
return bool(request.META.get("HTTP_X_MIZAN_TOKEN", ""))
|
||||
|
||||
|
||||
def _try_jwt_auth(request: HttpRequest) -> bool:
|
||||
"""
|
||||
Attempt to authenticate the request using JWT.
|
||||
@@ -504,36 +538,38 @@ def _has_jwt_header(request: HttpRequest) -> bool:
|
||||
|
||||
def _csrf_protect_unless_jwt(view_func):
|
||||
"""
|
||||
Decorator that applies CSRF protection unless JWT auth is used.
|
||||
Decorator that applies CSRF protection unless token auth is used.
|
||||
|
||||
JWT tokens are self-authenticating (the token itself proves the request
|
||||
is legitimate), so CSRF protection is not needed.
|
||||
MWT (X-Mizan-Token) is checked first, then legacy JWT (Authorization: Bearer).
|
||||
Both are self-authenticating, so CSRF protection is not needed.
|
||||
|
||||
Security: If JWT is provided but invalid, reject the request - do NOT
|
||||
fall back to session auth. This prevents attacks where an invalid token
|
||||
is sent alongside a valid session cookie.
|
||||
Security: If a token is provided but invalid, reject the request - do NOT
|
||||
fall back to session auth.
|
||||
"""
|
||||
csrf_protected_view = csrf_protect(view_func)
|
||||
|
||||
@wraps(view_func)
|
||||
def wrapper(request: HttpRequest, *args, **kwargs):
|
||||
# Check if JWT header is present
|
||||
has_jwt = _has_jwt_header(request)
|
||||
|
||||
if has_jwt:
|
||||
# JWT header present - try to authenticate
|
||||
if _try_jwt_auth(request):
|
||||
# JWT valid - skip CSRF, proceed
|
||||
# MWT takes priority
|
||||
if _has_mwt_header(request):
|
||||
if _try_mwt_auth(request):
|
||||
return view_func(request, *args, **kwargs)
|
||||
else:
|
||||
# JWT invalid - reject (do NOT fall back to session)
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired JWT token",
|
||||
).to_response(status=401)
|
||||
else:
|
||||
# No JWT - use session auth with CSRF
|
||||
return csrf_protected_view(request, *args, **kwargs)
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired MWT",
|
||||
).to_response(status=401)
|
||||
|
||||
# Legacy JWT fallback
|
||||
if _has_jwt_header(request):
|
||||
if _try_jwt_auth(request):
|
||||
return view_func(request, *args, **kwargs)
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired JWT token",
|
||||
).to_response(status=401)
|
||||
|
||||
# No token — session auth with CSRF
|
||||
return csrf_protected_view(request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
@@ -732,20 +768,30 @@ def execute_context(
|
||||
|
||||
def _jwt_auth_only(view_func):
|
||||
"""
|
||||
Decorator that handles JWT auth for GET endpoints (no CSRF needed for GET).
|
||||
Decorator that handles token auth for GET endpoints (no CSRF needed for GET).
|
||||
Checks MWT first, then legacy JWT.
|
||||
"""
|
||||
@wraps(view_func)
|
||||
def wrapper(request: HttpRequest, *args, **kwargs):
|
||||
has_jwt = _has_jwt_header(request)
|
||||
if has_jwt:
|
||||
# MWT takes priority
|
||||
if _has_mwt_header(request):
|
||||
if _try_mwt_auth(request):
|
||||
return view_func(request, *args, **kwargs)
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired MWT",
|
||||
).to_response(status=401)
|
||||
|
||||
# Legacy JWT fallback
|
||||
if _has_jwt_header(request):
|
||||
if _try_jwt_auth(request):
|
||||
return view_func(request, *args, **kwargs)
|
||||
else:
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired JWT token",
|
||||
).to_response(status=401)
|
||||
# No JWT — session auth (no CSRF needed for GET)
|
||||
return FunctionError(
|
||||
code=ErrorCode.UNAUTHORIZED,
|
||||
message="Invalid or expired JWT token",
|
||||
).to_response(status=401)
|
||||
|
||||
# No token — session auth (no CSRF needed for GET)
|
||||
return view_func(request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""
|
||||
JWT Server Functions
|
||||
JWT & MWT Server Functions
|
||||
|
||||
JWT token operations exposed as mizan server functions.
|
||||
Token operations exposed as mizan server functions.
|
||||
Works over WebSocket RPC (primary) or HTTP fallback.
|
||||
"""
|
||||
|
||||
@@ -10,6 +10,7 @@ from pydantic import BaseModel
|
||||
|
||||
from mizan.client import client
|
||||
from mizan.jwt.tokens import create_token_pair, refresh_tokens
|
||||
from mizan.mwt import create_mwt
|
||||
|
||||
|
||||
class TokenPairOutput(BaseModel):
|
||||
@@ -99,3 +100,42 @@ def jwt_refresh(request: HttpRequest, refresh_token: str) -> TokenPairOutput:
|
||||
refresh_token=tokens.refresh_token,
|
||||
expires_in=tokens.expires_in,
|
||||
)
|
||||
|
||||
|
||||
# ── MWT (Mizan Web Token) ──────────────────────────────────────────────
|
||||
|
||||
|
||||
class MWTOutput(BaseModel):
|
||||
"""MWT token response."""
|
||||
token: str
|
||||
expires_in: int
|
||||
|
||||
|
||||
@client
|
||||
def mwt_obtain(request: HttpRequest) -> MWTOutput:
|
||||
"""
|
||||
Obtain a Mizan Web Token from an authenticated session.
|
||||
|
||||
Requires session authentication (cookie-based login).
|
||||
Returns an MWT for the X-Mizan-Token header — stateless,
|
||||
cache-aware authentication with permission staleness detection.
|
||||
|
||||
Usage (from frontend):
|
||||
const { token, expires_in } = await call('mwt_obtain')
|
||||
// Use token in X-Mizan-Token header
|
||||
"""
|
||||
user = request.user
|
||||
|
||||
if not user.is_authenticated:
|
||||
raise PermissionError("Authentication required")
|
||||
|
||||
from mizan.setup.settings import get_settings
|
||||
settings = get_settings()
|
||||
|
||||
if not settings.cache_secret:
|
||||
raise ValueError(
|
||||
"MIZAN_CACHE_SECRET is not configured. MWT requires a signing secret."
|
||||
)
|
||||
|
||||
token = create_mwt(user, settings.cache_secret, ttl=settings.mwt_ttl)
|
||||
return MWTOutput(token=token, expires_in=settings.mwt_ttl)
|
||||
|
||||
157
packages/mizan-django/src/mizan/mwt.py
Normal file
157
packages/mizan-django/src/mizan/mwt.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""
|
||||
MWT (Mizan Web Token) — Protocol-owned identity layer.
|
||||
|
||||
MWT is a standard JWT (RFC 7519, HMAC-SHA256) with Mizan-specific claims,
|
||||
traveling on the `X-Mizan-Token` header. It provides:
|
||||
|
||||
- `sub`: user_id for HMAC cache key derivation
|
||||
- `pkey`: permission state hash for staleness detection
|
||||
- `kid`: key ID for secret rotation
|
||||
- `aud`: audience binding to prevent cross-tenant replay
|
||||
|
||||
MWT is issued from an authenticated Django session. The app handles
|
||||
authentication (session, social auth, etc.); Mizan issues MWT from
|
||||
the authenticated identity. Edge Workers and the origin-side cache
|
||||
validate MWT to extract user identity for cache operations.
|
||||
|
||||
Usage:
|
||||
from mizan.mwt import create_mwt, decode_mwt, MWTUser
|
||||
|
||||
Configuration:
|
||||
MIZAN_CACHE_SECRET: signing key (shared with cache key derivation)
|
||||
MIZAN_MWT_TTL: token lifetime in seconds (default: 300)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
|
||||
|
||||
@dataclass
|
||||
class MWTPayload:
|
||||
"""Decoded MWT claims."""
|
||||
sub: str # user_id
|
||||
staff: bool # is_staff
|
||||
super: bool # is_superuser
|
||||
pkey: str # permission state hash
|
||||
kid: str # key ID
|
||||
aud: str # audience
|
||||
iat: int # issued at
|
||||
exp: int # expiration
|
||||
|
||||
|
||||
class MWTUser:
|
||||
"""
|
||||
Minimal user object created from MWT claims.
|
||||
|
||||
Used as request.user for MWT-authenticated requests.
|
||||
No database query required — all data comes from the token.
|
||||
"""
|
||||
|
||||
def __init__(self, payload: MWTPayload):
|
||||
self.id = int(payload.sub)
|
||||
self.pk = self.id
|
||||
self.is_staff = payload.staff
|
||||
self.is_superuser = payload.super
|
||||
self.is_authenticated = True
|
||||
self.is_anonymous = False
|
||||
self.is_active = True
|
||||
self.pkey = payload.pkey
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"MWTUser(id={self.id})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"MWTUser(id={self.id}, pkey={self.pkey[:8]}...)"
|
||||
|
||||
|
||||
def compute_permission_key(user: Any) -> str:
|
||||
"""
|
||||
Compute a deterministic hash of the user's permission state.
|
||||
|
||||
Includes is_staff, is_superuser, and all Django permissions.
|
||||
When the MWT expires and is refreshed, the new pkey reflects
|
||||
any permission changes. The short TTL controls the staleness window.
|
||||
|
||||
Returns a 16-character hex digest (SHA-256 truncated).
|
||||
"""
|
||||
perms = sorted(user.get_all_permissions()) if hasattr(user, "get_all_permissions") else []
|
||||
staff = "1" if getattr(user, "is_staff", False) else "0"
|
||||
superuser = "1" if getattr(user, "is_superuser", False) else "0"
|
||||
blob = f"{staff}:{superuser}:{','.join(perms)}"
|
||||
return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
def create_mwt(
|
||||
user: Any,
|
||||
secret: str,
|
||||
ttl: int = 300,
|
||||
audience: str = "mizan",
|
||||
kid: str = "v1",
|
||||
) -> str:
|
||||
"""
|
||||
Create an MWT from an authenticated Django user.
|
||||
|
||||
Args:
|
||||
user: Django user object (must have pk, is_staff, is_superuser).
|
||||
secret: MIZAN_CACHE_SECRET signing key.
|
||||
ttl: Token lifetime in seconds (default: 300 = 5 minutes).
|
||||
audience: Audience claim for cross-tenant protection.
|
||||
kid: Key ID for future secret rotation.
|
||||
|
||||
Returns:
|
||||
Encoded JWT string.
|
||||
"""
|
||||
now = int(time.time())
|
||||
payload = {
|
||||
"sub": str(user.pk),
|
||||
"staff": getattr(user, "is_staff", False),
|
||||
"super": getattr(user, "is_superuser", False),
|
||||
"pkey": compute_permission_key(user),
|
||||
"aud": audience,
|
||||
"kid": kid,
|
||||
"iat": now,
|
||||
"exp": now + ttl,
|
||||
}
|
||||
return jwt.encode(payload, secret, algorithm="HS256")
|
||||
|
||||
|
||||
def decode_mwt(
|
||||
token: str,
|
||||
secret: str,
|
||||
audience: str = "mizan",
|
||||
) -> MWTPayload | None:
|
||||
"""
|
||||
Decode and validate an MWT.
|
||||
|
||||
Returns MWTPayload on success, None on any failure (expired, invalid
|
||||
signature, wrong audience, malformed).
|
||||
"""
|
||||
try:
|
||||
data = jwt.decode(
|
||||
token,
|
||||
secret,
|
||||
algorithms=["HS256"],
|
||||
audience=audience,
|
||||
)
|
||||
except jwt.PyJWTError:
|
||||
return None
|
||||
|
||||
try:
|
||||
return MWTPayload(
|
||||
sub=data["sub"],
|
||||
staff=data.get("staff", False),
|
||||
super=data.get("super", False),
|
||||
pkey=data.get("pkey", ""),
|
||||
kid=data.get("kid", "v1"),
|
||||
aud=audience,
|
||||
iat=data["iat"],
|
||||
exp=data["exp"],
|
||||
)
|
||||
except (KeyError, TypeError):
|
||||
return None
|
||||
@@ -23,6 +23,9 @@ class mizanSettings:
|
||||
# Redis URL for cache backend (None = cache disabled)
|
||||
cache_redis_url: str | None
|
||||
|
||||
# MWT token lifetime in seconds (default: 300 = 5 minutes)
|
||||
mwt_ttl: int
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_settings() -> mizanSettings:
|
||||
@@ -38,6 +41,7 @@ def get_settings() -> mizanSettings:
|
||||
debug_expose_names=getattr(django_settings, "mizan_DEBUG_EXPOSE_NAMES", True),
|
||||
cache_secret=getattr(django_settings, "MIZAN_CACHE_SECRET", None),
|
||||
cache_redis_url=getattr(django_settings, "MIZAN_CACHE_REDIS_URL", None),
|
||||
mwt_ttl=getattr(django_settings, "MIZAN_MWT_TTL", 300),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -3252,3 +3252,201 @@ class CachePolicyIntegrationTests(TestCase):
|
||||
|
||||
reset_cache()
|
||||
clear_settings_cache()
|
||||
|
||||
|
||||
# ── MWT (Mizan Web Token) tests ────────────────────────────────────────────
|
||||
|
||||
|
||||
class MWTCreationTests(TestCase):
|
||||
"""Tests for MWT creation and decoding."""
|
||||
|
||||
SECRET = "test-mwt-secret-that-is-32bytes!"
|
||||
|
||||
def _make_user(self, **kwargs):
|
||||
user = MagicMock()
|
||||
user.pk = kwargs.get("pk", 1)
|
||||
user.is_staff = kwargs.get("is_staff", False)
|
||||
user.is_superuser = kwargs.get("is_superuser", False)
|
||||
user.get_all_permissions = MagicMock(return_value=kwargs.get("perms", set()))
|
||||
return user
|
||||
|
||||
def test_create_and_decode(self):
|
||||
"""Create an MWT and decode it successfully."""
|
||||
from mizan.mwt import create_mwt, decode_mwt
|
||||
|
||||
user = self._make_user(pk=42, is_staff=True)
|
||||
token = create_mwt(user, self.SECRET, ttl=300)
|
||||
payload = decode_mwt(token, self.SECRET)
|
||||
|
||||
self.assertIsNotNone(payload)
|
||||
self.assertEqual(payload.sub, "42")
|
||||
self.assertTrue(payload.staff)
|
||||
self.assertFalse(payload.super)
|
||||
self.assertEqual(payload.kid, "v1")
|
||||
self.assertEqual(len(payload.pkey), 16)
|
||||
|
||||
def test_decode_expired(self):
|
||||
"""Expired MWT returns None."""
|
||||
from mizan.mwt import create_mwt, decode_mwt
|
||||
|
||||
user = self._make_user()
|
||||
token = create_mwt(user, self.SECRET, ttl=-1)
|
||||
payload = decode_mwt(token, self.SECRET)
|
||||
self.assertIsNone(payload)
|
||||
|
||||
def test_decode_wrong_secret(self):
|
||||
"""MWT signed with wrong secret returns None."""
|
||||
from mizan.mwt import create_mwt, decode_mwt
|
||||
|
||||
user = self._make_user()
|
||||
token = create_mwt(user, self.SECRET)
|
||||
payload = decode_mwt(token, "wrong-secret")
|
||||
self.assertIsNone(payload)
|
||||
|
||||
def test_decode_wrong_audience(self):
|
||||
"""MWT with wrong audience returns None."""
|
||||
from mizan.mwt import create_mwt, decode_mwt
|
||||
|
||||
user = self._make_user()
|
||||
token = create_mwt(user, self.SECRET, audience="app1")
|
||||
payload = decode_mwt(token, self.SECRET, audience="app2")
|
||||
self.assertIsNone(payload)
|
||||
|
||||
def test_mwt_user_has_pkey(self):
|
||||
"""MWTUser carries the permission key."""
|
||||
from mizan.mwt import create_mwt, decode_mwt, MWTUser
|
||||
|
||||
user = self._make_user(pk=5, perms={"app.view_thing"})
|
||||
token = create_mwt(user, self.SECRET)
|
||||
payload = decode_mwt(token, self.SECRET)
|
||||
mwt_user = MWTUser(payload)
|
||||
|
||||
self.assertEqual(mwt_user.pk, 5)
|
||||
self.assertTrue(mwt_user.is_authenticated)
|
||||
self.assertEqual(len(mwt_user.pkey), 16)
|
||||
|
||||
|
||||
class PermissionKeyTests(TestCase):
|
||||
"""Tests for pkey determinism and sensitivity."""
|
||||
|
||||
def _make_user(self, **kwargs):
|
||||
user = MagicMock()
|
||||
user.pk = kwargs.get("pk", 1)
|
||||
user.is_staff = kwargs.get("is_staff", False)
|
||||
user.is_superuser = kwargs.get("is_superuser", False)
|
||||
user.get_all_permissions = MagicMock(return_value=kwargs.get("perms", set()))
|
||||
return user
|
||||
|
||||
def test_deterministic(self):
|
||||
"""Same permissions produce same pkey."""
|
||||
from mizan.mwt import compute_permission_key
|
||||
|
||||
user = self._make_user(perms={"app.view_thing", "app.add_thing"})
|
||||
pkey1 = compute_permission_key(user)
|
||||
pkey2 = compute_permission_key(user)
|
||||
self.assertEqual(pkey1, pkey2)
|
||||
|
||||
def test_changes_on_permission_change(self):
|
||||
"""Different permissions produce different pkey."""
|
||||
from mizan.mwt import compute_permission_key
|
||||
|
||||
user1 = self._make_user(perms={"app.view_thing"})
|
||||
user2 = self._make_user(perms={"app.view_thing", "app.add_thing"})
|
||||
self.assertNotEqual(compute_permission_key(user1), compute_permission_key(user2))
|
||||
|
||||
def test_changes_on_staff_change(self):
|
||||
"""Staff status change produces different pkey."""
|
||||
from mizan.mwt import compute_permission_key
|
||||
|
||||
user_normal = self._make_user(is_staff=False)
|
||||
user_staff = self._make_user(is_staff=True)
|
||||
self.assertNotEqual(
|
||||
compute_permission_key(user_normal),
|
||||
compute_permission_key(user_staff),
|
||||
)
|
||||
|
||||
|
||||
class MWTAuthIntegrationTests(TestCase):
|
||||
"""Tests for MWT authentication in the executor."""
|
||||
|
||||
SECRET = "test-mwt-auth-secret-thats-32bytes!" # 32+ bytes for HS256
|
||||
|
||||
def setUp(self):
|
||||
clear_registry()
|
||||
self.factory = RequestFactory()
|
||||
from mizan.setup.settings import clear_settings_cache
|
||||
clear_settings_cache()
|
||||
|
||||
UserCtx = ReactContext("user")
|
||||
|
||||
@client(context=UserCtx, auth=True)
|
||||
def protected_fn(request: HttpRequest, user_id: int) -> dict:
|
||||
return {"viewer": request.user.pk}
|
||||
|
||||
register(protected_fn, "protected_fn")
|
||||
|
||||
def tearDown(self):
|
||||
clear_registry()
|
||||
from mizan.setup.settings import clear_settings_cache
|
||||
clear_settings_cache()
|
||||
|
||||
def test_mwt_auth_via_header(self):
|
||||
"""Request with valid X-Mizan-Token authenticates."""
|
||||
from mizan.mwt import create_mwt
|
||||
from mizan.client.executor import _try_mwt_auth
|
||||
from django.test import override_settings
|
||||
|
||||
user = MagicMock()
|
||||
user.pk = 5
|
||||
user.is_staff = False
|
||||
user.is_superuser = False
|
||||
user.get_all_permissions = MagicMock(return_value=set())
|
||||
|
||||
token = create_mwt(user, self.SECRET)
|
||||
|
||||
request = self.factory.get("/")
|
||||
request.META["HTTP_X_MIZAN_TOKEN"] = token
|
||||
request.user = MagicMock(is_authenticated=False)
|
||||
|
||||
with override_settings(MIZAN_CACHE_SECRET=self.SECRET):
|
||||
from mizan.setup.settings import clear_settings_cache
|
||||
clear_settings_cache()
|
||||
result = _try_mwt_auth(request)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertEqual(request.user.pk, 5)
|
||||
self.assertTrue(request.user.is_authenticated)
|
||||
|
||||
def test_mwt_invalid_returns_401(self):
|
||||
"""Invalid X-Mizan-Token returns 401 on context fetch."""
|
||||
from django.test import override_settings
|
||||
|
||||
with override_settings(MIZAN_CACHE_SECRET=self.SECRET):
|
||||
from mizan.setup.settings import clear_settings_cache
|
||||
clear_settings_cache()
|
||||
response = self.client.get(
|
||||
"/api/mizan/ctx/user/?user_id=5",
|
||||
HTTP_X_MIZAN_TOKEN="invalid-token",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 401)
|
||||
|
||||
def test_legacy_jwt_still_works(self):
|
||||
"""Authorization: Bearer still accepted alongside MWT."""
|
||||
from mizan.jwt.tokens import create_token_pair
|
||||
from tests.models import EmailUser
|
||||
|
||||
user = EmailUser.objects.create_user(email="legacy@test.com", password="pass")
|
||||
self.client.login(email="legacy@test.com", password="pass")
|
||||
session_key = self.client.session.session_key
|
||||
|
||||
tokens = create_token_pair(
|
||||
user.pk, session_key,
|
||||
is_staff=False, is_superuser=False,
|
||||
)
|
||||
|
||||
response = self.client.get(
|
||||
"/api/mizan/ctx/user/?user_id=5",
|
||||
HTTP_AUTHORIZATION=f"Bearer {tokens.access_token}",
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
Reference in New Issue
Block a user