Skip to content

Event System

MAID uses an event-driven architecture to decouple game systems.

Overview

The event bus allows systems to communicate without direct dependencies:

from maid_engine.core.events import EventBus, Event

class PlayerDiedEvent(Event):
    player_id: str
    killer_id: str | None = None

# Subscribe to events
@event_bus.subscribe(PlayerDiedEvent)
async def on_player_death(event: PlayerDiedEvent):
    # Handle player death
    pass

# Emit events
await event_bus.emit(PlayerDiedEvent(player_id="player1", killer_id="goblin1"))

Core Events

The engine provides a small set of core lifecycle events:

Event When Emitted
TickEvent At the start of each game tick
EntityCreatedEvent When an entity is created
EntityDestroyedEvent When an entity is destroyed
ComponentAddedEvent When a component is added to an entity
ComponentRemovedEvent When a component is removed from an entity
PlayerConnectedEvent When a player session connects
PlayerDisconnectedEvent When a player session disconnects
RoomEnterEvent When an entity enters a room
RoomLeaveEvent When an entity leaves a room

Custom Events

Content packs define their own events for game-specific behavior:

from dataclasses import dataclass
from maid_engine.core.events import Event
from uuid import UUID

@dataclass
class CombatStartedEvent(Event):
    attacker_id: UUID
    defender_id: UUID

@dataclass
class SpellCastEvent(Event):
    caster_id: UUID
    spell_name: str
    target_id: UUID | None = None
    mana_cost: int = 0

@dataclass
class ItemCraftedEvent(Event):
    crafter_id: UUID
    item_id: UUID
    recipe_name: str

Event Priorities

Handlers can be registered with different priorities:

from maid_engine.core.events import EventPriority

# HIGHEST runs first, LOWEST runs last
bus.subscribe(
    DamageEvent,
    damage_handler,
    priority=EventPriority.HIGH
)

bus.subscribe(
    DamageEvent,
    logging_handler,
    priority=EventPriority.LOWEST  # Log after all processing
)

Priority order: HIGHEST > HIGH > NORMAL > LOW > LOWEST

Cancelling Events

Events can be cancelled to prevent further processing:

@dataclass
class DamageEvent(Event):
    target_id: UUID
    amount: int
    source_id: UUID | None = None

async def damage_shield_handler(event: DamageEvent):
    target = world.entities.get(event.target_id)
    shield = target.try_get(ShieldComponent)

    if shield and shield.active:
        shield.absorb(event.amount)
        event.cancel()  # Prevent damage from being applied

Sync vs Async Emission

# Async emit - use when you can await
await bus.emit(MyEvent(...))

# Sync emit - queues for later processing
bus.emit_sync(MyEvent(...))

# Process queued events
await bus.process_pending()

The tick loop automatically processes pending events, so emit_sync is safe to use from systems.

Best Practices

1. Keep Events Focused

Each event should represent one thing that happened:

# Good - focused events
@dataclass
class PlayerLeveledUpEvent(Event):
    player_id: UUID
    new_level: int

@dataclass
class SkillUnlockedEvent(Event):
    player_id: UUID
    skill_name: str

# Avoid - too many responsibilities
@dataclass
class PlayerProgressEvent(Event):  # Too broad
    player_id: UUID
    level: int | None
    skill: str | None
    quest: str | None

2. Use Events for Cross-System Communication

Events decouple systems that need to react to the same game state:

# Combat system emits event
await bus.emit(EnemyDefeatedEvent(
    enemy_id=enemy.id,
    killer_id=player.id,
    experience=50
))

# Quest system reacts
async def on_enemy_defeated(event: EnemyDefeatedEvent):
    await quest_system.update_kill_quests(event.killer_id, event.enemy_id)

# Achievement system reacts
async def on_enemy_defeated(event: EnemyDefeatedEvent):
    await achievement_system.check_combat_achievements(event)

# Loot system reacts
async def on_enemy_defeated(event: EnemyDefeatedEvent):
    await loot_system.drop_loot(event.enemy_id)

3. Don't Emit Events in Loops

Batch operations should emit a single event:

# Avoid - emitting per item
for item in items:
    await bus.emit(ItemAddedEvent(player_id, item.id))

# Better - emit once with all items
await bus.emit(ItemsAddedEvent(player_id, [i.id for i in items]))

4. Track Event Sources for Hot Reload

When registering handlers in content packs, specify the source:

def register_events(self, bus: EventBus) -> None:
    bus.subscribe(
        CombatStartedEvent,
        self.on_combat_started,
        source_pack="maid-classic-rpg"  # For hot reload cleanup
    )

Further Reading