Rename the package from djarea to mizan across the entire codebase — Python package, React library, generators, tests, and examples. Fix JSX/hook casing (MizanProvider, useMizan, etc.) that broke when the original PascalCase names were lowercased during the rename. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
420 lines
10 KiB
Python
420 lines
10 KiB
Python
"""
|
|
Desktop RPC server functions.
|
|
|
|
Tests mizan's appropriateness for desktop apps:
|
|
- Local file system access
|
|
- SQLite CRUD
|
|
- System introspection
|
|
- Real-time channels (file watcher, app status)
|
|
- No auth required (single-user desktop)
|
|
"""
|
|
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from django.http import HttpRequest
|
|
from pydantic import BaseModel
|
|
|
|
from mizan.client import client
|
|
from mizan.channels import ReactChannel
|
|
from mizan.setup.registry import register
|
|
from mizan.channels import register as register_channel
|
|
|
|
|
|
# =============================================================================
|
|
# System Info
|
|
# =============================================================================
|
|
|
|
|
|
class SystemInfoOutput(BaseModel):
|
|
os_name: str
|
|
os_version: str
|
|
python_version: str
|
|
hostname: str
|
|
username: str
|
|
home_dir: str
|
|
cwd: str
|
|
cpu_count: int
|
|
mizan_version: str
|
|
|
|
|
|
@client(websocket=True)
|
|
def system_info(request: HttpRequest) -> SystemInfoOutput:
|
|
import mizan
|
|
|
|
return SystemInfoOutput(
|
|
os_name=platform.system(),
|
|
os_version=platform.version(),
|
|
python_version=sys.version.split()[0],
|
|
hostname=platform.node(),
|
|
username=os.getenv("USER", os.getenv("USERNAME", "unknown")),
|
|
home_dir=str(Path.home()),
|
|
cwd=os.getcwd(),
|
|
cpu_count=os.cpu_count() or 1,
|
|
mizan_version=getattr(mizan, "__version__", "dev"),
|
|
)
|
|
|
|
|
|
register(system_info, "system_info")
|
|
|
|
|
|
class DiskUsageOutput(BaseModel):
|
|
path: str
|
|
total_gb: float
|
|
used_gb: float
|
|
free_gb: float
|
|
percent_used: float
|
|
|
|
|
|
@client(websocket=True)
|
|
def disk_usage(request: HttpRequest, path: str = "/") -> DiskUsageOutput:
|
|
usage = shutil.disk_usage(path)
|
|
return DiskUsageOutput(
|
|
path=path,
|
|
total_gb=round(usage.total / (1024**3), 2),
|
|
used_gb=round(usage.used / (1024**3), 2),
|
|
free_gb=round(usage.free / (1024**3), 2),
|
|
percent_used=round(usage.used / usage.total * 100, 1),
|
|
)
|
|
|
|
|
|
register(disk_usage, "disk_usage")
|
|
|
|
|
|
# =============================================================================
|
|
# File System
|
|
# =============================================================================
|
|
|
|
|
|
class FileEntry(BaseModel):
|
|
name: str
|
|
path: str
|
|
is_dir: bool
|
|
size: int
|
|
modified: str
|
|
|
|
|
|
class ListFilesOutput(BaseModel):
|
|
directory: str
|
|
entries: list[FileEntry]
|
|
parent: str | None
|
|
|
|
|
|
@client(websocket=True)
|
|
def list_files(request: HttpRequest, directory: str = "~") -> ListFilesOutput:
|
|
dir_path = Path(directory).expanduser().resolve()
|
|
|
|
if not dir_path.is_dir():
|
|
raise ValueError(f"Not a directory: {dir_path}")
|
|
|
|
entries = []
|
|
try:
|
|
for entry in sorted(
|
|
dir_path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())
|
|
):
|
|
try:
|
|
stat = entry.stat()
|
|
entries.append(
|
|
FileEntry(
|
|
name=entry.name,
|
|
path=str(entry),
|
|
is_dir=entry.is_dir(),
|
|
size=stat.st_size if not entry.is_dir() else 0,
|
|
modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
|
)
|
|
)
|
|
except (PermissionError, OSError):
|
|
continue
|
|
except PermissionError:
|
|
raise PermissionError(f"Cannot read directory: {dir_path}")
|
|
|
|
parent = str(dir_path.parent) if dir_path.parent != dir_path else None
|
|
|
|
return ListFilesOutput(
|
|
directory=str(dir_path),
|
|
entries=entries,
|
|
parent=parent,
|
|
)
|
|
|
|
|
|
register(list_files, "list_files")
|
|
|
|
|
|
class FileContentOutput(BaseModel):
|
|
path: str
|
|
content: str
|
|
size: int
|
|
modified: str
|
|
|
|
|
|
@client(websocket=True)
|
|
def read_file(request: HttpRequest, path: str) -> FileContentOutput:
|
|
file_path = Path(path).expanduser().resolve()
|
|
|
|
if not file_path.is_file():
|
|
raise FileNotFoundError(f"File not found: {file_path}")
|
|
|
|
stat = file_path.stat()
|
|
|
|
# Safety: limit to 1MB text files
|
|
if stat.st_size > 1_048_576:
|
|
raise ValueError(f"File too large: {stat.st_size} bytes (max 1MB)")
|
|
|
|
try:
|
|
content = file_path.read_text(encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
raise ValueError(f"Not a text file: {file_path}")
|
|
|
|
return FileContentOutput(
|
|
path=str(file_path),
|
|
content=content,
|
|
size=stat.st_size,
|
|
modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
|
)
|
|
|
|
|
|
register(read_file, "read_file")
|
|
|
|
|
|
class WriteFileOutput(BaseModel):
|
|
path: str
|
|
size: int
|
|
|
|
|
|
@client(websocket=True)
|
|
def write_file(request: HttpRequest, path: str, content: str) -> WriteFileOutput:
|
|
file_path = Path(path).expanduser().resolve()
|
|
|
|
# Safety: only allow writing within home directory
|
|
home = Path.home()
|
|
if not str(file_path).startswith(str(home)):
|
|
raise PermissionError(f"Can only write files within home directory: {home}")
|
|
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
file_path.write_text(content, encoding="utf-8")
|
|
|
|
return WriteFileOutput(path=str(file_path), size=len(content.encode("utf-8")))
|
|
|
|
|
|
register(write_file, "write_file")
|
|
|
|
|
|
class DeleteFileOutput(BaseModel):
|
|
path: str
|
|
deleted: bool
|
|
|
|
|
|
@client(websocket=True)
|
|
def delete_file(request: HttpRequest, path: str) -> DeleteFileOutput:
|
|
file_path = Path(path).expanduser().resolve()
|
|
|
|
home = Path.home()
|
|
if not str(file_path).startswith(str(home)):
|
|
raise PermissionError(f"Can only delete files within home directory: {home}")
|
|
|
|
if file_path.exists():
|
|
file_path.unlink()
|
|
return DeleteFileOutput(path=str(file_path), deleted=True)
|
|
|
|
return DeleteFileOutput(path=str(file_path), deleted=False)
|
|
|
|
|
|
register(delete_file, "delete_file")
|
|
|
|
|
|
# =============================================================================
|
|
# Notes CRUD (SQLite)
|
|
# =============================================================================
|
|
|
|
|
|
class NoteOutput(BaseModel):
|
|
id: int
|
|
title: str
|
|
content: str
|
|
pinned: bool
|
|
created_at: str
|
|
updated_at: str
|
|
|
|
|
|
class NoteListOutput(BaseModel):
|
|
notes: list[NoteOutput]
|
|
count: int
|
|
|
|
|
|
def _note_to_output(note) -> NoteOutput:
|
|
return NoteOutput(
|
|
id=note.id,
|
|
title=note.title,
|
|
content=note.content,
|
|
pinned=note.pinned,
|
|
created_at=note.created_at.isoformat(),
|
|
updated_at=note.updated_at.isoformat(),
|
|
)
|
|
|
|
|
|
@client(websocket=True)
|
|
def list_notes(request: HttpRequest) -> NoteListOutput:
|
|
from backend.models import Note
|
|
|
|
notes = Note.objects.all()
|
|
return NoteListOutput(
|
|
notes=[_note_to_output(n) for n in notes],
|
|
count=notes.count(),
|
|
)
|
|
|
|
|
|
register(list_notes, "list_notes")
|
|
|
|
|
|
@client(websocket=True)
|
|
def create_note(
|
|
request: HttpRequest, title: str, content: str = "", pinned: bool = False
|
|
) -> NoteOutput:
|
|
from backend.models import Note
|
|
|
|
note = Note.objects.create(title=title, content=content, pinned=pinned)
|
|
return _note_to_output(note)
|
|
|
|
|
|
register(create_note, "create_note")
|
|
|
|
|
|
@client(websocket=True)
|
|
def get_note(request: HttpRequest, id: int) -> NoteOutput:
|
|
from backend.models import Note
|
|
|
|
try:
|
|
note = Note.objects.get(pk=id)
|
|
except Note.DoesNotExist:
|
|
raise ValueError(f"Note {id} not found")
|
|
|
|
return _note_to_output(note)
|
|
|
|
|
|
register(get_note, "get_note")
|
|
|
|
|
|
@client(websocket=True)
|
|
def update_note(
|
|
request: HttpRequest,
|
|
id: int,
|
|
title: str | None = None,
|
|
content: str | None = None,
|
|
pinned: bool | None = None,
|
|
) -> NoteOutput:
|
|
from backend.models import Note
|
|
|
|
try:
|
|
note = Note.objects.get(pk=id)
|
|
except Note.DoesNotExist:
|
|
raise ValueError(f"Note {id} not found")
|
|
|
|
if title is not None:
|
|
note.title = title
|
|
if content is not None:
|
|
note.content = content
|
|
if pinned is not None:
|
|
note.pinned = pinned
|
|
|
|
note.save()
|
|
return _note_to_output(note)
|
|
|
|
|
|
register(update_note, "update_note")
|
|
|
|
|
|
class DeleteNoteOutput(BaseModel):
|
|
id: int
|
|
deleted: bool
|
|
|
|
|
|
@client(websocket=True)
|
|
def delete_note(request: HttpRequest, id: int) -> DeleteNoteOutput:
|
|
from backend.models import Note
|
|
|
|
try:
|
|
note = Note.objects.get(pk=id)
|
|
note.delete()
|
|
return DeleteNoteOutput(id=id, deleted=True)
|
|
except Note.DoesNotExist:
|
|
return DeleteNoteOutput(id=id, deleted=False)
|
|
|
|
|
|
register(delete_note, "delete_note")
|
|
|
|
|
|
# =============================================================================
|
|
# Channels — Real-time Desktop Events
|
|
# =============================================================================
|
|
|
|
|
|
class AppStatusChannel(ReactChannel):
|
|
"""Push app status updates to the UI (uptime, memory, etc.)."""
|
|
|
|
class DjangoMessage(BaseModel):
|
|
uptime_seconds: float
|
|
memory_mb: float
|
|
note_count: int
|
|
timestamp: str
|
|
|
|
def authorize(self, params=None):
|
|
return True # Desktop app, no auth needed
|
|
|
|
def group(self, params=None):
|
|
return "app_status"
|
|
|
|
|
|
register_channel(AppStatusChannel, "app_status")
|
|
|
|
|
|
class NotesChannel(ReactChannel):
|
|
"""Push notifications when notes are modified."""
|
|
|
|
class DjangoMessage(BaseModel):
|
|
action: str # "created", "updated", "deleted"
|
|
note_id: int
|
|
title: str
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "notes_updates"
|
|
|
|
|
|
register_channel(NotesChannel, "notes_updates")
|
|
|
|
|
|
# =============================================================================
|
|
# App Lifecycle
|
|
# =============================================================================
|
|
|
|
_start_time = time.time()
|
|
|
|
|
|
class AppInfoOutput(BaseModel):
|
|
app_name: str
|
|
uptime_seconds: float
|
|
db_path: str
|
|
pid: int
|
|
|
|
|
|
@client(websocket=True)
|
|
def app_info(request: HttpRequest) -> AppInfoOutput:
|
|
from django.conf import settings
|
|
|
|
return AppInfoOutput(
|
|
app_name="mizan Desktop",
|
|
uptime_seconds=round(time.time() - _start_time, 2),
|
|
db_path=str(settings.DATABASES["default"]["NAME"]),
|
|
pid=os.getpid(),
|
|
)
|
|
|
|
|
|
register(app_info, "app_info")
|