Full test infrastructure, code audit fixes, and real E2E integration tests

Test infrastructure:
- Django standalone test runner (pytest-django, test settings, EmailUser model)
- React unit tests via Vitest with jsdom, jest compat layer, path aliases
- Playwright E2E tests using generated hooks in a real Chromium browser
- Docker Compose test backend (Django + Redis) for integration testing
- Desktop integration test app (PyWebView + Django + uvicorn)
- Makefile with test/test-django/test-react/test-integration targets

Library bugs found and fixed:
- hasJWT truthiness: undefined !== null was true, skipping session init
- process.env crash: CSR client referenced process.env in non-Node browsers
- baseUrl not forwarded: DjareaProvider didn't pass baseUrl to CSR client
- Relative URL handling: new URL() failed with relative base paths
- call() race condition: HTTP requests fired before CSRF cookie was set
- Session init await: added sessionRef promise so call() waits for session
- path_prefix on schema export: both export commands failed with URL reverse
- NullBooleanField removed: referenced field doesn't exist in Django 5.0+
- lru_cache on JWT settings: get_settings() now cached as intended
- Channel message routing: broadcasts now include channel name and params
- httpFunctionCall: fixed URL and request body format

Generator fixes:
- Removed 1,100 lines of REST/OpenAPI client generation (not part of Djarea)
- Generator now works for djarea-only projects without django-ninja REST APIs
- Generated DjangoContext now includes ChannelProvider when channels exist
- Fixed env var passthrough for schema export commands
- Deduplicated fetch logic into single runDjangoCommand helper

Test quality:
- Fixed 33 tautological Django tests with real assertions
- Found hidden bug: benchmark functions were never registered
- Found hidden bug: unicode lookalike test used plain ASCII
- Deleted worthless React unit tests (duplicates, shape checks, Zod-tests-Zod)
- Replaced jsdom integration tests with Playwright browser tests

Example apps:
- example/: Integration test backend with 33 server functions, 5 forms,
  4 channels covering auth variations, contexts, class-based ServerFunction,
  error codes, DjareaFormMixin, formsets, and JWT
- desktop/: PyWebView desktop app with file system access, SQLite CRUD,
  system introspection, and 39 real HTTP integration tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-31 01:17:48 -04:00
commit 4451ec24a1
179 changed files with 27699 additions and 0 deletions

View File

@@ -0,0 +1,70 @@
"""
djarea.jwt - JWT authentication for server functions.
Provides:
- Server functions for obtaining/refreshing JWT tokens
- JWT authentication utilities for validating tokens
Server Functions:
- jwt_obtain: Convert authenticated session to JWT tokens
- jwt_refresh: Refresh tokens using a refresh token
Usage in apps.py or urls.py (to register the functions):
import djarea.jwt.functions # noqa: F401
Note: This module is purpose-built for Djarea server functions.
For Django Ninja API authentication, use djarea.jwt.security directly.
"""
# Server functions (import to register with @client decorator)
from .functions import jwt_obtain, jwt_refresh
# Token utilities
from .tokens import (
create_token_pair,
create_access_token,
create_refresh_token,
decode_token,
refresh_tokens,
TokenPair,
TokenPayload,
JWTUser,
)
# Settings
from .settings import get_settings, JWTSettings
# Security (Ninja API auth) - lazy import to avoid triggering
# django-ninja's settings access at module load time.
# Use: from djarea.jwt.security import jwt_auth
def __getattr__(name):
if name in ("JWTAuth", "jwt_auth"):
from .security import JWTAuth, jwt_auth
globals()["JWTAuth"] = JWTAuth
globals()["jwt_auth"] = jwt_auth
return globals()[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
__all__ = [
# Server functions
"jwt_obtain",
"jwt_refresh",
# Token utilities
"create_token_pair",
"create_access_token",
"create_refresh_token",
"decode_token",
"refresh_tokens",
"TokenPair",
"TokenPayload",
"JWTUser",
# Settings
"get_settings",
"JWTSettings",
# Security (lazy)
"JWTAuth",
"jwt_auth",
]

View File

@@ -0,0 +1,97 @@
"""
JWT Server Functions
JWT token operations exposed as djarea server functions.
Works over WebSocket RPC (primary) or HTTP fallback.
"""
from django.http import HttpRequest
from pydantic import BaseModel
from djarea.client import client
from djarea.jwt.tokens import create_token_pair, refresh_tokens
class TokenPairOutput(BaseModel):
"""JWT token pair response."""
access_token: str
refresh_token: str
expires_in: int
class JWTError(BaseModel):
"""JWT operation error."""
error: str
@client
def jwt_obtain(request: HttpRequest) -> TokenPairOutput:
"""
Obtain JWT tokens from an authenticated session.
Requires session authentication (cookie or WebSocket session).
Returns access and refresh tokens that can be used for stateless auth.
The tokens include user claims (is_staff, is_superuser) so that
subsequent JWT-authenticated requests don't need a database query.
Usage:
const { access_token, refresh_token } = await call('jwt_obtain')
// Use access_token in Authorization: Bearer header
"""
user = request.user
if not user.is_authenticated:
raise PermissionError("Authentication required")
# Get session key - for WebSocket, this comes from the scope
session = getattr(request, 'session', None)
if session is None:
# WebSocket request adapter - session is a dict, not SessionBase
session_key = getattr(request, '_scope', {}).get('session', {}).get('_session_key')
if not session_key:
raise PermissionError("No session available")
else:
# HTTP request - ensure session is saved
if not session.session_key:
session.save()
session_key = session.session_key
# Include user claims in the token for stateless auth
tokens = create_token_pair(
user.pk,
session_key,
is_staff=getattr(user, 'is_staff', False),
is_superuser=getattr(user, 'is_superuser', False),
)
return TokenPairOutput(
access_token=tokens.access_token,
refresh_token=tokens.refresh_token,
expires_in=tokens.expires_in,
)
@client
def jwt_refresh(request: HttpRequest, refresh_token: str) -> TokenPairOutput:
"""
Refresh JWT tokens using a refresh token.
Does not require session authentication - the refresh token itself
contains the session reference and is validated against the session store.
If the original session has been destroyed (user logged out), this fails.
Usage:
const { access_token, refresh_token } = await call('jwt_refresh', { refresh_token })
"""
tokens = refresh_tokens(refresh_token)
if tokens is None:
raise PermissionError("Invalid or expired refresh token")
return TokenPairOutput(
access_token=tokens.access_token,
refresh_token=tokens.refresh_token,
expires_in=tokens.expires_in,
)

View File

@@ -0,0 +1,64 @@
"""
Django Ninja Security Classes for JWT Authentication
Provides authentication classes that can be used with Django Ninja's
auth parameter to protect API endpoints.
"""
from django.http import HttpRequest
from ninja.security import HttpBearer
from .tokens import decode_token, JWTUser
class JWTAuth(HttpBearer):
"""
JWT Bearer token authentication for Django Ninja.
Usage:
from ninja_jwt_session import jwt_auth
@api.get("/protected/", auth=jwt_auth)
def protected_endpoint(request):
return {"user_id": request.user.id}
Or globally:
api = NinjaExtraAPI(auth=[django_auth, jwt_auth])
The token must be passed in the Authorization header:
Authorization: Bearer <access_token>
IMPORTANT: This is stateless - no database query is made.
request.user is a JWTUser object with id, is_staff, is_superuser.
If you need the full User object, query it explicitly:
user = User.objects.get(pk=request.user.id)
"""
def authenticate(self, request: HttpRequest, token: str):
"""
Validate the JWT and return a JWTUser if valid.
Returns None (authentication failed) if:
- Token is invalid or expired
- Token is not an access token
Note: No database query is made. The JWTUser is created from
token claims. This is truly stateless authentication.
"""
# Decode and validate the token
payload = decode_token(token, expected_type="access")
if payload is None:
return None
# Create JWTUser from token claims - NO DATABASE QUERY
jwt_user = JWTUser(payload)
# Set request.user for compatibility with code expecting it
request.user = jwt_user
return jwt_user
# Singleton instance for convenience
jwt_auth = JWTAuth()

View File

@@ -0,0 +1,118 @@
"""
JWT Hybrid Settings
Configuration is read from Django settings with sensible defaults.
Supports both symmetric (HS256) and asymmetric (RS256) algorithms.
"""
from dataclasses import dataclass
from functools import lru_cache
from django.conf import settings as django_settings
@dataclass
class JWTSettings:
"""JWT configuration."""
# Signing keys
private_key: str # Used for signing (required)
public_key: str # Used for verification (same as private for HS256)
# Algorithm
algorithm: str # HS256, RS256, etc.
# Token lifetimes (seconds)
access_token_expires_in: int
refresh_token_expires_in: int
# Security options
validate_session: bool # Check session exists on token validation
rotate_refresh_token: bool # Issue new refresh token on refresh
@lru_cache
def get_settings() -> JWTSettings:
"""
Load JWT settings from Django settings.
Settings:
JWT_PRIVATE_KEY: Signing key (required)
JWT_PUBLIC_KEY: Verification key (defaults to private key for HS256)
JWT_ALGORITHM: Algorithm to use (default: HS256)
JWT_ACCESS_TOKEN_EXPIRES_IN: Access token lifetime (default: 300)
JWT_REFRESH_TOKEN_EXPIRES_IN: Refresh token lifetime (default: 604800)
JWT_VALIDATE_SESSION: Validate session on token use (default: True)
JWT_ROTATE_REFRESH_TOKEN: Rotate refresh tokens (default: True)
"""
private_key = getattr(django_settings, "JWT_PRIVATE_KEY", None)
if not private_key:
# Fall back to allauth setting if available (for compatibility)
headless_key = getattr(django_settings, "HEADLESS_JWT_PRIVATE_KEY", None)
if headless_key:
private_key = headless_key
if private_key is None:
raise ValueError(
"JWT_PRIVATE_KEY must be set in Django settings. "
"For HS256, use a secure random string. "
"For RS256, use a PEM-encoded RSA private key."
)
# Auto-detect algorithm based on key format if not explicitly set
algorithm = getattr(django_settings, "JWT_ALGORITHM", None)
if algorithm is None:
# Auto-detect: if key looks like PEM, use RS256; otherwise HS256
if isinstance(private_key, str) and private_key.strip().startswith("-----BEGIN"):
algorithm = "RS256"
else:
algorithm = "HS256"
# For symmetric algorithms, public key = private key
if algorithm.startswith("HS"):
public_key = private_key
else:
public_key = getattr(django_settings, "JWT_PUBLIC_KEY", None)
if public_key is None:
# Try to extract public key from private key for RSA
try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
private_key_obj = load_pem_private_key(
private_key.encode() if isinstance(private_key, str) else private_key,
password=None,
)
public_key = private_key_obj.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
except Exception:
raise ValueError(
f"JWT_PUBLIC_KEY must be set for {algorithm} algorithm, "
"or JWT_PRIVATE_KEY must be a valid PEM-encoded RSA key."
)
return JWTSettings(
private_key=private_key,
public_key=public_key,
algorithm=algorithm,
access_token_expires_in=getattr(
django_settings,
"JWT_ACCESS_TOKEN_EXPIRES_IN",
getattr(django_settings, "HEADLESS_JWT_ACCESS_TOKEN_EXPIRES_IN", 300),
),
refresh_token_expires_in=getattr(
django_settings,
"JWT_REFRESH_TOKEN_EXPIRES_IN",
getattr(django_settings, "HEADLESS_JWT_REFRESH_TOKEN_EXPIRES_IN", 604800),
),
validate_session=getattr(
django_settings, "JWT_VALIDATE_SESSION", True
),
rotate_refresh_token=getattr(
django_settings, "JWT_ROTATE_REFRESH_TOKEN", True
),
)

View File

@@ -0,0 +1,245 @@
"""
JWT Token Creation and Validation
Uses PyJWT directly - no allauth dependency.
Tokens are tied to Django sessions for immediate revocation on logout.
"""
import time
from typing import NamedTuple
import jwt
from django.contrib.sessions.backends.base import SessionBase
from .settings import get_settings
class TokenPair(NamedTuple):
"""Access and refresh token pair."""
access_token: str
refresh_token: str
expires_in: int
class TokenPayload(NamedTuple):
"""Decoded token payload."""
user_id: int | str
session_key: str
token_type: str
is_staff: bool
is_superuser: bool
exp: int
iat: int
class JWTUser:
"""
Minimal user object created from JWT claims.
Used as request.user for JWT-authenticated requests.
No database query required - all data comes from the token.
If you need the full User object with all fields, query explicitly:
user = User.objects.get(pk=request.user.id)
"""
def __init__(self, payload: TokenPayload):
self.id = int(payload.user_id) if isinstance(payload.user_id, str) else payload.user_id
self.pk = self.id
self.is_staff = payload.is_staff
self.is_superuser = payload.is_superuser
self.is_authenticated = True
self.is_anonymous = False
self.is_active = True # Assumed active if they have a valid token
def __str__(self):
return f"JWTUser(id={self.id})"
def __repr__(self):
return f"JWTUser(id={self.id}, is_staff={self.is_staff}, is_superuser={self.is_superuser})"
def create_access_token(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> str:
"""
Create a short-lived access token.
The token contains:
- sub: user ID
- sid: session key (for revocation checking)
- staff: is_staff flag
- super: is_superuser flag
- type: "access"
- iat: issued at
- exp: expiration
"""
settings = get_settings()
now = int(time.time())
payload = {
"sub": str(user_id),
"sid": session_key,
"staff": is_staff,
"super": is_superuser,
"type": "access",
"iat": now,
"exp": now + settings.access_token_expires_in,
}
return jwt.encode(
payload,
settings.private_key,
algorithm=settings.algorithm,
)
def create_refresh_token(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> str:
"""
Create a longer-lived refresh token.
The token contains:
- sub: user ID
- sid: session key (for revocation checking)
- staff: is_staff flag
- super: is_superuser flag
- type: "refresh"
- iat: issued at
- exp: expiration
"""
settings = get_settings()
now = int(time.time())
payload = {
"sub": str(user_id),
"sid": session_key,
"staff": is_staff,
"super": is_superuser,
"type": "refresh",
"iat": now,
"exp": now + settings.refresh_token_expires_in,
}
return jwt.encode(
payload,
settings.private_key,
algorithm=settings.algorithm,
)
def create_token_pair(
user_id: int | str,
session_key: str,
*,
is_staff: bool = False,
is_superuser: bool = False,
) -> TokenPair:
"""Create both access and refresh tokens."""
settings = get_settings()
return TokenPair(
access_token=create_access_token(
user_id, session_key, is_staff=is_staff, is_superuser=is_superuser
),
refresh_token=create_refresh_token(
user_id, session_key, is_staff=is_staff, is_superuser=is_superuser
),
expires_in=settings.access_token_expires_in,
)
def decode_token(token: str, expected_type: str = None) -> TokenPayload | None:
"""
Decode and validate a JWT token.
Returns None if:
- Token is invalid or expired
- Token type doesn't match expected_type (if specified)
"""
settings = get_settings()
try:
payload = jwt.decode(
token,
settings.public_key,
algorithms=[settings.algorithm],
)
except jwt.PyJWTError:
return None
# Validate token type if specified
if expected_type and payload.get("type") != expected_type:
return None
return TokenPayload(
user_id=payload["sub"],
session_key=payload["sid"],
token_type=payload["type"],
is_staff=payload.get("staff", False),
is_superuser=payload.get("super", False),
exp=payload["exp"],
iat=payload["iat"],
)
def validate_session(session_key: str) -> bool:
"""
Check if a session is still valid (exists and not expired).
This is the key to immediate logout revocation - if the session
is destroyed, tokens tied to it become invalid.
"""
from importlib import import_module
from django.conf import settings as django_settings
jwt_settings = get_settings()
if not jwt_settings.validate_session:
return True
# Use the configured session engine
engine = import_module(django_settings.SESSION_ENGINE)
SessionStore = engine.SessionStore
# Try to load the session
session = SessionStore(session_key=session_key)
# Check if session exists and is not empty
# exists() is more reliable than checking load() result
return session.exists(session_key)
def refresh_tokens(refresh_token: str) -> TokenPair | None:
"""
Use a refresh token to obtain new tokens.
Returns None if:
- Refresh token is invalid or expired
- Associated session no longer exists
"""
payload = decode_token(refresh_token, expected_type="refresh")
if payload is None:
return None
# Validate the session still exists
if not validate_session(payload.session_key):
return None
# Issue new token pair with same claims
return create_token_pair(
payload.user_id,
payload.session_key,
is_staff=payload.is_staff,
is_superuser=payload.is_superuser,
)