Skip to content

Events Guide

Events enable decoupled communication between systems in MAID. Instead of systems calling each other directly, they emit and subscribe to events. This guide covers creating events, subscribing to them, and best practices for event-driven architecture.

Event Basics

What Are Events?

Events are messages that describe something that happened in the game. They carry data about the occurrence and can be subscribed to by any interested system.

from dataclasses import dataclass
from uuid import UUID

from maid_engine.core.events import Event


@dataclass
class DamageDealtEvent(Event):
    """Emitted when damage is dealt to an entity."""

    source_id: UUID | None   # Who dealt the damage
    target_id: UUID          # Who received the damage
    damage: int              # Amount of damage
    damage_type: str         # Type: physical, fire, ice, etc.

The Event Base Class

All events inherit from Event, which provides:

@dataclass
class Event:
    event_type: str      # Set automatically to class name
    timestamp: datetime  # Set automatically to current time
    cancelled: bool      # Can be set to stop propagation

    def cancel(self) -> None:
        """Cancel this event."""
        self.cancelled = True

Creating Events

Simple Events

Events are dataclasses with fields for their data:

from dataclasses import dataclass
from uuid import UUID

from maid_engine.core.events import Event


@dataclass
class PlayerJoinedEvent(Event):
    """A player joined the game."""

    player_id: UUID
    player_name: str


@dataclass
class RoomEnterEvent(Event):
    """An entity entered a room."""

    entity_id: UUID
    room_id: UUID
    from_room_id: UUID | None = None

Events with Default Values

Use default values for optional fields:

@dataclass
class ItemDroppedEvent(Event):
    """An item was dropped."""

    entity_id: UUID
    item_id: UUID
    room_id: UUID
    quantity: int = 1
    intentional: bool = True  # vs dropped on death

Events with Complex Data

Events can contain structured data:

from dataclasses import dataclass, field

@dataclass
class CombatRoundEvent(Event):
    """A round of combat completed."""

    combatant_ids: list[UUID]
    room_id: UUID
    round_number: int
    actions_taken: dict[str, str] = field(default_factory=dict)

Emitting Events

Basic Emission

Emit events through the event bus:

# In a system
async def update(self, delta: float) -> None:
    event = DamageDealtEvent(
        source_id=attacker.id,
        target_id=target.id,
        damage=25,
        damage_type="physical",
    )
    await self.events.emit(event)

Emission from Commands

async def attack_command(ctx: CommandContext) -> bool:
    # ... calculate attack ...

    await ctx.world.events.emit(DamageDealtEvent(
        source_id=ctx.player_id,
        target_id=target_id,
        damage=damage,
        damage_type="physical",
    ))
    return True

Sync Emission

For synchronous contexts, queue events for later processing:

# Queue for later (doesn't await)
world.events.emit_sync(MyEvent(...))

# Process queued events later
count = await world.events.process_pending()

Subscribing to Events

Basic Subscription

Subscribe handlers to event types:

async def on_damage_dealt(event: DamageDealtEvent) -> None:
    print(f"Damage: {event.damage} to {event.target_id}")


# Subscribe
world.events.subscribe(DamageDealtEvent, on_damage_dealt)

In Systems

Subscribe in the system's startup method:

class LoggingSystem(System):
    async def startup(self) -> None:
        self.events.subscribe(DamageDealtEvent, self._on_damage)
        self.events.subscribe(EntityDeathEvent, self._on_death)

    async def _on_damage(self, event: DamageDealtEvent) -> None:
        # Log damage...
        pass

    async def _on_death(self, event: EntityDeathEvent) -> None:
        # Log death...
        pass

Sync Handlers

Synchronous handlers are automatically wrapped:

def sync_handler(event: DamageDealtEvent) -> None:
    # Sync code works fine
    print(f"Damage dealt: {event.damage}")


world.events.subscribe(DamageDealtEvent, sync_handler)

Event Priority

Handlers run in priority order. Higher priority runs first.

from maid_engine.core.events import EventPriority


# Runs first - can modify or cancel event
world.events.subscribe(
    DamageDealtEvent,
    damage_reduction_handler,
    priority=EventPriority.HIGHEST,
)

# Runs in the middle
world.events.subscribe(
    DamageDealtEvent,
    normal_handler,
    priority=EventPriority.NORMAL,
)

# Runs last - for logging/cleanup
world.events.subscribe(
    DamageDealtEvent,
    logging_handler,
    priority=EventPriority.LOWEST,
)

Priority levels:

Priority Use Case
HIGHEST Validation, blocking, modification
HIGH Pre-processing
NORMAL Standard handlers
LOW Post-processing
LOWEST Logging, cleanup

Canceling Events

Handlers can cancel events to prevent further processing:

async def invulnerability_handler(event: DamageDealtEvent) -> None:
    target = world.entities.get(event.target_id)
    if target and has_invulnerability(target):
        event.cancel()  # No more handlers will run


# Register with high priority to run first
world.events.subscribe(
    DamageDealtEvent,
    invulnerability_handler,
    priority=EventPriority.HIGHEST,
)

Check if canceled before processing:

async def damage_handler(event: DamageDealtEvent) -> None:
    if event.cancelled:
        return  # Optional - event bus already checks this

    # Apply damage...

One-Time Subscriptions

Subscribe to handle an event only once:

# Handler automatically unsubscribes after first invocation
world.events.subscribe(
    PlayerJoinedEvent,
    first_player_handler,
    once=True,
)

Unsubscribing

By Handler ID

# Subscribe returns a handler ID
handler_id = world.events.subscribe(DamageDealtEvent, my_handler)

# Later, unsubscribe
world.events.unsubscribe(handler_id)

All Handlers for Event Type

# Remove all handlers for an event type
count = world.events.unsubscribe_all(DamageDealtEvent)
print(f"Removed {count} handlers")

Built-in Events

Engine Events (maid-engine)

Event Description
TickEvent Emitted at start of each game tick
EntityCreatedEvent Entity was created
EntityDestroyedEvent Entity was destroyed
ComponentAddedEvent Component added to entity
ComponentRemovedEvent Component removed from entity
PlayerConnectedEvent Player connected to server
PlayerDisconnectedEvent Player disconnected
PlayerCommandEvent Before command is processed
RoomEnterEvent Entity entered a room
RoomLeaveEvent Entity left a room
CustomEvent Generic event for ad-hoc use

Stdlib Events (maid-stdlib)

Event Description
CombatStartEvent Combat began
CombatEndEvent Combat ended
DamageDealtEvent Damage was dealt
EntityDeathEvent Entity died
ItemPickedUpEvent Item was picked up
ItemDroppedEvent Item was dropped
MessageEvent Communication message

Event Patterns

Request-Response Pattern

Use events for requests that need responses:

@dataclass
class HealRequestEvent(Event):
    """Request to heal an entity."""
    target_id: UUID
    amount: int
    source_id: UUID | None = None


@dataclass
class HealCompletedEvent(Event):
    """Heal was applied."""
    target_id: UUID
    actual_amount: int
    source_id: UUID | None = None


# Healing system handles requests
async def handle_heal_request(event: HealRequestEvent) -> None:
    entity = world.entities.get(event.target_id)
    if not entity:
        return

    health = entity.try_get(HealthComponent)
    if not health:
        return

    actual = health.heal(event.amount)

    # Emit completion event
    await world.events.emit(HealCompletedEvent(
        target_id=event.target_id,
        actual_amount=actual,
        source_id=event.source_id,
    ))

Event Chains

Events can trigger other events:

# DamageDealtEvent -> EntityDeathEvent -> LootDroppedEvent

async def check_death(event: DamageDealtEvent) -> None:
    target = world.entities.get(event.target_id)
    if not target:
        return

    health = target.try_get(HealthComponent)
    if health and not health.is_alive:
        await world.events.emit(EntityDeathEvent(
            entity_id=event.target_id,
            killer_id=event.source_id,
        ))


async def drop_loot(event: EntityDeathEvent) -> None:
    # Generate and drop loot
    for item in generate_loot(event.entity_id):
        await world.events.emit(LootDroppedEvent(
            entity_id=event.entity_id,
            item_id=item.id,
        ))

Broadcasting Pattern

Send messages to multiple targets:

@dataclass
class RoomMessageEvent(Event):
    """Message to all entities in a room."""
    room_id: UUID
    message: str
    exclude_ids: list[UUID] = field(default_factory=list)


async def broadcast_to_room(event: RoomMessageEvent) -> None:
    for entity in world.entities.with_components(PositionComponent):
        pos = entity.get(PositionComponent)
        if pos.room_id != event.room_id:
            continue
        if entity.id in event.exclude_ids:
            continue

        # Send message to entity's session
        session = get_session_for_entity(entity.id)
        if session:
            await session.send(event.message)

Best Practices

Keep Events Immutable

Don't modify event data in handlers:

# Bad - modifying event data
async def bad_handler(event: DamageDealtEvent) -> None:
    event.damage = event.damage * 2  # Don't do this!

# Good - emit a new event or modify the entity
async def good_handler(event: DamageDealtEvent) -> None:
    modified_damage = event.damage * 2
    # Apply modified damage to entity directly

Use Specific Event Types

Create specific events rather than generic ones:

# Good - specific events
@dataclass
class PlayerLeveledUpEvent(Event):
    player_id: UUID
    new_level: int

# Avoid - generic events are harder to work with
@dataclass
class GenericPlayerEvent(Event):
    player_id: UUID
    event_name: str  # "leveled_up", "died", etc.
    data: dict

Document Event Flow

Comment the expected event flow:

# Event Flow:
# 1. PlayerAttacksEvent (from command)
# 2. -> DamageCalculatedEvent (from combat system)
# 3. -> DamageDealtEvent (from combat system)
# 4. -> EntityDeathEvent (if target dies)
# 5. -> ExperienceGainedEvent (if target died)
# 6. -> LootDroppedEvent (if target died)

Handle Errors Gracefully

The event bus catches handler exceptions, but log them:

async def robust_handler(event: DamageDealtEvent) -> None:
    try:
        # Handle event...
        pass
    except Exception as e:
        logger.error(f"Error handling DamageDealtEvent: {e}")
        # Don't re-raise - other handlers should still run

Clean Up Subscriptions

Unsubscribe when systems shut down:

class MySystem(System):
    def __init__(self, world: World) -> None:
        super().__init__(world)
        self._handler_ids: list[UUID] = []

    async def startup(self) -> None:
        handler_id = self.events.subscribe(MyEvent, self._handler)
        self._handler_ids.append(handler_id)

    async def shutdown(self) -> None:
        for handler_id in self._handler_ids:
            self.events.unsubscribe(handler_id)
        self._handler_ids.clear()

Next Steps