Test infrastructure: - Django standalone test runner (pytest-django, test settings, EmailUser model) - React unit tests via Vitest with jsdom, jest compat layer, path aliases - Playwright E2E tests using generated hooks in a real Chromium browser - Docker Compose test backend (Django + Redis) for integration testing - Desktop integration test app (PyWebView + Django + uvicorn) - Makefile with test/test-django/test-react/test-integration targets Library bugs found and fixed: - hasJWT truthiness: undefined !== null was true, skipping session init - process.env crash: CSR client referenced process.env in non-Node browsers - baseUrl not forwarded: DjareaProvider didn't pass baseUrl to CSR client - Relative URL handling: new URL() failed with relative base paths - call() race condition: HTTP requests fired before CSRF cookie was set - Session init await: added sessionRef promise so call() waits for session - path_prefix on schema export: both export commands failed with URL reverse - NullBooleanField removed: referenced field doesn't exist in Django 5.0+ - lru_cache on JWT settings: get_settings() now cached as intended - Channel message routing: broadcasts now include channel name and params - httpFunctionCall: fixed URL and request body format Generator fixes: - Removed 1,100 lines of REST/OpenAPI client generation (not part of Djarea) - Generator now works for djarea-only projects without django-ninja REST APIs - Generated DjangoContext now includes ChannelProvider when channels exist - Fixed env var passthrough for schema export commands - Deduplicated fetch logic into single runDjangoCommand helper Test quality: - Fixed 33 tautological Django tests with real assertions - Found hidden bug: benchmark functions were never registered - Found hidden bug: unicode lookalike test used plain ASCII - Deleted worthless React unit tests (duplicates, shape checks, Zod-tests-Zod) - Replaced jsdom integration tests with Playwright browser tests Example apps: - example/: Integration test backend with 33 server functions, 5 forms, 4 channels covering auth variations, contexts, class-based ServerFunction, error codes, DjareaFormMixin, formsets, and JWT - desktop/: PyWebView desktop app with file system access, SQLite CRUD, system introspection, and 39 real HTTP integration tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1171 lines
36 KiB
Python
1171 lines
36 KiB
Python
"""
|
|
Tests for djarea.channels module.
|
|
"""
|
|
|
|
import json
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from django.test import TestCase
|
|
from django.contrib.auth import get_user_model
|
|
from pydantic import BaseModel
|
|
|
|
from djarea.channels import (
|
|
ReactChannel,
|
|
register,
|
|
get_channel,
|
|
get_registered_channels,
|
|
get_channels_schema,
|
|
_registry,
|
|
)
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
# =============================================================================
|
|
# Test Fixtures
|
|
# =============================================================================
|
|
|
|
class MockUser:
|
|
"""Mock user for testing."""
|
|
def __init__(self, is_authenticated=True, email="test@example.com"):
|
|
self.is_authenticated = is_authenticated
|
|
self.email = email
|
|
|
|
|
|
class MockAnonymousUser:
|
|
"""Mock anonymous user."""
|
|
is_authenticated = False
|
|
email = ""
|
|
|
|
|
|
# =============================================================================
|
|
# ReactChannel Base Class Tests
|
|
# =============================================================================
|
|
|
|
class ReactChannelBaseTests(TestCase):
|
|
"""Tests for ReactChannel base class."""
|
|
|
|
def test_react_channel_default_class_vars(self):
|
|
"""ReactChannel should have None defaults for nested classes."""
|
|
self.assertIsNone(ReactChannel.Params)
|
|
self.assertIsNone(ReactChannel.ReactMessage)
|
|
self.assertIsNone(ReactChannel.DjangoMessage)
|
|
|
|
def test_react_channel_requires_authorize_override(self):
|
|
"""ReactChannel subclass must override authorize()."""
|
|
|
|
class IncompleteChannel(ReactChannel):
|
|
pass
|
|
|
|
channel = IncompleteChannel()
|
|
channel.user = MockUser()
|
|
|
|
with self.assertRaises(NotImplementedError) as ctx:
|
|
channel.authorize()
|
|
|
|
self.assertIn("must implement authorize()", str(ctx.exception))
|
|
|
|
def test_react_channel_requires_group_override(self):
|
|
"""ReactChannel subclass must override group()."""
|
|
|
|
class IncompleteChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
channel = IncompleteChannel()
|
|
channel.user = MockUser()
|
|
|
|
with self.assertRaises(NotImplementedError) as ctx:
|
|
channel.group()
|
|
|
|
self.assertIn("must implement group()", str(ctx.exception))
|
|
|
|
def test_react_channel_receive_default(self):
|
|
"""ReactChannel.receive() should return None by default."""
|
|
|
|
class BasicChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test-group"
|
|
|
|
channel = BasicChannel()
|
|
result = channel.receive(None, MagicMock())
|
|
|
|
self.assertIsNone(result)
|
|
|
|
def test_react_channel_init_creates_empty_groups(self):
|
|
"""ReactChannel.__init__() should create empty _groups set."""
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
|
|
self.assertIsInstance(channel._groups, set)
|
|
self.assertEqual(len(channel._groups), 0)
|
|
|
|
|
|
# =============================================================================
|
|
# Channel with Typed Messages Tests
|
|
# =============================================================================
|
|
|
|
class TypedMessagesTests(TestCase):
|
|
"""Tests for channels with Pydantic message types."""
|
|
|
|
def test_channel_with_params(self):
|
|
"""Channel should accept Params Pydantic model."""
|
|
|
|
class ParamsChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room: str
|
|
limit: int = 10
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"room_{params.room}"
|
|
|
|
self.assertIsNotNone(ParamsChannel.Params)
|
|
|
|
# Test params model
|
|
params = ParamsChannel.Params(room="general")
|
|
self.assertEqual(params.room, "general")
|
|
self.assertEqual(params.limit, 10)
|
|
|
|
def test_channel_with_react_message(self):
|
|
"""Channel should accept ReactMessage Pydantic model."""
|
|
|
|
class MessageChannel(ReactChannel):
|
|
class ReactMessage(BaseModel):
|
|
text: str
|
|
timestamp: int
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "messages"
|
|
|
|
self.assertIsNotNone(MessageChannel.ReactMessage)
|
|
|
|
# Test message model
|
|
msg = MessageChannel.ReactMessage(text="Hello", timestamp=12345)
|
|
self.assertEqual(msg.text, "Hello")
|
|
self.assertEqual(msg.timestamp, 12345)
|
|
|
|
def test_channel_with_django_message(self):
|
|
"""Channel should accept DjangoMessage Pydantic model."""
|
|
|
|
class BroadcastChannel(ReactChannel):
|
|
class DjangoMessage(BaseModel):
|
|
user: str
|
|
text: str
|
|
created_at: str
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "broadcast"
|
|
|
|
self.assertIsNotNone(BroadcastChannel.DjangoMessage)
|
|
|
|
# Test message model
|
|
msg = BroadcastChannel.DjangoMessage(
|
|
user="john",
|
|
text="Hello world",
|
|
created_at="2024-01-15T10:00:00Z"
|
|
)
|
|
self.assertEqual(msg.user, "john")
|
|
self.assertEqual(msg.text, "Hello world")
|
|
|
|
def test_channel_receive_with_typed_messages(self):
|
|
"""Channel.receive() should work with typed messages."""
|
|
|
|
class ChatChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room: str
|
|
|
|
class ReactMessage(BaseModel):
|
|
text: str
|
|
|
|
class DjangoMessage(BaseModel):
|
|
user: str
|
|
text: str
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"chat_{params.room}"
|
|
|
|
def receive(self, params, msg):
|
|
return self.DjangoMessage(
|
|
user=self.user.email,
|
|
text=msg.text
|
|
)
|
|
|
|
channel = ChatChannel()
|
|
channel.user = MockUser(email="test@example.com")
|
|
|
|
params = ChatChannel.Params(room="general")
|
|
incoming = ChatChannel.ReactMessage(text="Hello!")
|
|
|
|
result = channel.receive(params, incoming)
|
|
|
|
self.assertIsInstance(result, ChatChannel.DjangoMessage)
|
|
self.assertEqual(result.user, "test@example.com")
|
|
self.assertEqual(result.text, "Hello!")
|
|
|
|
|
|
# =============================================================================
|
|
# Registration Tests
|
|
# =============================================================================
|
|
|
|
class RegistrationTests(TestCase):
|
|
"""Tests for channel registration."""
|
|
|
|
def setUp(self):
|
|
self._original_registry = dict(_registry)
|
|
|
|
def tearDown(self):
|
|
_registry.clear()
|
|
_registry.update(self._original_registry)
|
|
|
|
def test_register_adds_to_registry(self):
|
|
"""register() should add channel to registry."""
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
register(TestChannel, "test-channel")
|
|
|
|
self.assertIn("test-channel", _registry)
|
|
self.assertEqual(_registry["test-channel"], TestChannel)
|
|
|
|
def test_register_duplicate_raises(self):
|
|
"""register() should raise on duplicate name."""
|
|
|
|
class Channel1(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
class Channel2(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
register(Channel1, "duplicate")
|
|
|
|
with self.assertRaises(ValueError) as ctx:
|
|
register(Channel2, "duplicate")
|
|
|
|
self.assertIn("already registered", str(ctx.exception))
|
|
|
|
def test_register_validates_authorize(self):
|
|
"""register() should validate that authorize method exists."""
|
|
|
|
class NoAuthorizeChannel(ReactChannel):
|
|
pass
|
|
|
|
# Should still pass because ReactChannel has authorize
|
|
# (just raises NotImplementedError when called)
|
|
register(NoAuthorizeChannel, "no-authorize-test")
|
|
self.assertIn("no-authorize-test", _registry)
|
|
|
|
def test_get_channel_returns_registered(self):
|
|
"""get_channel() should return registered channel."""
|
|
|
|
class MyChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "my"
|
|
|
|
register(MyChannel, "my-channel")
|
|
|
|
result = get_channel("my-channel")
|
|
|
|
self.assertEqual(result, MyChannel)
|
|
|
|
def test_get_channel_returns_none_for_unknown(self):
|
|
"""get_channel() should return None for unknown name."""
|
|
result = get_channel("nonexistent-channel")
|
|
|
|
self.assertIsNone(result)
|
|
|
|
def test_get_registered_channels_returns_copy(self):
|
|
"""get_registered_channels() should return a copy of registry."""
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
register(TestChannel, "copy-test")
|
|
|
|
result = get_registered_channels()
|
|
|
|
# Modifying result shouldn't affect original
|
|
result["modified"] = "test"
|
|
|
|
self.assertIn("copy-test", _registry)
|
|
self.assertNotIn("modified", _registry)
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Export Tests
|
|
# =============================================================================
|
|
|
|
class SchemaExportTests(TestCase):
|
|
"""Tests for channel schema export."""
|
|
|
|
def setUp(self):
|
|
self._original_registry = dict(_registry)
|
|
|
|
def tearDown(self):
|
|
_registry.clear()
|
|
_registry.update(self._original_registry)
|
|
|
|
def test_get_channels_schema_empty(self):
|
|
"""get_channels_schema() should return empty channels for empty registry."""
|
|
_registry.clear()
|
|
|
|
schema = get_channels_schema()
|
|
|
|
self.assertIn("channels", schema)
|
|
self.assertEqual(schema["channels"], {})
|
|
|
|
def test_get_channels_schema_with_basic_channel(self):
|
|
"""get_channels_schema() should include basic channel info."""
|
|
|
|
class BasicChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "basic"
|
|
|
|
register(BasicChannel, "basic")
|
|
|
|
schema = get_channels_schema()
|
|
|
|
self.assertIn("basic", schema["channels"])
|
|
channel_schema = schema["channels"]["basic"]
|
|
|
|
self.assertEqual(channel_schema["name"], "basic")
|
|
self.assertIsNone(channel_schema["params"])
|
|
self.assertIsNone(channel_schema["reactMessage"])
|
|
self.assertIsNone(channel_schema["djangoMessage"])
|
|
|
|
def test_get_channels_schema_with_params(self):
|
|
"""get_channels_schema() should include params schema."""
|
|
|
|
class ParamsChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room: str
|
|
limit: int = 50
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"room_{params.room}"
|
|
|
|
register(ParamsChannel, "params-channel")
|
|
|
|
schema = get_channels_schema()
|
|
|
|
channel_schema = schema["channels"]["params-channel"]
|
|
|
|
self.assertIsNotNone(channel_schema["params"])
|
|
self.assertIn("properties", channel_schema["params"])
|
|
self.assertIn("room", channel_schema["params"]["properties"])
|
|
self.assertIn("limit", channel_schema["params"]["properties"])
|
|
|
|
def test_get_channels_schema_with_messages(self):
|
|
"""get_channels_schema() should include message schemas."""
|
|
|
|
class FullChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
channel_id: int
|
|
|
|
class ReactMessage(BaseModel):
|
|
text: str
|
|
|
|
class DjangoMessage(BaseModel):
|
|
user: str
|
|
text: str
|
|
timestamp: str
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"channel_{params.channel_id}"
|
|
|
|
register(FullChannel, "full-channel")
|
|
|
|
schema = get_channels_schema()
|
|
|
|
channel_schema = schema["channels"]["full-channel"]
|
|
|
|
# Check params
|
|
self.assertIsNotNone(channel_schema["params"])
|
|
self.assertIn("channel_id", channel_schema["params"]["properties"])
|
|
|
|
# Check ReactMessage
|
|
self.assertIsNotNone(channel_schema["reactMessage"])
|
|
self.assertIn("text", channel_schema["reactMessage"]["properties"])
|
|
|
|
# Check DjangoMessage
|
|
self.assertIsNotNone(channel_schema["djangoMessage"])
|
|
self.assertIn("user", channel_schema["djangoMessage"]["properties"])
|
|
self.assertIn("text", channel_schema["djangoMessage"]["properties"])
|
|
self.assertIn("timestamp", channel_schema["djangoMessage"]["properties"])
|
|
|
|
def test_get_channels_schema_multiple_channels(self):
|
|
"""get_channels_schema() should include all registered channels."""
|
|
|
|
class Channel1(ReactChannel):
|
|
class Params(BaseModel):
|
|
id: int
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"c1_{params.id}"
|
|
|
|
class Channel2(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "c2"
|
|
|
|
register(Channel1, "channel-one")
|
|
register(Channel2, "channel-two")
|
|
|
|
schema = get_channels_schema()
|
|
|
|
self.assertIn("channel-one", schema["channels"])
|
|
self.assertIn("channel-two", schema["channels"])
|
|
|
|
# Channel 1 has params
|
|
self.assertIsNotNone(schema["channels"]["channel-one"]["params"])
|
|
|
|
# Channel 2 has no params
|
|
self.assertIsNone(schema["channels"]["channel-two"]["params"])
|
|
|
|
|
|
# =============================================================================
|
|
# Authorization Tests
|
|
# =============================================================================
|
|
|
|
class AuthorizationTests(TestCase):
|
|
"""Tests for channel authorization."""
|
|
|
|
def test_authorize_with_authenticated_user(self):
|
|
"""authorize() should work with authenticated users."""
|
|
|
|
class AuthChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return self.user.is_authenticated
|
|
|
|
def group(self, params=None):
|
|
return "auth"
|
|
|
|
channel = AuthChannel()
|
|
channel.user = MockUser(is_authenticated=True)
|
|
|
|
self.assertTrue(channel.authorize())
|
|
|
|
def test_authorize_with_anonymous_user(self):
|
|
"""authorize() should work with anonymous users."""
|
|
|
|
class AuthChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return self.user.is_authenticated
|
|
|
|
def group(self, params=None):
|
|
return "auth"
|
|
|
|
channel = AuthChannel()
|
|
channel.user = MockAnonymousUser()
|
|
|
|
self.assertFalse(channel.authorize())
|
|
|
|
def test_authorize_with_params(self):
|
|
"""authorize() should have access to params."""
|
|
|
|
class RoomChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room: str
|
|
|
|
def authorize(self, params):
|
|
# Only allow 'public' room for all users
|
|
return params.room == "public" or self.user.is_authenticated
|
|
|
|
def group(self, params):
|
|
return f"room_{params.room}"
|
|
|
|
channel = RoomChannel()
|
|
channel.user = MockAnonymousUser()
|
|
|
|
public_params = RoomChannel.Params(room="public")
|
|
private_params = RoomChannel.Params(room="private")
|
|
|
|
self.assertTrue(channel.authorize(public_params))
|
|
self.assertFalse(channel.authorize(private_params))
|
|
|
|
|
|
# =============================================================================
|
|
# Group Tests
|
|
# =============================================================================
|
|
|
|
class GroupTests(TestCase):
|
|
"""Tests for channel group management."""
|
|
|
|
def test_group_returns_string(self):
|
|
"""group() should return a string group name."""
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test-group"
|
|
|
|
channel = TestChannel()
|
|
|
|
self.assertEqual(channel.group(), "test-group")
|
|
|
|
def test_group_with_params(self):
|
|
"""group() should use params for dynamic group names."""
|
|
|
|
class RoomChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room_id: int
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"room_{params.room_id}"
|
|
|
|
channel = RoomChannel()
|
|
|
|
params1 = RoomChannel.Params(room_id=1)
|
|
params2 = RoomChannel.Params(room_id=42)
|
|
|
|
self.assertEqual(channel.group(params1), "room_1")
|
|
self.assertEqual(channel.group(params2), "room_42")
|
|
|
|
|
|
# =============================================================================
|
|
# Async Methods Tests
|
|
# =============================================================================
|
|
|
|
class AsyncMethodsTests(TestCase):
|
|
"""Tests for async internal methods."""
|
|
|
|
def test_join_group_adds_to_groups_set(self):
|
|
"""_join_group() should add group to _groups set."""
|
|
import asyncio
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
channel._channel_layer = AsyncMock()
|
|
channel._channel_name = "test-channel-name"
|
|
|
|
async def test():
|
|
await channel._join_group("my-group")
|
|
return channel._groups
|
|
|
|
groups = asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
self.assertIn("my-group", groups)
|
|
channel._channel_layer.group_add.assert_called_once_with(
|
|
"my-group", "test-channel-name"
|
|
)
|
|
|
|
def test_leave_group_removes_from_groups_set(self):
|
|
"""_leave_group() should remove group from _groups set."""
|
|
import asyncio
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
channel._channel_layer = AsyncMock()
|
|
channel._channel_name = "test-channel-name"
|
|
channel._groups.add("my-group")
|
|
|
|
async def test():
|
|
await channel._leave_group("my-group")
|
|
return channel._groups
|
|
|
|
groups = asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
self.assertNotIn("my-group", groups)
|
|
channel._channel_layer.group_discard.assert_called_once_with(
|
|
"my-group", "test-channel-name"
|
|
)
|
|
|
|
def test_leave_group_ignores_unknown_group(self):
|
|
"""_leave_group() should ignore groups not in _groups."""
|
|
import asyncio
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
channel._channel_layer = AsyncMock()
|
|
channel._channel_name = "test-channel-name"
|
|
|
|
async def test():
|
|
await channel._leave_group("unknown-group")
|
|
return channel._groups
|
|
|
|
groups = asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
# Should not have called group_discard
|
|
channel._channel_layer.group_discard.assert_not_called()
|
|
|
|
def test_leave_all_groups(self):
|
|
"""_leave_all_groups() should leave all joined groups."""
|
|
import asyncio
|
|
|
|
class TestChannel(ReactChannel):
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
channel._channel_layer = AsyncMock()
|
|
channel._channel_name = "test-channel-name"
|
|
channel._groups = {"group1", "group2", "group3"}
|
|
|
|
async def test():
|
|
await channel._leave_all_groups()
|
|
return channel._groups
|
|
|
|
groups = asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
self.assertEqual(len(groups), 0)
|
|
self.assertEqual(channel._channel_layer.group_discard.call_count, 3)
|
|
|
|
def test_broadcast_sends_to_group(self):
|
|
"""_broadcast() should send message to channel layer."""
|
|
import asyncio
|
|
|
|
class TestChannel(ReactChannel):
|
|
class DjangoMessage(BaseModel):
|
|
text: str
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
channel = TestChannel()
|
|
channel._channel_layer = AsyncMock()
|
|
|
|
message = TestChannel.DjangoMessage(text="Hello")
|
|
|
|
async def test():
|
|
await channel._broadcast("my-group", message)
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
channel._channel_layer.group_send.assert_called_once()
|
|
call_args = channel._channel_layer.group_send.call_args
|
|
|
|
self.assertEqual(call_args[0][0], "my-group")
|
|
self.assertEqual(call_args[0][1]["type"], "channel.message")
|
|
self.assertEqual(call_args[0][1]["data"]["text"], "Hello")
|
|
|
|
|
|
# =============================================================================
|
|
# Server Push Tests
|
|
# =============================================================================
|
|
|
|
class ServerPushTests(TestCase):
|
|
"""Tests for server push functionality."""
|
|
|
|
def setUp(self):
|
|
self._original_registry = dict(_registry)
|
|
|
|
def tearDown(self):
|
|
_registry.clear()
|
|
_registry.update(self._original_registry)
|
|
|
|
def test_push_without_params(self):
|
|
"""push() should work for channels without params."""
|
|
import asyncio
|
|
|
|
class NotificationChannel(ReactChannel):
|
|
class DjangoMessage(BaseModel):
|
|
title: str
|
|
body: str
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "notifications"
|
|
|
|
with patch('channels.layers.get_channel_layer') as mock_get_layer:
|
|
mock_layer = AsyncMock()
|
|
mock_get_layer.return_value = mock_layer
|
|
|
|
message = NotificationChannel.DjangoMessage(
|
|
title="Alert",
|
|
body="Something happened"
|
|
)
|
|
|
|
async def test():
|
|
await NotificationChannel.push(message=message)
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
mock_layer.group_send.assert_called_once()
|
|
call_args = mock_layer.group_send.call_args
|
|
|
|
self.assertEqual(call_args[0][0], "notifications")
|
|
self.assertEqual(call_args[0][1]["data"]["title"], "Alert")
|
|
|
|
def test_push_with_params(self):
|
|
"""push() should work for channels with params."""
|
|
import asyncio
|
|
|
|
class RoomChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
room: str
|
|
|
|
class DjangoMessage(BaseModel):
|
|
text: str
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"room_{params.room}"
|
|
|
|
with patch('channels.layers.get_channel_layer') as mock_get_layer:
|
|
mock_layer = AsyncMock()
|
|
mock_get_layer.return_value = mock_layer
|
|
|
|
message = RoomChannel.DjangoMessage(text="Hello room!")
|
|
|
|
async def test():
|
|
await RoomChannel.push(room="general", message=message)
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
mock_layer.group_send.assert_called_once()
|
|
call_args = mock_layer.group_send.call_args
|
|
|
|
self.assertEqual(call_args[0][0], "room_general")
|
|
self.assertEqual(call_args[0][1]["data"]["text"], "Hello room!")
|
|
|
|
def test_push_without_channel_layer_warns(self):
|
|
"""push() should warn when no channel layer is configured."""
|
|
import asyncio
|
|
import logging
|
|
|
|
class TestChannel(ReactChannel):
|
|
class DjangoMessage(BaseModel):
|
|
text: str
|
|
|
|
def authorize(self, params=None):
|
|
return True
|
|
|
|
def group(self, params=None):
|
|
return "test"
|
|
|
|
with patch('channels.layers.get_channel_layer') as mock_get_layer:
|
|
mock_get_layer.return_value = None
|
|
|
|
message = TestChannel.DjangoMessage(text="test")
|
|
|
|
with self.assertLogs('djarea.channels', level='WARNING') as cm:
|
|
async def test():
|
|
await TestChannel.push(message=message)
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
self.assertTrue(any("No channel layer configured" in msg for msg in cm.output))
|
|
|
|
|
|
# =============================================================================
|
|
# Management Command Tests
|
|
# =============================================================================
|
|
|
|
class ManagementCommandTests(TestCase):
|
|
"""Tests for the export_channels_schema management command."""
|
|
|
|
def setUp(self):
|
|
self._original_registry = dict(_registry)
|
|
|
|
def tearDown(self):
|
|
_registry.clear()
|
|
_registry.update(self._original_registry)
|
|
|
|
def test_export_command_outputs_openapi_json(self):
|
|
"""export_channels_schema should output valid OpenAPI JSON."""
|
|
from io import StringIO
|
|
from django.core.management import call_command
|
|
|
|
out = StringIO()
|
|
call_command('export_channels_schema', stdout=out)
|
|
|
|
output = out.getvalue()
|
|
|
|
# Should be valid JSON with OpenAPI structure
|
|
schema = json.loads(output)
|
|
|
|
self.assertIn("openapi", schema)
|
|
self.assertIn("x-djarea-channels", schema)
|
|
|
|
def test_export_command_includes_registered_channels(self):
|
|
"""export_channels_schema should include registered channels."""
|
|
from io import StringIO
|
|
from django.core.management import call_command
|
|
|
|
class TestChannel(ReactChannel):
|
|
class Params(BaseModel):
|
|
id: int
|
|
|
|
def authorize(self, params):
|
|
return True
|
|
|
|
def group(self, params):
|
|
return f"test_{params.id}"
|
|
|
|
register(TestChannel, "export-test")
|
|
|
|
out = StringIO()
|
|
call_command('export_channels_schema', stdout=out)
|
|
|
|
output = out.getvalue()
|
|
schema = json.loads(output)
|
|
|
|
# Check that channel is in x-djarea-channels metadata
|
|
channel_names = [c["name"] for c in schema["x-djarea-channels"]]
|
|
self.assertIn("export-test", channel_names)
|
|
|
|
def test_export_command_respects_indent(self):
|
|
"""export_channels_schema should respect --indent option."""
|
|
from io import StringIO
|
|
from django.core.management import call_command
|
|
|
|
# With indent
|
|
out_indent = StringIO()
|
|
call_command('export_channels_schema', indent=2, stdout=out_indent)
|
|
|
|
# Without indent (compact)
|
|
out_compact = StringIO()
|
|
call_command('export_channels_schema', indent=0, stdout=out_compact)
|
|
|
|
# Indented should be longer (has whitespace)
|
|
self.assertGreater(len(out_indent.getvalue()), len(out_compact.getvalue()))
|
|
|
|
|
|
# =============================================================================
|
|
# WebSocket RPC Tests
|
|
# =============================================================================
|
|
|
|
|
|
class WebSocketRPCTests(TestCase):
|
|
"""Tests for WebSocket RPC functionality."""
|
|
|
|
def setUp(self):
|
|
# Clear djarea registry
|
|
from djarea.setup.registry import clear_registry
|
|
clear_registry()
|
|
|
|
# Register test functions
|
|
from djarea.client import client
|
|
from djarea.setup.registry import register
|
|
from pydantic import BaseModel
|
|
|
|
class EchoOutput(BaseModel):
|
|
echo: str
|
|
|
|
class AddOutput(BaseModel):
|
|
result: int
|
|
|
|
@client(websocket=True)
|
|
def rpc_echo(request, message: str) -> EchoOutput:
|
|
return EchoOutput(echo=f"Echo: {message}")
|
|
register(rpc_echo, "rpc_echo")
|
|
|
|
@client(websocket=True)
|
|
def rpc_add(request, a: int, b: int) -> AddOutput:
|
|
return AddOutput(result=a + b)
|
|
register(rpc_add, "rpc_add")
|
|
|
|
@client(websocket=True)
|
|
def rpc_auth_required(request) -> EchoOutput:
|
|
if not request.user.is_authenticated:
|
|
raise PermissionError("Authentication required")
|
|
return EchoOutput(echo=f"Hello, {request.user.email}")
|
|
register(rpc_auth_required, "rpc_auth_required")
|
|
|
|
def tearDown(self):
|
|
from djarea.setup.registry import clear_registry
|
|
clear_registry()
|
|
|
|
def test_handle_rpc_success(self):
|
|
"""_handle_rpc should execute function and return result."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {
|
|
"user": MockUser(is_authenticated=True, email="test@example.com"),
|
|
}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "test-123",
|
|
"fn": "rpc_echo",
|
|
"args": {"message": "Hello"},
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
self.assertEqual(len(consumer.sent_messages), 1)
|
|
response = consumer.sent_messages[0]
|
|
|
|
self.assertEqual(response["id"], "test-123")
|
|
self.assertTrue(response["ok"])
|
|
self.assertEqual(response["data"]["echo"], "Echo: Hello")
|
|
|
|
def test_handle_rpc_with_multiple_args(self):
|
|
"""_handle_rpc should handle functions with multiple arguments."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "add-123",
|
|
"fn": "rpc_add",
|
|
"args": {"a": 5, "b": 3},
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertTrue(response["ok"])
|
|
self.assertEqual(response["data"]["result"], 8)
|
|
|
|
def test_handle_rpc_function_not_found(self):
|
|
"""_handle_rpc should return error for unknown function."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "test-456",
|
|
"fn": "nonexistent_function",
|
|
"args": {},
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertEqual(response["id"], "test-456")
|
|
self.assertFalse(response["ok"])
|
|
self.assertEqual(response["error"]["code"], "NOT_FOUND")
|
|
|
|
def test_handle_rpc_validation_error(self):
|
|
"""_handle_rpc should return validation error for invalid input."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "test-789",
|
|
"fn": "rpc_echo",
|
|
"args": {}, # Missing required 'message' field
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertEqual(response["id"], "test-789")
|
|
self.assertFalse(response["ok"])
|
|
self.assertEqual(response["error"]["code"], "VALIDATION_ERROR")
|
|
|
|
def test_handle_rpc_missing_id(self):
|
|
"""_handle_rpc should return error when id is missing."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"fn": "rpc_echo",
|
|
"args": {"message": "test"},
|
|
# Missing 'id'
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertIn("error", response)
|
|
self.assertIn("missing", response["error"].lower())
|
|
|
|
def test_handle_rpc_missing_fn(self):
|
|
"""_handle_rpc should return error when fn is missing."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "test-abc",
|
|
"args": {"message": "test"},
|
|
# Missing 'fn'
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertEqual(response["id"], "test-abc")
|
|
self.assertFalse(response["ok"])
|
|
self.assertEqual(response["error"]["code"], "BAD_REQUEST")
|
|
|
|
def test_handle_rpc_with_unauthenticated_user(self):
|
|
"""_handle_rpc should handle permission errors correctly."""
|
|
import asyncio
|
|
from djarea.channels.connection import DjangoReactConsumer
|
|
|
|
consumer = DjangoReactConsumer()
|
|
consumer.scope = {"user": MockAnonymousUser()}
|
|
consumer.sent_messages = []
|
|
|
|
async def mock_send_json(data):
|
|
consumer.sent_messages.append(data)
|
|
|
|
consumer.send_json = mock_send_json
|
|
|
|
async def test():
|
|
await consumer._handle_rpc({
|
|
"id": "auth-test",
|
|
"fn": "rpc_auth_required",
|
|
"args": {},
|
|
})
|
|
|
|
asyncio.get_event_loop().run_until_complete(test())
|
|
|
|
response = consumer.sent_messages[0]
|
|
self.assertEqual(response["id"], "auth-test")
|
|
self.assertFalse(response["ok"])
|
|
self.assertEqual(response["error"]["code"], "FORBIDDEN")
|
|
|
|
def test_websocket_request_adapter(self):
|
|
"""WebSocketRequest should provide correct user and session."""
|
|
from djarea.channels.connection import WebSocketRequest
|
|
|
|
mock_user = MockUser(email="ws@example.com")
|
|
scope = {
|
|
"user": mock_user,
|
|
"session": {"key": "value"},
|
|
"headers": [(b"x-custom-header", b"test-value")],
|
|
}
|
|
|
|
request = WebSocketRequest(scope)
|
|
|
|
self.assertEqual(request.user, mock_user)
|
|
self.assertEqual(request.session["key"], "value")
|
|
self.assertIn("HTTP_X_CUSTOM_HEADER", request.META)
|