Move desktop and e2e into examples/ directory

- desktop/ → examples/django-react-desktop-app/
- e2e/ → examples/django-react-site/
- example/ → examples/django-react-site/backend/
- Update Dockerfile.test, Makefile, playwright config, and
  django.config.mjs path references

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-31 20:41:20 -04:00
parent c866142770
commit eee352d908
51 changed files with 5983 additions and 10 deletions

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python
"""
mizan Desktop — PyWebView + Django local RPC.
Starts a local Django ASGI server and opens a native desktop window.
All communication between the UI and backend uses mizan server functions.
"""
import os
import sys
import threading
import time
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
# Work around Qt WebEngine GPU crashes on some systems
os.environ.setdefault("QTWEBENGINE_CHROMIUM_FLAGS", "--disable-gpu")
def start_server(host: str, port: int):
"""Start the Django ASGI server in a background thread."""
import django
django.setup()
# Run migrations on first launch
from django.core.management import call_command
call_command("migrate", "--run-syncdb", verbosity=0)
import uvicorn
uvicorn.run(
"backend.asgi:application",
host=host,
port=port,
log_level="warning",
)
def wait_for_server(url: str, timeout: float = 10.0):
"""Poll until the server responds."""
from urllib.request import urlopen
from urllib.error import URLError
deadline = time.time() + timeout
while time.time() < deadline:
try:
urlopen(url, timeout=1)
return True
except (URLError, OSError):
time.sleep(0.1)
return False
def main():
host = "127.0.0.1"
port = 8765
# Start Django in a daemon thread
server = threading.Thread(target=start_server, args=(host, port), daemon=True)
server.start()
base_url = f"http://{host}:{port}"
if not wait_for_server(f"{base_url}/api/mizan/session/"):
print("ERROR: Django server failed to start", file=sys.stderr)
sys.exit(1)
print(f"Backend running at {base_url}")
# Check if --headless flag is passed (for testing)
if "--headless" in sys.argv:
print("Headless mode — server running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
return
# Open native window
import webview
window = webview.create_window(
title="mizan Desktop",
url=base_url,
width=1024,
height=768,
min_size=(640, 480),
)
webview.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class DesktopBackendConfig(AppConfig):
name = "backend"
default_auto_field = "django.db.models.BigAutoField"

View File

@@ -0,0 +1,13 @@
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
django.setup()
from django.core.asgi import get_asgi_application
from mizan import wrap_asgi
import backend.mizan_clients # noqa: F401
application = wrap_asgi(get_asgi_application())

View File

@@ -0,0 +1,419 @@
"""
Desktop RPC server functions.
Tests mizan's appropriateness for desktop apps:
- Local file system access
- SQLite CRUD
- System introspection
- Real-time channels (file watcher, app status)
- No auth required (single-user desktop)
"""
import os
import platform
import shutil
import sys
import time
from datetime import datetime
from pathlib import Path
from django.http import HttpRequest
from pydantic import BaseModel
from mizan.client import client
from mizan.channels import ReactChannel
from mizan.setup.registry import register
from mizan.channels import register as register_channel
# =============================================================================
# System Info
# =============================================================================
class SystemInfoOutput(BaseModel):
os_name: str
os_version: str
python_version: str
hostname: str
username: str
home_dir: str
cwd: str
cpu_count: int
mizan_version: str
@client(websocket=True)
def system_info(request: HttpRequest) -> SystemInfoOutput:
import mizan
return SystemInfoOutput(
os_name=platform.system(),
os_version=platform.version(),
python_version=sys.version.split()[0],
hostname=platform.node(),
username=os.getenv("USER", os.getenv("USERNAME", "unknown")),
home_dir=str(Path.home()),
cwd=os.getcwd(),
cpu_count=os.cpu_count() or 1,
mizan_version=getattr(mizan, "__version__", "dev"),
)
register(system_info, "system_info")
class DiskUsageOutput(BaseModel):
path: str
total_gb: float
used_gb: float
free_gb: float
percent_used: float
@client(websocket=True)
def disk_usage(request: HttpRequest, path: str = "/") -> DiskUsageOutput:
usage = shutil.disk_usage(path)
return DiskUsageOutput(
path=path,
total_gb=round(usage.total / (1024**3), 2),
used_gb=round(usage.used / (1024**3), 2),
free_gb=round(usage.free / (1024**3), 2),
percent_used=round(usage.used / usage.total * 100, 1),
)
register(disk_usage, "disk_usage")
# =============================================================================
# File System
# =============================================================================
class FileEntry(BaseModel):
name: str
path: str
is_dir: bool
size: int
modified: str
class ListFilesOutput(BaseModel):
directory: str
entries: list[FileEntry]
parent: str | None
@client(websocket=True)
def list_files(request: HttpRequest, directory: str = "~") -> ListFilesOutput:
dir_path = Path(directory).expanduser().resolve()
if not dir_path.is_dir():
raise ValueError(f"Not a directory: {dir_path}")
entries = []
try:
for entry in sorted(
dir_path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())
):
try:
stat = entry.stat()
entries.append(
FileEntry(
name=entry.name,
path=str(entry),
is_dir=entry.is_dir(),
size=stat.st_size if not entry.is_dir() else 0,
modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
)
)
except (PermissionError, OSError):
continue
except PermissionError:
raise PermissionError(f"Cannot read directory: {dir_path}")
parent = str(dir_path.parent) if dir_path.parent != dir_path else None
return ListFilesOutput(
directory=str(dir_path),
entries=entries,
parent=parent,
)
register(list_files, "list_files")
class FileContentOutput(BaseModel):
path: str
content: str
size: int
modified: str
@client(websocket=True)
def read_file(request: HttpRequest, path: str) -> FileContentOutput:
file_path = Path(path).expanduser().resolve()
if not file_path.is_file():
raise FileNotFoundError(f"File not found: {file_path}")
stat = file_path.stat()
# Safety: limit to 1MB text files
if stat.st_size > 1_048_576:
raise ValueError(f"File too large: {stat.st_size} bytes (max 1MB)")
try:
content = file_path.read_text(encoding="utf-8")
except UnicodeDecodeError:
raise ValueError(f"Not a text file: {file_path}")
return FileContentOutput(
path=str(file_path),
content=content,
size=stat.st_size,
modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
)
register(read_file, "read_file")
class WriteFileOutput(BaseModel):
path: str
size: int
@client(websocket=True)
def write_file(request: HttpRequest, path: str, content: str) -> WriteFileOutput:
file_path = Path(path).expanduser().resolve()
# Safety: only allow writing within home directory
home = Path.home()
if not str(file_path).startswith(str(home)):
raise PermissionError(f"Can only write files within home directory: {home}")
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return WriteFileOutput(path=str(file_path), size=len(content.encode("utf-8")))
register(write_file, "write_file")
class DeleteFileOutput(BaseModel):
path: str
deleted: bool
@client(websocket=True)
def delete_file(request: HttpRequest, path: str) -> DeleteFileOutput:
file_path = Path(path).expanduser().resolve()
home = Path.home()
if not str(file_path).startswith(str(home)):
raise PermissionError(f"Can only delete files within home directory: {home}")
if file_path.exists():
file_path.unlink()
return DeleteFileOutput(path=str(file_path), deleted=True)
return DeleteFileOutput(path=str(file_path), deleted=False)
register(delete_file, "delete_file")
# =============================================================================
# Notes CRUD (SQLite)
# =============================================================================
class NoteOutput(BaseModel):
id: int
title: str
content: str
pinned: bool
created_at: str
updated_at: str
class NoteListOutput(BaseModel):
notes: list[NoteOutput]
count: int
def _note_to_output(note) -> NoteOutput:
return NoteOutput(
id=note.id,
title=note.title,
content=note.content,
pinned=note.pinned,
created_at=note.created_at.isoformat(),
updated_at=note.updated_at.isoformat(),
)
@client(websocket=True)
def list_notes(request: HttpRequest) -> NoteListOutput:
from backend.models import Note
notes = Note.objects.all()
return NoteListOutput(
notes=[_note_to_output(n) for n in notes],
count=notes.count(),
)
register(list_notes, "list_notes")
@client(websocket=True)
def create_note(
request: HttpRequest, title: str, content: str = "", pinned: bool = False
) -> NoteOutput:
from backend.models import Note
note = Note.objects.create(title=title, content=content, pinned=pinned)
return _note_to_output(note)
register(create_note, "create_note")
@client(websocket=True)
def get_note(request: HttpRequest, id: int) -> NoteOutput:
from backend.models import Note
try:
note = Note.objects.get(pk=id)
except Note.DoesNotExist:
raise ValueError(f"Note {id} not found")
return _note_to_output(note)
register(get_note, "get_note")
@client(websocket=True)
def update_note(
request: HttpRequest,
id: int,
title: str | None = None,
content: str | None = None,
pinned: bool | None = None,
) -> NoteOutput:
from backend.models import Note
try:
note = Note.objects.get(pk=id)
except Note.DoesNotExist:
raise ValueError(f"Note {id} not found")
if title is not None:
note.title = title
if content is not None:
note.content = content
if pinned is not None:
note.pinned = pinned
note.save()
return _note_to_output(note)
register(update_note, "update_note")
class DeleteNoteOutput(BaseModel):
id: int
deleted: bool
@client(websocket=True)
def delete_note(request: HttpRequest, id: int) -> DeleteNoteOutput:
from backend.models import Note
try:
note = Note.objects.get(pk=id)
note.delete()
return DeleteNoteOutput(id=id, deleted=True)
except Note.DoesNotExist:
return DeleteNoteOutput(id=id, deleted=False)
register(delete_note, "delete_note")
# =============================================================================
# Channels — Real-time Desktop Events
# =============================================================================
class AppStatusChannel(ReactChannel):
"""Push app status updates to the UI (uptime, memory, etc.)."""
class DjangoMessage(BaseModel):
uptime_seconds: float
memory_mb: float
note_count: int
timestamp: str
def authorize(self, params=None):
return True # Desktop app, no auth needed
def group(self, params=None):
return "app_status"
register_channel(AppStatusChannel, "app_status")
class NotesChannel(ReactChannel):
"""Push notifications when notes are modified."""
class DjangoMessage(BaseModel):
action: str # "created", "updated", "deleted"
note_id: int
title: str
def authorize(self, params=None):
return True
def group(self, params=None):
return "notes_updates"
register_channel(NotesChannel, "notes_updates")
# =============================================================================
# App Lifecycle
# =============================================================================
_start_time = time.time()
class AppInfoOutput(BaseModel):
app_name: str
uptime_seconds: float
db_path: str
pid: int
@client(websocket=True)
def app_info(request: HttpRequest) -> AppInfoOutput:
from django.conf import settings
return AppInfoOutput(
app_name="mizan Desktop",
uptime_seconds=round(time.time() - _start_time, 2),
db_path=str(settings.DATABASES["default"]["NAME"]),
pid=os.getpid(),
)
register(app_info, "app_info")

View File

@@ -0,0 +1,15 @@
from django.db import models
class Note(models.Model):
title = models.CharField(max_length=200)
content = models.TextField(blank=True, default="")
pinned = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ["-pinned", "-updated_at"]
def __str__(self):
return self.title

View File

@@ -0,0 +1,49 @@
"""
Django settings for the mizan desktop integration test app.
Runs entirely local: SQLite database, in-memory channel layer,
no external services required.
"""
import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SECRET_KEY = "desktop-app-local-only-secret-key"
DEBUG = True
ALLOWED_HOSTS = ["127.0.0.1", "localhost"]
INSTALLED_APPS = [
"django.contrib.contenttypes",
"backend",
]
MIDDLEWARE = []
ROOT_URLCONF = "backend.urls"
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": os.path.join(BASE_DIR, "app.db"),
}
}
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
ASGI_APPLICATION = "backend.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
}
# Serve the built frontend
STATIC_URL = "/static/"
STATICFILES_DIRS = [os.path.join(BASE_DIR, "frontend", "dist")]
# No auth, no CSRF — local desktop app
CSRF_COOKIE_HTTPONLY = False

View File

@@ -0,0 +1,34 @@
from django.urls import include, path, re_path
from django.http import HttpResponse, HttpResponseNotFound
from pathlib import Path
DIST_DIR = Path(__file__).resolve().parent.parent / "frontend" / "dist"
CONTENT_TYPES = {
".html": "text/html",
".js": "application/javascript",
".css": "text/css",
".svg": "image/svg+xml",
".png": "image/png",
".ico": "image/x-icon",
".woff2": "font/woff2",
".json": "application/json",
}
def serve_dist(request, path="index.html"):
file_path = (DIST_DIR / path).resolve()
if not str(file_path).startswith(str(DIST_DIR)) or not file_path.is_file():
return HttpResponseNotFound()
ct = CONTENT_TYPES.get(file_path.suffix, "application/octet-stream")
return HttpResponse(file_path.read_bytes(), content_type=ct)
urlpatterns = [
path("api/mizan/", include("mizan.urls")),
re_path(r"^(?P<path>assets/.+)$", serve_dist),
path("favicon.ico", serve_dist, {"path": "favicon.ico"}),
path("", serve_dist),
]

View File

@@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>mizan Desktop</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: system-ui, -apple-system, sans-serif; background: #0f0f0f; color: #e0e0e0; }
</style>
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/main.tsx"></script>
</body>
</html>

View File

@@ -0,0 +1,21 @@
{
"name": "mizan-desktop-frontend",
"private": true,
"type": "module",
"scripts": {
"dev": "vite --port 5173",
"build": "vite build"
},
"dependencies": {
"@rythazhur/mizan": "file:../../react",
"react": "^19.0.0",
"react-dom": "^19.0.0"
},
"devDependencies": {
"@types/react": "^19.0.0",
"@types/react-dom": "^19.0.0",
"@vitejs/plugin-react": "^4.0.0",
"typescript": "^5.7.0",
"vite": "^6.0.0"
}
}

View File

@@ -0,0 +1,215 @@
import { useState, useEffect, useCallback } from 'react'
import { MizanProvider, useMizan, useMizanStatus } from '@rythazhur/mizan'
// ─── System Info ────────────────────────────────────────────────────────────
function SystemInfo() {
const { call } = useMizan()
const [info, setInfo] = useState<Record<string, unknown> | null>(null)
useEffect(() => {
call('system_info').then(setInfo).catch(() => {})
}, [call])
if (!info) return <div style={styles.card}>Loading system info...</div>
return (
<div style={styles.card}>
<h2 style={styles.h2}>System</h2>
<table style={styles.table}>
<tbody>
{Object.entries(info).map(([k, v]) => (
<tr key={k}>
<td style={styles.label}>{k}</td>
<td style={styles.value}>{String(v)}</td>
</tr>
))}
</tbody>
</table>
</div>
)
}
// ─── Connection Status ──────────────────────────────────────────────────────
function StatusBar() {
const status = useMizanStatus()
return (
<div style={{ ...styles.statusBar, color: status === 'connected' ? '#4ade80' : '#f87171' }}>
{status}
</div>
)
}
// ─── Notes ──────────────────────────────────────────────────────────────────
type Note = { id: number; title: string; content: string; pinned: boolean; updated_at: string }
function Notes() {
const { call } = useMizan()
const [notes, setNotes] = useState<Note[]>([])
const [selected, setSelected] = useState<Note | null>(null)
const [title, setTitle] = useState('')
const [content, setContent] = useState('')
const refresh = useCallback(() => {
call<{ notes: Note[] }>('list_notes').then(d => setNotes(d.notes)).catch(() => {})
}, [call])
useEffect(() => { refresh() }, [refresh])
const create = async () => {
if (!title.trim()) return
await call('create_note', { title, content })
setTitle('')
setContent('')
refresh()
}
const save = async () => {
if (!selected) return
await call('update_note', { id: selected.id, title, content })
setSelected(null)
setTitle('')
setContent('')
refresh()
}
const remove = async (id: number) => {
await call('delete_note', { id })
if (selected?.id === id) { setSelected(null); setTitle(''); setContent('') }
refresh()
}
const select = (n: Note) => {
setSelected(n)
setTitle(n.title)
setContent(n.content)
}
return (
<div style={styles.card}>
<h2 style={styles.h2}>Notes ({notes.length})</h2>
<div style={{ display: 'flex', gap: 12 }}>
<div style={{ flex: 1 }}>
{notes.map(n => (
<div
key={n.id}
onClick={() => select(n)}
style={{
...styles.noteItem,
borderLeft: selected?.id === n.id ? '3px solid #6cf' : '3px solid transparent',
}}
>
<span>{n.pinned ? '\u{1f4cc} ' : ''}{n.title}</span>
<button onClick={e => { e.stopPropagation(); remove(n.id) }} style={styles.deleteBtn}>x</button>
</div>
))}
{notes.length === 0 && <div style={{ color: '#666', padding: 8 }}>No notes yet</div>}
</div>
<div style={{ flex: 2 }}>
<input
value={title}
onChange={e => setTitle(e.target.value)}
placeholder="Title"
style={styles.input}
/>
<textarea
value={content}
onChange={e => setContent(e.target.value)}
placeholder="Content"
rows={6}
style={{ ...styles.input, resize: 'vertical' }}
/>
<button onClick={selected ? save : create} style={styles.btn}>
{selected ? 'Save' : 'Create'}
</button>
{selected && (
<button onClick={() => { setSelected(null); setTitle(''); setContent('') }} style={{ ...styles.btn, background: '#333' }}>
Cancel
</button>
)}
</div>
</div>
</div>
)
}
// ─── File Browser ───────────────────────────────────────────────────────────
type FileEntry = { name: string; path: string; is_dir: boolean; size: number }
function FileBrowser() {
const { call } = useMizan()
const [dir, setDir] = useState('~')
const [entries, setEntries] = useState<FileEntry[]>([])
const [parent, setParent] = useState<string | null>(null)
const browse = useCallback((d: string) => {
call<{ directory: string; entries: FileEntry[]; parent: string | null }>('list_files', { directory: d })
.then(data => {
setDir(data.directory)
setEntries(data.entries.slice(0, 50))
setParent(data.parent)
})
.catch(() => {})
}, [call])
useEffect(() => { browse('~') }, [browse])
return (
<div style={styles.card}>
<h2 style={styles.h2}>Files</h2>
<div style={{ color: '#888', fontSize: 13, marginBottom: 8 }}>{dir}</div>
{parent && (
<div onClick={() => browse(parent)} style={{ ...styles.fileItem, color: '#6cf', cursor: 'pointer' }}>
../ (parent)
</div>
)}
{entries.map(e => (
<div
key={e.path}
onClick={() => e.is_dir && browse(e.path)}
style={{ ...styles.fileItem, cursor: e.is_dir ? 'pointer' : 'default', color: e.is_dir ? '#6cf' : '#ccc' }}
>
{e.is_dir ? '\u{1f4c1}' : '\u{1f4c4}'} {e.name}
{!e.is_dir && <span style={{ color: '#666', marginLeft: 8 }}>{(e.size / 1024).toFixed(1)}K</span>}
</div>
))}
</div>
)
}
// ─── App ────────────────────────────────────────────────────────────────────
export function App() {
return (
<MizanProvider baseUrl="/api/mizan" autoConnect={false}>
<div style={{ maxWidth: 960, margin: '0 auto', padding: 24 }}>
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: 24 }}>
<h1 style={{ fontSize: 24, color: '#fff' }}>mizan Desktop</h1>
<StatusBar />
</div>
<SystemInfo />
<Notes />
<FileBrowser />
</div>
</MizanProvider>
)
}
// ─── Styles ─────────────────────────────────────────────────────────────────
const styles: Record<string, React.CSSProperties> = {
card: { background: '#1a1a1a', borderRadius: 8, padding: 20, marginBottom: 16 },
h2: { fontSize: 16, marginBottom: 12, color: '#aaa', textTransform: 'uppercase', letterSpacing: 1 },
table: { width: '100%', fontSize: 14 },
label: { padding: '4px 12px 4px 0', color: '#888', whiteSpace: 'nowrap' },
value: { padding: '4px 0', wordBreak: 'break-all' },
input: { width: '100%', padding: '8px 12px', marginBottom: 8, background: '#111', border: '1px solid #333', borderRadius: 4, color: '#e0e0e0', fontSize: 14 },
btn: { padding: '8px 16px', background: '#2563eb', color: '#fff', border: 'none', borderRadius: 4, cursor: 'pointer', marginRight: 8, fontSize: 14 },
noteItem: { display: 'flex', justifyContent: 'space-between', alignItems: 'center', padding: '8px 12px', cursor: 'pointer', borderRadius: 4, marginBottom: 2 },
deleteBtn: { background: 'none', border: 'none', color: '#666', cursor: 'pointer', fontSize: 14, padding: '2px 6px' },
fileItem: { padding: '4px 8px', fontSize: 14 },
statusBar: { fontSize: 12, fontFamily: 'monospace' },
}

View File

@@ -0,0 +1,4 @@
import { createRoot } from 'react-dom/client'
import { App } from './App'
createRoot(document.getElementById('root')!).render(<App />)

View File

@@ -0,0 +1,11 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"jsx": "react-jsx",
"skipLibCheck": true
},
"include": ["src"]
}

View File

@@ -0,0 +1,12 @@
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
server: {
proxy: {
'/api': 'http://127.0.0.1:8765',
'/ws': { target: 'ws://127.0.0.1:8765', ws: true },
},
},
})

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)

View File

@@ -0,0 +1,25 @@
[project]
name = "mizan-desktop"
version = "0.1.0"
description = "Desktop integration test app for mizan"
requires-python = ">=3.10"
dependencies = [
"mizan[channels]",
"uvicorn[standard]>=0.30",
"pywebview[qt]>=5.0",
]
[tool.uv.sources]
mizan = { path = "../django", editable = true }
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-django>=4.9",
"httpx>=0.27",
]
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "backend.settings"
pythonpath = ["."]
testpaths = ["tests"]

View File

@@ -0,0 +1,8 @@
import django
from django.conf import settings
# Ensure migrations run before tests
def pytest_configure():
# Import mizan_clients to trigger function registration
import backend.mizan_clients # noqa: F401

View File

@@ -0,0 +1,179 @@
"""
REAL integration tests for the mizan RPC framework layer.
Tests the actual HTTP stack: CSRF, middleware, error codes, validation.
Every test makes a real HTTP request — no mocks, no RequestFactory.
"""
import json
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from django.test import LiveServerTestCase
class RealHTTPMixin:
def _session_init(self):
url = f"{self.live_server_url}/api/mizan/session/"
resp = urlopen(Request(url))
cookies = resp.headers.get_all("Set-Cookie") or []
for cookie in cookies:
if "csrftoken=" in cookie:
self._csrf_token = cookie.split("csrftoken=")[1].split(";")[0]
self._cookies = f"csrftoken={self._csrf_token}"
return
self._csrf_token = None
self._cookies = ""
def _call(self, fn: str, args: dict | None = None):
url = f"{self.live_server_url}/api/mizan/call/"
body = json.dumps({"fn": fn, "args": args or {}}).encode()
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
if self._csrf_token:
req.add_header("X-CSRFToken", self._csrf_token)
if self._cookies:
req.add_header("Cookie", self._cookies)
resp = urlopen(req)
return json.loads(resp.read())
def _raw_post(
self,
path: str,
body: bytes | str,
content_type: str = "application/json",
include_csrf: bool = False,
):
"""Raw POST without the call() envelope — for testing malformed requests."""
url = f"{self.live_server_url}{path}"
if isinstance(body, str):
body = body.encode()
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", content_type)
if include_csrf and self._csrf_token:
req.add_header("X-CSRFToken", self._csrf_token)
req.add_header("Cookie", self._cookies)
return urlopen(req)
class CSRFTests(RealHTTPMixin, LiveServerTestCase):
"""CSRF handling over real HTTP — the thing that was broken."""
def test_session_endpoint_sets_csrf_cookie(self):
"""GET /session/ must return a Set-Cookie with csrftoken."""
url = f"{self.live_server_url}/api/mizan/session/"
resp = urlopen(Request(url))
cookies = resp.headers.get_all("Set-Cookie") or []
csrf_cookies = [c for c in cookies if "csrftoken=" in c]
self.assertGreater(len(csrf_cookies), 0, "No csrftoken cookie set by /session/")
def test_call_without_csrf_is_rejected(self):
"""POST /call/ without CSRF token must fail."""
url = f"{self.live_server_url}/api/mizan/call/"
body = json.dumps({"fn": "system_info", "args": {}}).encode()
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
try:
resp = urlopen(req)
data = json.loads(resp.read())
# If it doesn't raise, the response should indicate an error
self.assertTrue(data.get("error"), "POST without CSRF should be rejected")
except HTTPError as e:
self.assertEqual(e.code, 403, f"Expected 403, got {e.code}")
def test_call_with_csrf_succeeds(self):
"""POST /call/ with valid CSRF token must work."""
self._session_init()
data = self._call("system_info")
self.assertFalse(data["error"])
self.assertIn("os_name", data["data"])
class ValidationTests(RealHTTPMixin, LiveServerTestCase):
"""Pydantic validation errors over real HTTP."""
def setUp(self):
self._session_init()
def test_missing_required_field(self):
"""Calling create_note without title should return VALIDATION_ERROR."""
data = self._call("create_note", {})
self.assertTrue(data["error"])
self.assertEqual(data["code"], "VALIDATION_ERROR")
def test_wrong_type(self):
"""Calling delete_note with string id should return VALIDATION_ERROR."""
data = self._call("delete_note", {"id": "not-an-int"})
self.assertTrue(data["error"])
self.assertEqual(data["code"], "VALIDATION_ERROR")
def test_missing_multiple_fields(self):
"""write_file with no args should list all missing fields."""
data = self._call("write_file", {})
self.assertTrue(data["error"])
self.assertEqual(data["code"], "VALIDATION_ERROR")
class ErrorCodeTests(RealHTTPMixin, LiveServerTestCase):
"""Error codes over real HTTP."""
def setUp(self):
self._session_init()
def test_not_found_function(self):
data = self._call("this_does_not_exist")
self.assertTrue(data["error"])
self.assertEqual(data["code"], "NOT_FOUND")
def test_forbidden_write_outside_home(self):
data = self._call("write_file", {"path": "/etc/nope.txt", "content": "x"})
self.assertTrue(data["error"])
self.assertEqual(data["code"], "FORBIDDEN")
def test_get_method_rejected(self):
"""GET to /call/ should be rejected."""
url = f"{self.live_server_url}/api/mizan/call/"
try:
resp = urlopen(Request(url))
data = json.loads(resp.read())
self.assertTrue(data.get("error"))
except HTTPError as e:
self.assertIn(e.code, [403, 405])
def test_invalid_json_body(self):
"""Malformed JSON should return BAD_REQUEST."""
self._session_init()
try:
resp = self._raw_post(
"/api/mizan/call/",
body="not valid json{{{",
include_csrf=True,
)
data = json.loads(resp.read())
self.assertTrue(data["error"])
self.assertEqual(data["code"], "BAD_REQUEST")
except HTTPError as e:
self.assertIn(e.code, [400, 403])
def test_missing_fn_field(self):
"""POST with valid JSON but no 'fn' field should return BAD_REQUEST."""
self._session_init()
try:
resp = self._raw_post(
"/api/mizan/call/",
body=json.dumps({"not_fn": "hello"}),
include_csrf=True,
)
data = json.loads(resp.read())
self.assertTrue(data["error"])
self.assertEqual(data["code"], "BAD_REQUEST")
except HTTPError as e:
self.assertIn(e.code, [400, 403])

View File

@@ -0,0 +1,143 @@
"""
REAL integration tests for notes CRUD over HTTP.
Every test makes actual HTTP requests to a live Django server.
"""
import json
from django.test import LiveServerTestCase
from urllib.request import urlopen, Request
class RealHTTPMixin:
def _session_init(self):
url = f"{self.live_server_url}/api/mizan/session/"
resp = urlopen(Request(url))
cookies = resp.headers.get_all("Set-Cookie") or []
for cookie in cookies:
if "csrftoken=" in cookie:
self._csrf_token = cookie.split("csrftoken=")[1].split(";")[0]
self._cookies = f"csrftoken={self._csrf_token}"
return
self._csrf_token = None
self._cookies = ""
def _call(self, fn: str, args: dict | None = None):
url = f"{self.live_server_url}/api/mizan/call/"
body = json.dumps({"fn": fn, "args": args or {}}).encode()
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
if self._csrf_token:
req.add_header("X-CSRFToken", self._csrf_token)
if self._cookies:
req.add_header("Cookie", self._cookies)
resp = urlopen(req)
return json.loads(resp.read())
class NotesCRUDTests(RealHTTPMixin, LiveServerTestCase):
"""Full CRUD lifecycle over real HTTP."""
def setUp(self):
self._session_init()
def test_list_notes_empty(self):
data = self._call("list_notes")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["notes"], [])
self.assertEqual(data["data"]["count"], 0)
def test_create_note(self):
data = self._call("create_note", {"title": "First Note", "content": "Hello!"})
self.assertFalse(data["error"])
self.assertEqual(data["data"]["title"], "First Note")
self.assertEqual(data["data"]["content"], "Hello!")
self.assertFalse(data["data"]["pinned"])
self.assertIn("id", data["data"])
self.assertIn("created_at", data["data"])
def test_create_and_list(self):
self._call("create_note", {"title": "Note A"})
self._call("create_note", {"title": "Note B"})
data = self._call("list_notes")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["count"], 2)
titles = [n["title"] for n in data["data"]["notes"]]
self.assertIn("Note A", titles)
self.assertIn("Note B", titles)
def test_get_note_by_id(self):
create = self._call("create_note", {"title": "Get Me", "content": "Specific"})
note_id = create["data"]["id"]
data = self._call("get_note", {"id": note_id})
self.assertFalse(data["error"])
self.assertEqual(data["data"]["id"], note_id)
self.assertEqual(data["data"]["title"], "Get Me")
def test_update_note(self):
create = self._call("create_note", {"title": "Original"})
note_id = create["data"]["id"]
data = self._call("update_note", {"id": note_id, "title": "Updated"})
self.assertFalse(data["error"])
self.assertEqual(data["data"]["title"], "Updated")
def test_update_note_pin(self):
create = self._call("create_note", {"title": "Pin Me"})
note_id = create["data"]["id"]
data = self._call("update_note", {"id": note_id, "pinned": True})
self.assertFalse(data["error"])
self.assertTrue(data["data"]["pinned"])
def test_delete_note(self):
create = self._call("create_note", {"title": "Delete Me"})
note_id = create["data"]["id"]
data = self._call("delete_note", {"id": note_id})
self.assertFalse(data["error"])
self.assertTrue(data["data"]["deleted"])
# Verify it's gone
from urllib.error import HTTPError
try:
get_data = self._call("get_note", {"id": note_id})
self.assertTrue(get_data["error"])
except HTTPError:
pass # 500 is also a valid failure signal
def test_pinned_notes_sort_first(self):
self._call("create_note", {"title": "Unpinned"})
self._call("create_note", {"title": "Pinned", "pinned": True})
data = self._call("list_notes")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["notes"][0]["title"], "Pinned")
def test_full_lifecycle(self):
"""Create -> update -> pin -> verify -> delete over real HTTP."""
# Create
create = self._call("create_note", {"title": "Lifecycle", "content": "v1"})
note_id = create["data"]["id"]
# Update
self._call("update_note", {"id": note_id, "content": "v2"})
# Pin
self._call("update_note", {"id": note_id, "pinned": True})
# Verify
get = self._call("get_note", {"id": note_id})
self.assertEqual(get["data"]["title"], "Lifecycle")
self.assertEqual(get["data"]["content"], "v2")
self.assertTrue(get["data"]["pinned"])
# Delete
delete = self._call("delete_note", {"id": note_id})
self.assertTrue(delete["data"]["deleted"])

View File

@@ -0,0 +1,167 @@
"""
REAL integration tests for desktop system RPC functions.
These make actual HTTP requests to a running Django server.
No RequestFactory, no mocks, no shortcuts.
"""
import json
import os
import platform
from pathlib import Path
from django.test import LiveServerTestCase
from urllib.request import urlopen, Request
class RealHTTPMixin:
"""Makes real HTTP requests to the live server."""
def _session_init(self):
"""Hit /session/ to get CSRF cookie, like mizanProvider does."""
url = f"{self.live_server_url}/api/mizan/session/"
req = Request(url)
resp = urlopen(req)
# Extract csrftoken from Set-Cookie header
cookies = resp.headers.get_all("Set-Cookie") or []
for cookie in cookies:
if "csrftoken=" in cookie:
self._csrf_token = cookie.split("csrftoken=")[1].split(";")[0]
self._cookies = f"csrftoken={self._csrf_token}"
return
self._csrf_token = None
self._cookies = ""
def _call(self, fn: str, args: dict | None = None):
"""Make a real POST to /api/mizan/call/ with CSRF token."""
url = f"{self.live_server_url}/api/mizan/call/"
body = json.dumps({"fn": fn, "args": args or {}}).encode()
req = Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
if self._csrf_token:
req.add_header("X-CSRFToken", self._csrf_token)
if self._cookies:
req.add_header("Cookie", self._cookies)
resp = urlopen(req)
return json.loads(resp.read())
class SystemInfoTests(RealHTTPMixin, LiveServerTestCase):
"""system_info over real HTTP."""
def setUp(self):
self._session_init()
def test_system_info_returns_os_data(self):
data = self._call("system_info")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["os_name"], platform.system())
self.assertEqual(data["data"]["hostname"], platform.node())
self.assertGreater(data["data"]["cpu_count"], 0)
def test_system_info_returns_paths(self):
data = self._call("system_info")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["home_dir"], str(Path.home()))
self.assertEqual(data["data"]["cwd"], os.getcwd())
def test_disk_usage(self):
data = self._call("disk_usage", {"path": "/"})
self.assertFalse(data["error"])
self.assertGreater(data["data"]["total_gb"], 0)
self.assertGreater(data["data"]["free_gb"], 0)
self.assertGreaterEqual(data["data"]["percent_used"], 0)
self.assertLessEqual(data["data"]["percent_used"], 100)
def test_app_info(self):
data = self._call("app_info")
self.assertFalse(data["error"])
self.assertEqual(data["data"]["app_name"], "mizan Desktop")
self.assertGreater(data["data"]["uptime_seconds"], 0)
class FileSystemTests(RealHTTPMixin, LiveServerTestCase):
"""File system RPC over real HTTP."""
def setUp(self):
self._session_init()
self.test_dir = Path.home() / ".mizan-test"
self.test_dir.mkdir(exist_ok=True)
def tearDown(self):
import shutil
if self.test_dir.exists():
shutil.rmtree(self.test_dir)
def test_list_files_home(self):
data = self._call("list_files", {"directory": "~"})
self.assertFalse(data["error"])
self.assertEqual(data["data"]["directory"], str(Path.home()))
self.assertIsInstance(data["data"]["entries"], list)
def test_list_files_root_has_no_parent(self):
data = self._call("list_files", {"directory": "/"})
self.assertFalse(data["error"])
self.assertIsNone(data["data"]["parent"])
def test_write_and_read_file(self):
"""Full round-trip over real HTTP: write, read back, verify."""
test_path = str(self.test_dir / "test-note.txt")
test_content = "Hello from a REAL HTTP integration test!"
# Write
write_data = self._call(
"write_file", {"path": test_path, "content": test_content}
)
self.assertFalse(write_data["error"])
self.assertEqual(write_data["data"]["path"], test_path)
# Read back
read_data = self._call("read_file", {"path": test_path})
self.assertFalse(read_data["error"])
self.assertEqual(read_data["data"]["content"], test_content)
def test_write_outside_home_rejected(self):
"""Server should reject writes outside home directory."""
from urllib.error import HTTPError
try:
data = self._call(
"write_file", {"path": "/tmp/escape.txt", "content": "nope"}
)
# If we get here, check the response has an error
self.assertTrue(data["error"])
self.assertEqual(data["code"], "FORBIDDEN")
except HTTPError as e:
# 403 is also acceptable
self.assertEqual(e.code, 403)
def test_delete_file(self):
test_path = str(self.test_dir / "to-delete.txt")
(self.test_dir / "to-delete.txt").write_text("delete me")
data = self._call("delete_file", {"path": test_path})
self.assertFalse(data["error"])
self.assertTrue(data["data"]["deleted"])
self.assertFalse(Path(test_path).exists())
def test_file_entries_have_metadata(self):
(self.test_dir / "metadata-test.txt").write_text("hello")
data = self._call("list_files", {"directory": str(self.test_dir)})
self.assertFalse(data["error"])
self.assertGreater(len(data["data"]["entries"]), 0)
entry = data["data"]["entries"][0]
self.assertIn("name", entry)
self.assertIn("path", entry)
self.assertIn("is_dir", entry)
self.assertIn("size", entry)
self.assertIn("modified", entry)