Events Guide¶
Events enable decoupled communication between systems in MAID. Instead of systems calling each other directly, they emit and subscribe to events. This guide covers creating events, subscribing to them, and best practices for event-driven architecture.
Event Basics¶
What Are Events?¶
Events are messages that describe something that happened in the game. Each event carries data about the occurrence, and any interested system can subscribe to it.
from dataclasses import dataclass
from uuid import UUID

from maid_engine.core.events import Event

@dataclass
class DamageDealtEvent(Event):
    """Emitted when damage is dealt to an entity."""
    source_id: UUID | None  # Who dealt the damage
    target_id: UUID  # Who received the damage
    damage: int  # Amount of damage
    damage_type: str  # Type: physical, fire, ice, etc.
The Event Base Class¶
All events inherit from Event, which provides:
@dataclass
class Event:
    event_type: str  # Set automatically to class name
    timestamp: datetime  # Set automatically to current time
    cancelled: bool  # Can be set to stop propagation

    def cancel(self) -> None:
        """Cancel this event."""
        self.cancelled = True
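The listing above is abridged, so here is a minimal sketch of how a base class could fill in `event_type` and `timestamp` on its own. It is a stand-in written for illustration, not the actual `maid_engine` implementation: the use of `field(init=False)` and `__post_init__` is an assumption about one way to get this behavior.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class Event:
    """Stand-in base class for illustration; NOT the real maid_engine Event."""

    # init=False keeps these out of __init__, so subclasses can still
    # declare required fields that have no default value.
    event_type: str = field(init=False, default="")
    timestamp: datetime = field(init=False, default_factory=_now)
    cancelled: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Derive the type name from the concrete subclass.
        self.event_type = type(self).__name__

    def cancel(self) -> None:
        self.cancelled = True


@dataclass
class DamageDealtEvent(Event):
    source_id: UUID | None
    target_id: UUID
    damage: int
    damage_type: str


event = DamageDealtEvent(
    source_id=None, target_id=uuid4(), damage=25, damage_type="fire"
)
print(event.event_type)  # DamageDealtEvent
print(event.cancelled)  # False
```

With this shape, callers only pass the event's own data; the bookkeeping fields never appear in the constructor call.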
Creating Events¶
Simple Events¶
Events are dataclasses with fields for their data:
from dataclasses import dataclass
from uuid import UUID

from maid_engine.core.events import Event

@dataclass
class PlayerJoinedEvent(Event):
    """A player joined the game."""
    player_id: UUID
    player_name: str

@dataclass
class RoomEnterEvent(Event):
    """An entity entered a room."""
    entity_id: UUID
    room_id: UUID
    from_room_id: UUID | None = None
Events with Default Values¶
Use default values for optional fields:
@dataclass
class ItemDroppedEvent(Event):
    """An item was dropped."""
    entity_id: UUID
    item_id: UUID
    room_id: UUID
    quantity: int = 1
    intentional: bool = True  # vs dropped on death
Events with Complex Data¶
Events can contain structured data:
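from dataclasses import dataclass, field
from uuid import UUID

from maid_engine.core.events import Event

@dataclass
class CombatRoundEvent(Event):
    """A round of combat completed."""
    combatant_ids: list[UUID]
    room_id: UUID
    round_number: int
    # default_factory gives each event its own dict instead of a shared one
    actions_taken: dict[str, str] = field(default_factory=dict)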
Emitting Events¶
Basic Emission¶
Emit events through the event bus:
# In a system
async def update(self, delta: float) -> None:
    event = DamageDealtEvent(
        source_id=attacker.id,
        target_id=target.id,
        damage=25,
        damage_type="physical",
    )
    await self.events.emit(event)
Emission from Commands¶
async def attack_command(ctx: CommandContext) -> bool:
    # ... calculate attack ...
    await ctx.world.events.emit(DamageDealtEvent(
        source_id=ctx.player_id,
        target_id=target_id,
        damage=damage,
        damage_type="physical",
    ))
    return True
Sync Emission¶
For synchronous contexts, queue events for later processing:
# Queue for later (doesn't await)
world.events.emit_sync(MyEvent(...))
# Process queued events later
count = await world.events.process_pending()
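To make the queue-then-process behavior concrete, here is a small sketch of a bus that models only that part. `QueueingBus` and its `delivered` list are hypothetical names for this example, and FIFO ordering is an assumption; the real event bus may differ.

```python
import asyncio
from collections import deque


class QueueingBus:
    """Hypothetical stand-in for the event bus; models queuing only."""

    def __init__(self) -> None:
        self._pending = deque()
        self.delivered = []

    async def emit(self, event) -> None:
        # Immediate delivery; a real bus would call subscribers here.
        self.delivered.append(event)

    def emit_sync(self, event) -> None:
        # Callable from non-async code: nothing is awaited, the event waits.
        self._pending.append(event)

    async def process_pending(self) -> int:
        # Drain the queue in FIFO order and report how many were emitted.
        count = 0
        while self._pending:
            await self.emit(self._pending.popleft())
            count += 1
        return count


async def main():
    bus = QueueingBus()
    bus.emit_sync("first")
    bus.emit_sync("second")
    before = list(bus.delivered)  # still empty: nothing has been processed
    count = await bus.process_pending()
    return before, count, bus.delivered


before, count, delivered = asyncio.run(main())
print(before, count, delivered)  # [] 2 ['first', 'second']
```

The point to take away is that a queued event has no effect until something awaits `process_pending()`, so code that relies on sync emission needs a place in the loop where that call happens.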
Subscribing to Events¶
Basic Subscription¶
Subscribe handlers to event types:
async def on_damage_dealt(event: DamageDealtEvent) -> None:
    print(f"Damage: {event.damage} to {event.target_id}")

# Subscribe
world.events.subscribe(DamageDealtEvent, on_damage_dealt)
In Systems¶
Subscribe in the system's startup method:
class LoggingSystem(System):
    async def startup(self) -> None:
        self.events.subscribe(DamageDealtEvent, self._on_damage)
        self.events.subscribe(EntityDeathEvent, self._on_death)

    async def _on_damage(self, event: DamageDealtEvent) -> None:
        # Log damage...
        pass

    async def _on_death(self, event: EntityDeathEvent) -> None:
        # Log death...
        pass
Sync Handlers¶
Synchronous handlers are automatically wrapped:
def sync_handler(event: DamageDealtEvent) -> None:
    # Sync code works fine
    print(f"Damage dealt: {event.damage}")

world.events.subscribe(DamageDealtEvent, sync_handler)
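One plausible way for a bus to accept both kinds of handler is to normalize everything to a coroutine function at subscription time. The sketch below assumes that approach; `as_async` is a hypothetical helper, not part of `maid_engine`.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


def as_async(handler: Callable[[Any], Any]) -> Callable[[Any], Awaitable[None]]:
    """Return an awaitable version of handler (hypothetical helper)."""
    if inspect.iscoroutinefunction(handler):
        return handler  # already async, use as-is

    async def wrapper(event: Any) -> None:
        # The sync handler runs inline, so it blocks the loop while it runs.
        handler(event)

    return wrapper


seen = []


def sync_handler(event: str) -> None:
    seen.append(("sync", event))


async def async_handler(event: str) -> None:
    seen.append(("async", event))


async def main() -> None:
    for handler in (sync_handler, async_handler):
        await as_async(handler)("hit")


asyncio.run(main())
print(seen)  # [('sync', 'hit'), ('async', 'hit')]
```

Under this kind of wrapping a synchronous handler still runs on the event loop thread, so slow or blocking work in it delays every other handler.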
Event Priority¶
Handlers run in priority order. Higher priority runs first.
from maid_engine.core.events import EventPriority

# Runs first - can modify or cancel event
world.events.subscribe(
    DamageDealtEvent,
    damage_reduction_handler,
    priority=EventPriority.HIGHEST,
)

# Runs in the middle
world.events.subscribe(
    DamageDealtEvent,
    normal_handler,
    priority=EventPriority.NORMAL,
)

# Runs last - for logging/cleanup
world.events.subscribe(
    DamageDealtEvent,
    logging_handler,
    priority=EventPriority.LOWEST,
)
Priority levels:
| Priority | Use Case |
|---|---|
| HIGHEST | Validation, blocking, modification |
| HIGH | Pre-processing |
| NORMAL | Standard handlers |
| LOW | Post-processing |
| LOWEST | Logging, cleanup |
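The ordering rule can be sketched with a sorted handler list. Everything here is illustrative: the numeric values given to `EventPriority` are assumed, `PriorityBus` is a hypothetical class, and keeping subscription order among equal priorities is a choice made for this sketch rather than documented behavior.

```python
import asyncio
from enum import IntEnum


class EventPriority(IntEnum):
    # Numeric values are assumed for this sketch only.
    LOWEST = 0
    LOW = 25
    NORMAL = 50
    HIGH = 75
    HIGHEST = 100


class PriorityBus:
    """Hypothetical bus that models priority ordering and nothing else."""

    def __init__(self) -> None:
        self._handlers = []  # list of (priority, handler) pairs

    def subscribe(self, handler, priority=EventPriority.NORMAL) -> None:
        self._handlers.append((priority, handler))
        # Stable sort: handlers with equal priority keep subscription order.
        self._handlers.sort(key=lambda pair: pair[0], reverse=True)

    async def emit(self, event) -> None:
        for _, handler in list(self._handlers):
            await handler(event)


order = []


async def reduce_damage(event) -> None:
    order.append("reduce")


async def apply_damage(event) -> None:
    order.append("apply")


async def log_damage(event) -> None:
    order.append("log")


async def main() -> None:
    bus = PriorityBus()
    # Subscribed in the "wrong" order on purpose.
    bus.subscribe(log_damage, EventPriority.LOWEST)
    bus.subscribe(apply_damage)  # defaults to NORMAL
    bus.subscribe(reduce_damage, EventPriority.HIGHEST)
    await bus.emit({"damage": 25})


asyncio.run(main())
print(order)  # ['reduce', 'apply', 'log']
```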
Canceling Events¶
Handlers can cancel events to prevent further processing:
async def invulnerability_handler(event: DamageDealtEvent) -> None:
    target = world.entities.get(event.target_id)
    if target and has_invulnerability(target):
        event.cancel()  # No more handlers will run

# Register with high priority to run first
world.events.subscribe(
    DamageDealtEvent,
    invulnerability_handler,
    priority=EventPriority.HIGHEST,
)
A handler can also check for cancellation itself, although the event bus already stops calling handlers once an event is cancelled:
async def damage_handler(event: DamageDealtEvent) -> None:
    if event.cancelled:
        return  # Optional - event bus already checks this
    # Apply damage...
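The reason the in-handler check is optional becomes clearer with a sketch of the dispatch loop. `dispatch` and `Hit` are hypothetical names, and the loop is an assumption about how a bus could honor `cancel()`, not the engine's actual code.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Hit:
    """Minimal stand-in event with the same cancel() contract."""

    damage: int
    cancelled: bool = False

    def cancel(self) -> None:
        self.cancelled = True


async def dispatch(event, handlers) -> int:
    """Call handlers in order, stopping once the event is cancelled.

    Returns the number of handlers that actually ran.
    """
    ran = 0
    for handler in handlers:
        if event.cancelled:
            break  # later handlers never see a cancelled event
        await handler(event)
        ran += 1
    return ran


applied = []


async def invulnerability_handler(event: Hit) -> None:
    event.cancel()


async def damage_handler(event: Hit) -> None:
    applied.append(event.damage)


hit = Hit(damage=25)
ran = asyncio.run(dispatch(hit, [invulnerability_handler, damage_handler]))
print(ran, applied)  # 1 []
```

Because the loop checks the flag before each call, `damage_handler` only needs its own check if it can also be invoked outside the bus.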
One-Time Subscriptions¶
Subscribe to handle an event only once:
# Handler automatically unsubscribes after first invocation
world.events.subscribe(
    PlayerJoinedEvent,
    first_player_handler,
    once=True,
)
Unsubscribing¶
By Handler ID¶
# Subscribe returns a handler ID
handler_id = world.events.subscribe(DamageDealtEvent, my_handler)
# Later, unsubscribe
world.events.unsubscribe(handler_id)
All Handlers for Event Type¶
# Remove all handlers for an event type
count = world.events.unsubscribe_all(DamageDealtEvent)
print(f"Removed {count} handlers")
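The subscription lifecycle (handler IDs, `once=True`, and the two unsubscribe calls) can be modeled with a dictionary keyed by ID. `TinyBus` is a hypothetical class for illustration; the use of `uuid4()` for IDs is an assumption about how the bus might generate them.

```python
import asyncio
from uuid import UUID, uuid4


class TinyBus:
    """Hypothetical bus that models subscription bookkeeping only."""

    def __init__(self) -> None:
        # handler_id -> (event_type, handler, once)
        self._subs: dict = {}

    def subscribe(self, event_type, handler, once: bool = False) -> UUID:
        handler_id = uuid4()
        self._subs[handler_id] = (event_type, handler, once)
        return handler_id

    def unsubscribe(self, handler_id: UUID) -> None:
        self._subs.pop(handler_id, None)

    def unsubscribe_all(self, event_type) -> int:
        ids = [hid for hid, sub in self._subs.items() if sub[0] is event_type]
        for hid in ids:
            del self._subs[hid]
        return len(ids)

    async def emit(self, event) -> None:
        # Iterate over a copy so handlers can be removed during dispatch.
        for hid, (etype, handler, once) in list(self._subs.items()):
            if isinstance(event, etype):
                if once:
                    self._subs.pop(hid, None)  # remove before the first call
                await handler(event)


class Ping:
    pass


calls = []


async def first_only(event) -> None:
    calls.append("once")


async def every_time(event) -> None:
    calls.append("always")


async def main() -> int:
    bus = TinyBus()
    bus.subscribe(Ping, first_only, once=True)
    handler_id = bus.subscribe(Ping, every_time)
    await bus.emit(Ping())
    await bus.emit(Ping())
    bus.unsubscribe(handler_id)
    await bus.emit(Ping())  # no handlers left
    bus.subscribe(Ping, every_time)
    bus.subscribe(Ping, every_time)
    return bus.unsubscribe_all(Ping)


removed = asyncio.run(main())
print(calls, removed)  # ['once', 'always', 'always'] 2
```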
Built-in Events¶
Engine Events (maid-engine)¶
| Event | Description |
|---|---|
| TickEvent | Emitted at start of each game tick |
| EntityCreatedEvent | Entity was created |
| EntityDestroyedEvent | Entity was destroyed |
| ComponentAddedEvent | Component added to entity |
| ComponentRemovedEvent | Component removed from entity |
| PlayerConnectedEvent | Player connected to server |
| PlayerDisconnectedEvent | Player disconnected |
| PlayerCommandEvent | Before command is processed |
| RoomEnterEvent | Entity entered a room |
| RoomLeaveEvent | Entity left a room |
| CustomEvent | Generic event for ad-hoc use |
Stdlib Events (maid-stdlib)¶
| Event | Description |
|---|---|
| CombatStartEvent | Combat began |
| CombatEndEvent | Combat ended |
| DamageDealtEvent | Damage was dealt |
| EntityDeathEvent | Entity died |
| ItemPickedUpEvent | Item was picked up |
| ItemDroppedEvent | Item was dropped |
| MessageEvent | Communication message |
Event Patterns¶
Request-Response Pattern¶
When a request needs a response, model both sides as events: one system emits the request, and the system that handles it emits a completion event in reply:
@dataclass
class HealRequestEvent(Event):
    """Request to heal an entity."""
    target_id: UUID
    amount: int
    source_id: UUID | None = None

@dataclass
class HealCompletedEvent(Event):
    """Heal was applied."""
    target_id: UUID
    actual_amount: int
    source_id: UUID | None = None

# Healing system handles requests
async def handle_heal_request(event: HealRequestEvent) -> None:
    entity = world.entities.get(event.target_id)
    if not entity:
        return
    health = entity.try_get(HealthComponent)
    if not health:
        return
    actual = health.heal(event.amount)
    # Emit completion event
    await world.events.emit(HealCompletedEvent(
        target_id=event.target_id,
        actual_amount=actual,
        source_id=event.source_id,
    ))
Event Chains¶
Events can trigger other events:
# DamageDealtEvent -> EntityDeathEvent -> LootDroppedEvent
async def check_death(event: DamageDealtEvent) -> None:
    target = world.entities.get(event.target_id)
    if not target:
        return
    health = target.try_get(HealthComponent)
    if health and not health.is_alive:
        await world.events.emit(EntityDeathEvent(
            entity_id=event.target_id,
            killer_id=event.source_id,
        ))

async def drop_loot(event: EntityDeathEvent) -> None:
    # Generate and drop loot
    for item in generate_loot(event.entity_id):
        await world.events.emit(LootDroppedEvent(
            entity_id=event.entity_id,
            item_id=item.id,
        ))
Broadcasting Pattern¶
Send messages to multiple targets:
from dataclasses import dataclass, field

@dataclass
class RoomMessageEvent(Event):
    """Message to all entities in a room."""
    room_id: UUID
    message: str
    exclude_ids: list[UUID] = field(default_factory=list)

async def broadcast_to_room(event: RoomMessageEvent) -> None:
    for entity in world.entities.with_components(PositionComponent):
        pos = entity.get(PositionComponent)
        if pos.room_id != event.room_id:
            continue
        if entity.id in event.exclude_ids:
            continue
        # Send message to entity's session
        session = get_session_for_entity(entity.id)
        if session:
            await session.send(event.message)
Best Practices¶
Keep Events Immutable¶
Treat event fields as read-only in handlers. Every later handler sees a mutated field, so the outcome ends up depending on handler order:
# Bad - modifying event data
async def bad_handler(event: DamageDealtEvent) -> None:
    event.damage = event.damage * 2  # Don't do this!

# Good - emit a new event or modify the entity
async def good_handler(event: DamageDealtEvent) -> None:
    modified_damage = event.damage * 2
    # Apply modified damage to entity directly
Use Specific Event Types¶
Create specific events rather than generic ones:
# Good - specific events
@dataclass
class PlayerLeveledUpEvent(Event):
    player_id: UUID
    new_level: int

# Avoid - generic events are harder to work with
@dataclass
class GenericPlayerEvent(Event):
    player_id: UUID
    event_name: str  # "leveled_up", "died", etc.
    data: dict
Document Event Flow¶
Comment the expected event flow:
# Event Flow:
# 1. PlayerAttacksEvent (from command)
# 2. -> DamageCalculatedEvent (from combat system)
# 3. -> DamageDealtEvent (from combat system)
# 4. -> EntityDeathEvent (if target dies)
# 5. -> ExperienceGainedEvent (if target died)
# 6. -> LootDroppedEvent (if target died)
Handle Errors Gracefully¶
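The event bus catches handler exceptions, but handlers should still log their own errors so that failures stay visible:
import logging

logger = logging.getLogger(__name__)

async def robust_handler(event: DamageDealtEvent) -> None:
    try:
        # Handle event...
        pass
    except Exception:
        # logger.exception records the message together with the traceback
        logger.exception("Error handling DamageDealtEvent")
        # Don't re-raise - other handlers should still run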
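For context on what "catches handler exceptions" can look like on the bus side, here is a sketch of a dispatch loop that isolates failures. `emit_isolated` is a hypothetical function and the logging behavior is assumed; the real bus may report errors differently.

```python
import asyncio
import logging

logger = logging.getLogger("events")


async def emit_isolated(event, handlers) -> int:
    """Run every handler, logging failures instead of propagating them.

    Returns the number of handlers that raised.
    """
    failures = 0
    for handler in handlers:
        try:
            await handler(event)
        except Exception:
            # One broken handler must not stop the ones after it.
            logger.exception("Handler %r failed for %r", handler, event)
            failures += 1
    return failures


results = []


async def broken_handler(event) -> None:
    raise ValueError("bad state")


async def healthy_handler(event) -> None:
    results.append("ok")


failures = asyncio.run(emit_isolated("hit", [broken_handler, healthy_handler]))
print(failures, results)  # 1 ['ok']
```

Even with this kind of isolation in place, the bus only knows that a handler failed, not what the handler was trying to do, which is why logging inside the handler remains useful.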
Clean Up Subscriptions¶
Unsubscribe when systems shut down:
class MySystem(System):
    def __init__(self, world: World) -> None:
        super().__init__(world)
        self._handler_ids: list[UUID] = []

    async def startup(self) -> None:
        handler_id = self.events.subscribe(MyEvent, self._handler)
        self._handler_ids.append(handler_id)

    async def shutdown(self) -> None:
        for handler_id in self._handler_ids:
            self.events.unsubscribe(handler_id)
        self._handler_ids.clear()
Next Steps¶
- Persistence Guide - Storing event-related data
- Testing Guide - Testing event handlers
- Reference: Events - Complete event reference