Files
Ryth Azhur 6108845d99 Rename all djarea references to mizan
- djarea_clients.py → clients.py (both example apps)
- export_djarea_schema → export_mizan_schema (management command)
- djarea.spec.ts → mizan.spec.ts (playwright test)
- fetch.mjs: command name updated
- apps.py/asgi.py: import paths updated
- Removed stale generated.djarea.* artifacts
- Fixed desktop app: asgi.py import, vite config aliases, package.json dep path

373 Django tests pass. Both example apps verified running.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 11:26:19 -04:00

420 lines
10 KiB
Python

"""
Desktop RPC server functions.
Tests mizan's appropriateness for desktop apps:
- Local file system access
- SQLite CRUD
- System introspection
- Real-time channels (file watcher, app status)
- No auth required (single-user desktop)
"""
import os
import platform
import shutil
import sys
import time
from datetime import datetime
from pathlib import Path
from django.http import HttpRequest
from pydantic import BaseModel
from mizan.client import client
from mizan.channels import ReactChannel
from mizan.setup.registry import register
from mizan.channels import register as register_channel
# =============================================================================
# System Info
# =============================================================================
# Response payload returned by the ``system_info`` RPC.
class SystemInfoOutput(BaseModel):
    os_name: str  # platform.system()
    os_version: str  # platform.version()
    python_version: str  # first whitespace-separated token of sys.version
    hostname: str  # platform.node()
    username: str  # $USER, else $USERNAME, else "unknown"
    home_dir: str  # str(Path.home())
    cwd: str  # working directory of the server process
    cpu_count: int  # os.cpu_count(), or 1 when that returns None
    mizan_version: str  # mizan.__version__, or "dev" when the attribute is absent
@client(websocket=True)
def system_info(request: HttpRequest) -> SystemInfoOutput:
    """Collect a snapshot of the host OS, interpreter and process environment.

    Args:
        request: Incoming request; not read by this function.

    Returns:
        A ``SystemInfoOutput`` populated from ``platform``, ``sys``, ``os``
        and the installed ``mizan`` package.
    """
    import mizan

    # Lookup order for the user name: USER, then USERNAME, then "unknown".
    login_name = os.getenv("USER", os.getenv("USERNAME", "unknown"))
    interpreter_version = sys.version.split()[0]
    details = {
        "os_name": platform.system(),
        "os_version": platform.version(),
        "python_version": interpreter_version,
        "hostname": platform.node(),
        "username": login_name,
        "home_dir": str(Path.home()),
        "cwd": os.getcwd(),
        # os.cpu_count() may return None; report a single CPU in that case.
        "cpu_count": os.cpu_count() or 1,
        "mizan_version": getattr(mizan, "__version__", "dev"),
    }
    return SystemInfoOutput(**details)


register(system_info, "system_info")
# Response payload returned by the ``disk_usage`` RPC.
class DiskUsageOutput(BaseModel):
    path: str  # the path argument, echoed back as received
    total_gb: float  # bytes / 1024**3, rounded to 2 decimals
    used_gb: float  # bytes / 1024**3, rounded to 2 decimals
    free_gb: float  # bytes / 1024**3, rounded to 2 decimals
    percent_used: float  # used / total * 100, rounded to 1 decimal
@client(websocket=True)
def disk_usage(request: HttpRequest, path: str = "/") -> DiskUsageOutput:
    """Report total, used and free space for the filesystem containing *path*.

    Args:
        request: Incoming request; not read by this function.
        path: Any path on the filesystem to inspect. A leading ``~`` is
            expanded, matching the other file-system RPCs in this module.

    Returns:
        A ``DiskUsageOutput`` with sizes in units of 1024**3 bytes (rounded
        to 2 decimals), the used percentage (rounded to 1 decimal), and
        *path* echoed back exactly as received.

    Raises:
        OSError: Propagated from ``shutil.disk_usage`` (for example
            ``FileNotFoundError`` when *path* does not exist).
    """
    bytes_per_gib = 1024**3
    usage = shutil.disk_usage(os.path.expanduser(path))
    # Pseudo filesystems (e.g. /proc on Linux) report a total of 0 bytes;
    # report 0% instead of raising ZeroDivisionError.
    percent = usage.used / usage.total * 100 if usage.total else 0.0
    return DiskUsageOutput(
        path=path,
        total_gb=round(usage.total / bytes_per_gib, 2),
        used_gb=round(usage.used / bytes_per_gib, 2),
        free_gb=round(usage.free / bytes_per_gib, 2),
        percent_used=round(percent, 1),
    )


register(disk_usage, "disk_usage")
# =============================================================================
# File System
# =============================================================================
# One directory entry as reported by the ``list_files`` RPC.
class FileEntry(BaseModel):
    name: str  # final path component
    path: str  # full path of the entry as a string
    is_dir: bool  # True when the entry is a directory
    size: int  # st_size in bytes; always 0 for directories
    modified: str  # st_mtime via datetime.fromtimestamp(...).isoformat()
# Response payload returned by the ``list_files`` RPC.
class ListFilesOutput(BaseModel):
    directory: str  # the listed directory after expanduser() and resolve()
    entries: list[FileEntry]  # directories first, then by lower-cased name
    parent: str | None  # parent directory, or None when the directory is its own parent
@client(websocket=True)
def list_files(request: HttpRequest, directory: str = "~") -> ListFilesOutput:
    """List the contents of *directory*, directories first.

    Each entry is stat'ed exactly once. Entries that cannot be stat'ed
    (permission denied, broken symlink, removed mid-listing) are skipped
    individually rather than failing the whole listing.

    Args:
        request: Incoming request; not read by this function.
        directory: Directory to list. ``~`` is expanded and the path is
            resolved before use.

    Returns:
        A ``ListFilesOutput`` with the resolved directory, its entries sorted
        directories-first then case-insensitively by name, and the parent
        directory (``None`` when the directory is its own parent).

    Raises:
        ValueError: If the resolved path is not a directory.
        PermissionError: If the directory itself cannot be read.
    """
    import stat as stat_module

    dir_path = Path(directory).expanduser().resolve()
    if not dir_path.is_dir():
        raise ValueError(f"Not a directory: {dir_path}")

    try:
        children = list(dir_path.iterdir())
    except PermissionError as exc:
        raise PermissionError(f"Cannot read directory: {dir_path}") from exc

    entries = []
    for child in children:
        try:
            info = child.stat()
        except OSError:
            # PermissionError is an OSError subclass; skip just this entry.
            continue
        # Derive the directory flag from the stat result already in hand
        # instead of issuing further stat calls through is_dir().
        child_is_dir = stat_module.S_ISDIR(info.st_mode)
        entries.append(
            FileEntry(
                name=child.name,
                path=str(child),
                is_dir=child_is_dir,
                size=0 if child_is_dir else info.st_size,
                modified=datetime.fromtimestamp(info.st_mtime).isoformat(),
            )
        )
    # Sort after collection so the key never touches the filesystem and
    # therefore cannot raise.
    entries.sort(key=lambda item: (not item.is_dir, item.name.lower()))

    parent = str(dir_path.parent) if dir_path.parent != dir_path else None
    return ListFilesOutput(
        directory=str(dir_path),
        entries=entries,
        parent=parent,
    )


register(list_files, "list_files")
# Response payload returned by the ``read_file`` RPC.
class FileContentOutput(BaseModel):
    path: str  # the file path after expanduser() and resolve()
    content: str  # file text decoded as UTF-8
    size: int  # st_size in bytes
    modified: str  # st_mtime via datetime.fromtimestamp(...).isoformat()
@client(websocket=True)
def read_file(request: HttpRequest, path: str) -> FileContentOutput:
    """Read a UTF-8 text file of at most 1 MiB.

    Args:
        request: Incoming request; not read by this function.
        path: File to read. ``~`` is expanded and the path is resolved.

    Returns:
        A ``FileContentOutput`` with the resolved path, the decoded text, the
        size in bytes and the modification time as an ISO 8601 string.

    Raises:
        FileNotFoundError: If the resolved path is not an existing file.
        ValueError: If the file exceeds the size limit or is not valid UTF-8.
    """
    max_bytes = 1_048_576  # 1 MiB safety limit

    file_path = Path(path).expanduser().resolve()
    if not file_path.is_file():
        raise FileNotFoundError(f"File not found: {file_path}")

    stat = file_path.stat()
    if stat.st_size > max_bytes:
        raise ValueError(f"File too large: {stat.st_size} bytes (max 1MB)")

    try:
        content = file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError as exc:
        # Chain the decode error so the offending byte offset stays visible.
        raise ValueError(f"Not a text file: {file_path}") from exc

    return FileContentOutput(
        path=str(file_path),
        content=content,
        size=stat.st_size,
        modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
    )


register(read_file, "read_file")
# Response payload returned by the ``write_file`` RPC.
class WriteFileOutput(BaseModel):
    path: str  # the written path after expanduser() and resolve()
    size: int  # length in bytes of the content encoded as UTF-8
@client(websocket=True)
def write_file(request: HttpRequest, path: str, content: str) -> WriteFileOutput:
    """Write *content* as UTF-8 text to a file inside the home directory.

    Missing parent directories are created. An existing file is overwritten.

    Args:
        request: Incoming request; not read by this function.
        path: Destination file. ``~`` is expanded and the path is resolved
            before the containment check.
        content: Text to write.

    Returns:
        A ``WriteFileOutput`` with the resolved path and the UTF-8 byte length
        of *content*.

    Raises:
        PermissionError: If the resolved path is outside the home directory.
    """
    file_path = Path(path).expanduser().resolve()
    # Resolve home as well so both sides of the check are canonical paths.
    home = Path.home().resolve()
    # Compare path components, not string prefixes: with a prefix test
    # "/home/alice_evil/x" would wrongly pass for home "/home/alice".
    if not file_path.is_relative_to(home):
        raise PermissionError(f"Can only write files within home directory: {home}")
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text(content, encoding="utf-8")
    return WriteFileOutput(path=str(file_path), size=len(content.encode("utf-8")))


register(write_file, "write_file")
# Response payload returned by the ``delete_file`` RPC.
class DeleteFileOutput(BaseModel):
    path: str  # the target path after expanduser() and resolve()
    deleted: bool  # True when the path existed and was unlinked, False when it did not exist
@client(websocket=True)
def delete_file(request: HttpRequest, path: str) -> DeleteFileOutput:
    """Delete a file located inside the home directory.

    Args:
        request: Incoming request; not read by this function.
        path: File to delete. ``~`` is expanded and the path is resolved
            before the containment check.

    Returns:
        A ``DeleteFileOutput`` with the resolved path and ``deleted=True``
        when the path existed and was unlinked, ``deleted=False`` when it did
        not exist.

    Raises:
        PermissionError: If the resolved path is outside the home directory.
        OSError: Propagated from ``Path.unlink`` (for example when the path
            is a directory).
    """
    file_path = Path(path).expanduser().resolve()
    # Resolve home as well so both sides of the check are canonical paths.
    home = Path.home().resolve()
    # Compare path components, not string prefixes: with a prefix test
    # "/home/alice_evil/x" would wrongly pass for home "/home/alice".
    if not file_path.is_relative_to(home):
        raise PermissionError(f"Can only delete files within home directory: {home}")
    if file_path.exists():
        file_path.unlink()
        return DeleteFileOutput(path=str(file_path), deleted=True)
    return DeleteFileOutput(path=str(file_path), deleted=False)


register(delete_file, "delete_file")
# =============================================================================
# Notes CRUD (SQLite)
# =============================================================================
# Serialised form of a single note, as built by ``_note_to_output``.
class NoteOutput(BaseModel):
    id: int
    title: str
    content: str
    pinned: bool
    created_at: str  # note.created_at.isoformat()
    updated_at: str  # note.updated_at.isoformat()
# Response payload returned by the ``list_notes`` RPC.
class NoteListOutput(BaseModel):
    notes: list[NoteOutput]  # every note, serialised
    count: int  # result of count() on the same queryset
def _note_to_output(note) -> NoteOutput:
    """Convert a note record into its ``NoteOutput`` representation.

    ``id``, ``title``, ``content`` and ``pinned`` are copied as-is; the two
    timestamp attributes are rendered with ``isoformat()``.
    """
    created, updated = note.created_at, note.updated_at
    return NoteOutput(
        id=note.id,
        title=note.title,
        content=note.content,
        pinned=note.pinned,
        created_at=created.isoformat(),
        updated_at=updated.isoformat(),
    )
@client(websocket=True)
def list_notes(request: HttpRequest) -> NoteListOutput:
    """Return every stored note together with the total count.

    Args:
        request: Incoming request; not read by this function.

    Returns:
        A ``NoteListOutput`` holding the serialised notes and their count.
    """
    # Imported lazily, as elsewhere in this module.
    from backend.models import Note

    queryset = Note.objects.all()
    serialised = [_note_to_output(row) for row in queryset]
    return NoteListOutput(notes=serialised, count=queryset.count())


register(list_notes, "list_notes")
@client(websocket=True)
def create_note(
    request: HttpRequest, title: str, content: str = "", pinned: bool = False
) -> NoteOutput:
    """Create a note and return it in serialised form.

    Args:
        request: Incoming request; not read by this function.
        title: Title of the new note.
        content: Body text; defaults to an empty string.
        pinned: Whether the note is pinned; defaults to ``False``.

    Returns:
        The newly created note as a ``NoteOutput``.
    """
    from backend.models import Note

    field_values = {"title": title, "content": content, "pinned": pinned}
    created = Note.objects.create(**field_values)
    return _note_to_output(created)


register(create_note, "create_note")
@client(websocket=True)
def get_note(request: HttpRequest, id: int) -> NoteOutput:
    """Fetch a single note by primary key.

    Args:
        request: Incoming request; not read by this function.
        id: Primary key of the note.

    Returns:
        The matching note as a ``NoteOutput``.

    Raises:
        ValueError: If no note with that primary key exists.
    """
    from backend.models import Note

    try:
        note = Note.objects.get(pk=id)
    except Note.DoesNotExist as exc:
        # Chain explicitly so the original lookup failure is the cause.
        raise ValueError(f"Note {id} not found") from exc
    return _note_to_output(note)


register(get_note, "get_note")
@client(websocket=True)
def update_note(
    request: HttpRequest,
    id: int,
    title: str | None = None,
    content: str | None = None,
    pinned: bool | None = None,
) -> NoteOutput:
    """Partially update a note; arguments left as ``None`` are not changed.

    Args:
        request: Incoming request; not read by this function.
        id: Primary key of the note to update.
        title: New title, or ``None`` to keep the current one.
        content: New body text, or ``None`` to keep the current one.
        pinned: New pinned flag, or ``None`` to keep the current one.

    Returns:
        The saved note as a ``NoteOutput``.

    Raises:
        ValueError: If no note with that primary key exists.
    """
    from backend.models import Note

    try:
        note = Note.objects.get(pk=id)
    except Note.DoesNotExist as exc:
        # Chain explicitly so the original lookup failure is the cause.
        raise ValueError(f"Note {id} not found") from exc

    requested = {"title": title, "content": content, "pinned": pinned}
    for field_name, new_value in requested.items():
        # None means "not supplied"; an empty string or False is still applied.
        if new_value is not None:
            setattr(note, field_name, new_value)
    note.save()
    return _note_to_output(note)


register(update_note, "update_note")
# Response payload returned by the ``delete_note`` RPC.
class DeleteNoteOutput(BaseModel):
    id: int  # the primary key that was requested
    deleted: bool  # True when the note was found and deleted, False when it did not exist
@client(websocket=True)
def delete_note(request: HttpRequest, id: int) -> DeleteNoteOutput:
    """Delete a note by primary key, reporting whether anything was removed.

    A missing note is not an error: the result simply carries
    ``deleted=False``.

    Args:
        request: Incoming request; not read by this function.
        id: Primary key of the note to delete.

    Returns:
        A ``DeleteNoteOutput`` echoing *id* and the outcome flag.
    """
    from backend.models import Note

    try:
        Note.objects.get(pk=id).delete()
    except Note.DoesNotExist:
        was_deleted = False
    else:
        was_deleted = True
    return DeleteNoteOutput(id=id, deleted=was_deleted)


register(delete_note, "delete_note")
# =============================================================================
# Channels — Real-time Desktop Events
# =============================================================================
class AppStatusChannel(ReactChannel):
    """Push app status updates to the UI (uptime, memory, etc.)."""

    # Message schema for this channel.
    # NOTE(review): presumably the payload sent from Django to the client;
    # confirm against mizan's ReactChannel documentation.
    class DjangoMessage(BaseModel):
        uptime_seconds: float
        memory_mb: float
        note_count: int
        timestamp: str

    # Always grants access; ``params`` is ignored.
    def authorize(self, params=None):
        return True  # Desktop app, no auth needed

    # Returns one fixed group name regardless of ``params``.
    def group(self, params=None):
        return "app_status"


# Registered under the same name that group() returns.
register_channel(AppStatusChannel, "app_status")
class NotesChannel(ReactChannel):
    """Push notifications when notes are modified."""

    # Message schema for this channel.
    # NOTE(review): presumably the payload sent from Django to the client;
    # confirm against mizan's ReactChannel documentation.
    class DjangoMessage(BaseModel):
        action: str  # "created", "updated", "deleted"
        note_id: int
        title: str

    # Always grants access; ``params`` is ignored.
    def authorize(self, params=None):
        return True

    # Returns one fixed group name regardless of ``params``.
    def group(self, params=None):
        return "notes_updates"


# Registered under the same name that group() returns.
register_channel(NotesChannel, "notes_updates")
# =============================================================================
# App Lifecycle
# =============================================================================
# Wall-clock timestamp captured when this module is imported. Kept so that
# any code reading this name keeps working.
_start_time = time.time()
# Monotonic reference for uptime: unlike time.time(), it is unaffected by
# system clock adjustments, so the computed uptime cannot jump or go negative.
_start_monotonic = time.monotonic()


# Response payload returned by the ``app_info`` RPC.
class AppInfoOutput(BaseModel):
    app_name: str  # fixed display name
    uptime_seconds: float  # seconds since module import, rounded to 2 decimals
    db_path: str  # NAME of the "default" entry in settings.DATABASES
    pid: int  # os.getpid() of the server process


@client(websocket=True)
def app_info(request: HttpRequest) -> AppInfoOutput:
    """Return basic facts about the running application process.

    Args:
        request: Incoming request; not read by this function.

    Returns:
        An ``AppInfoOutput`` with the app name, seconds elapsed since this
        module was imported, the default database name/path and the PID.
    """
    from django.conf import settings

    elapsed = time.monotonic() - _start_monotonic
    return AppInfoOutput(
        app_name="mizan Desktop",
        uptime_seconds=round(elapsed, 2),
        db_path=str(settings.DATABASES["default"]["NAME"]),
        pid=os.getpid(),
    )


register(app_info, "app_info")