maid-engine

The maid-engine package provides the core infrastructure for building MUD games with MAID. It is the foundation layer that all other MAID packages build upon.

Installation

Install with uv sync from the monorepo root. The package is not yet published to PyPI.

# Install all packages
uv sync

# With optional AI providers
uv sync --extra openai
uv sync --extra anthropic
uv sync --extra ollama

Module Overview

Core (maid_engine.core)

The core module contains the fundamental building blocks:

  • engine - Main GameEngine class - tick loop, content pack loading
  • world - World class - state management, entity tracking
  • events - EventBus and core event types
  • ecs - Entity Component System implementation

Commands (maid_engine.commands)

The command system provides layered, priority-based command resolution:

  • registry - LayeredCommandRegistry for command management
  • decorators - @command decorator and helpers
  • arguments - Argument parsing utilities
  • hooks - Pre/post command hooks
  • locks - Permission lock expressions

Plugins (maid_engine.plugins)

The content pack system for extensibility:

  • protocol - ContentPack protocol and BaseContentPack
  • loader - ContentPackLoader for discovery and loading
  • manifest - ContentPackManifest metadata

Storage (maid_engine.storage)

Document-based persistence layer:

  • document_store - DocumentStore interface and implementations

Networking (maid_engine.net)

Network layer for player connections:

  • server - Base NetworkServer class
  • session - Session base class
  • telnet - Telnet protocol support
  • web - WebSocket support

AI Integration (maid_engine.ai)

LLM provider abstraction:

  • providers - LLM provider interface and implementations
  • conversation - Conversation history management
  • prompts - Prompt building utilities
  • rate_limiter - API rate limiting
  • safety - Content filtering

Authentication (maid_engine.auth)

Account and session management.

Configuration (maid_engine.config)

Settings and configuration with Pydantic.

Internationalization (maid_engine.i18n)

Multi-language support with Translator and translation utilities.

Persistence (maid_engine.persistence)

Durable entity persistence — saving and restoring entity state across server restarts.

Migrations (maid_engine.migrations)

Database migration framework for managing schema changes across versions.

Observability (maid_engine.observability)

Operational observability including Prometheus metrics export, structured logging, distributed tracing, and health/readiness checks (served on internal port 9090).

Loader (maid_engine.loader)

World data loader pipeline for importing and transforming world data from external sources.


Key Classes

GameEngine

The central game engine that manages the tick loop and content packs.

GameEngine

GameEngine(
    settings: Settings | SettingsProxy | None = None,
    document_store: DocumentStore | None = None,
    template_registry: Any | None = None,
)

Main game engine that runs the tick loop.

The GameEngine manages the game world, networking, and the main tick loop that drives all game systems. It is content-agnostic - all game-specific functionality comes from content packs.

Protocol conformance: GameEngine structurally satisfies the maid_engine.core.protocols.EngineServices protocol. The World stores a reference to this engine typed as EngineServices (not GameEngine) so that systems depend only on the narrow protocol interface. See World.engine for details.
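Structural conformance means the engine never has to inherit from the protocol. The following is a minimal sketch of that idea using stand-in classes. EngineServicesSketch, EngineSketch, WorldSketch, and the request_shutdown method are hypothetical names chosen for illustration; the real EngineServices members are not listed on this page.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class EngineServicesSketch(Protocol):
    """Hypothetical narrow interface; the real EngineServices may differ."""

    def request_shutdown(self) -> None: ...


class EngineSketch:
    """Stand-in for GameEngine. It does not inherit from the protocol."""

    def __init__(self) -> None:
        self.shutdown_requested = False

    def request_shutdown(self) -> None:
        self.shutdown_requested = True

    def internal_only(self) -> str:
        # Not part of the protocol, so code typed against the protocol
        # gets no static access to it.
        return "engine internals"


class WorldSketch:
    """Stand-in for World. It stores the engine typed as the protocol."""

    def __init__(self, engine: EngineServicesSketch) -> None:
        self.engine: EngineServicesSketch = engine


world = WorldSketch(EngineSketch())
world.engine.request_shutdown()
```

A type checker accepts EngineSketch wherever EngineServicesSketch is expected because the method signatures match, which is the property the paragraph above relies on.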

Example

engine = GameEngine()

# Load content packs
engine.load_content_pack(my_content_pack)

await engine.start()

# Engine runs until stopped

# In another context:
await engine.stop()

start async

start() -> None

Start the game engine.

stop async

stop() -> None

Stop the game engine.

load_content_pack

load_content_pack(
    pack: ContentPack, use_hot_reload: bool = False
) -> None

Load a content pack.

When the engine is stopped, content packs are loaded synchronously. When the engine is running, set use_hot_reload=True to use the hot reload manager for safe runtime loading.

Parameters:

  • pack (ContentPack, required): The content pack to load.
  • use_hot_reload (bool, default: False): If True and the engine is running, use hot reload. If False and the engine is running, raises RuntimeError.

Raises:

  • RuntimeError: If the engine is running and use_hot_reload is False.
  • ValueError: If the pack is already loaded or dependencies are missing.
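The loading rules above can be modelled with a small stand-in. This is only a sketch under stated assumptions: LoaderSketch and load_error are hypothetical names, packs are reduced to a name plus a dependency list, and the order in which the checks run is assumed rather than taken from the engine source.

```python
from typing import Optional


class LoaderSketch:
    """Toy model of the documented load rules; not the real GameEngine."""

    def __init__(self) -> None:
        self.running = False
        self.loaded: dict[str, list[str]] = {}  # pack name -> dependency names
        self.hot_reloaded: list[str] = []

    def load(
        self, name: str, dependencies: list[str], use_hot_reload: bool = False
    ) -> None:
        # A running engine only accepts packs through the hot reload path.
        if self.running and not use_hot_reload:
            raise RuntimeError("engine is running; pass use_hot_reload=True")
        if name in self.loaded:
            raise ValueError(f"pack {name!r} is already loaded")
        missing = [dep for dep in dependencies if dep not in self.loaded]
        if missing:
            raise ValueError(f"pack {name!r} is missing dependencies: {missing}")
        self.loaded[name] = list(dependencies)
        if self.running:
            self.hot_reloaded.append(name)


def load_error(loader: LoaderSketch, *args, **kwargs) -> Optional[type]:
    """Return the exception type raised by load(), or None on success."""
    try:
        loader.load(*args, **kwargs)
    except (RuntimeError, ValueError) as exc:
        return type(exc)
    return None


loader = LoaderSketch()
loader.load("maid-stdlib", [])
loader.load("my-pack", ["maid-stdlib"])
```

Loading "my-pack" a second time, or loading a pack whose dependency has not been loaded, raises ValueError in this model, matching the table above.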

World

Central state management for the game world.

World

World(settings: Settings | SettingsProxy)

Central game world state manager.

The World holds all game state including entities, rooms, and provides efficient queries for game logic.

Example

world = World(settings)
await world.startup()

# Create a player entity
player = world.create_entity()
player.add(PositionComponent(room_id=starting_room.id))
player.add(HealthComponent(current=100, maximum=100))

# Query entities in a room
for entity in world.entities_in_room(room_id):
    ...

get_entity

get_entity(entity_id: UUID) -> Entity | None

Get an entity by ID.

tick async

tick(delta: float) -> None

Process a single game tick.

Parameters:

  • delta (float, required): Time since the last tick, in seconds

EventBus

Event-driven communication system.

EventBus

EventBus(obs_registry: ObservabilityRegistry | None = None)

Publish/subscribe event system.

The EventBus allows systems to communicate through events without direct coupling. Handlers can be async or sync.

Supports wildcard subscriptions via subscribe_all() to receive all events.

Example

bus = EventBus()

async def on_damage(event: DamageDealtEvent):
    print(f"Damage dealt: {event.damage}")

bus.subscribe(DamageDealtEvent, on_damage)

# Subscribe to ALL events (wildcard)
def log_event(event: Event):
    print(f"Event: {event.event_type}")

bus.subscribe_all(log_event)

await bus.emit(DamageDealtEvent(
    source_id=attacker.id,
    target_id=defender.id,
    damage=25,
    damage_type="physical",
))

subscribe

subscribe(
    event_type: type[E],
    handler: EventHandler | SyncEventHandler,
    priority: EventPriority = NORMAL,
    once: bool = False,
    source_pack: str | None = None,
) -> UUID

Subscribe to an event type.

Parameters:

  • event_type (type[E], required): The event class to subscribe to
  • handler (EventHandler | SyncEventHandler, required): Async or sync function to handle events
  • priority (EventPriority, default: NORMAL): Handler priority (higher priority runs first)
  • once (bool, default: False): If True, the handler is removed after its first invocation
  • source_pack (str | None, default: None): Name of the content pack registering this handler (for hot reload)

Returns:

  • UUID: Handler ID for unsubscribing

unsubscribe

unsubscribe(handler_id: UUID) -> bool

Unsubscribe a handler by ID.

If called during event dispatch, the removal is deferred until the dispatch completes to ensure safe iteration.

Returns:

  • bool: True if the handler was found and removed (or queued for removal)

emit async

emit(event: Event) -> None

Emit an event immediately, invoking all handlers.

Handlers are invoked in priority order. If an event is cancelled, remaining handlers are not called.

Both type-specific handlers and wildcard handlers (registered via subscribe_all) are invoked.

Handler removals during dispatch are deferred until dispatch completes.
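The dispatch rules above (priority order, cancellation, deferred removal) can be sketched with a toy bus. BusSketch and EventSketch are hypothetical stand-ins: integer priorities replace EventPriority, wildcard handlers are left out, and the real EventBus internals may be organised differently.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable
from uuid import UUID, uuid4


@dataclass
class EventSketch:
    """Hypothetical event with a cancellation flag and a trace for the demo."""

    cancelled: bool = False
    log: list[str] = field(default_factory=list)


Handler = Callable[[EventSketch], Any]


class BusSketch:
    """Toy dispatcher modelling the documented rules; not the real EventBus."""

    def __init__(self) -> None:
        self._handlers: dict[UUID, tuple[int, Handler]] = {}
        self._dispatching = False
        self._pending_removals: set[UUID] = set()

    def subscribe(self, handler: Handler, priority: int = 0) -> UUID:
        handler_id = uuid4()
        self._handlers[handler_id] = (priority, handler)
        return handler_id

    def unsubscribe(self, handler_id: UUID) -> bool:
        if handler_id not in self._handlers:
            return False
        if self._dispatching:
            self._pending_removals.add(handler_id)  # defer until dispatch ends
        else:
            del self._handlers[handler_id]
        return True

    async def emit(self, event: EventSketch) -> None:
        self._dispatching = True
        try:
            # Higher priority runs first.
            ordered = sorted(self._handlers.values(), key=lambda item: -item[0])
            for _, handler in ordered:
                if event.cancelled:
                    break  # remaining handlers are skipped
                result = handler(event)
                if inspect.isawaitable(result):  # async and sync handlers both work
                    await result
        finally:
            self._dispatching = False
            for handler_id in self._pending_removals:
                self._handlers.pop(handler_id, None)
            self._pending_removals.clear()


bus = BusSketch()
own_id: list[UUID] = []


async def high(event: EventSketch) -> None:
    event.log.append("high")


def self_removing(event: EventSketch) -> None:
    event.log.append("self_removing")
    bus.unsubscribe(own_id[0])  # deferred, because dispatch is in progress


def canceller(event: EventSketch) -> None:
    event.log.append("canceller")
    event.cancelled = True


def low(event: EventSketch) -> None:
    event.log.append("low")  # not reached: the event is cancelled first


bus.subscribe(low, priority=0)
bus.subscribe(canceller, priority=10)
own_id.append(bus.subscribe(self_removing, priority=50))
bus.subscribe(high, priority=100)

first = EventSketch()
asyncio.run(bus.emit(first))
second = EventSketch()
asyncio.run(bus.emit(second))
```

In this model the self-removing handler still completes its first invocation and is gone by the second emit, which is the practical effect of deferring removals.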

Entity

Base class for all game entities.

Entity

Entity(
    entity_id: UUID | None = None,
    manager: EntityManager | None = None,
)

Represents a game entity as a collection of components.

Entities are essentially just IDs that group components together. The EntityManager handles the actual component storage and queries.

Example

entity = Entity()
entity.add(PositionComponent(room_id=room.id))
entity.add(HealthComponent(current=100, maximum=100))

if entity.has(HealthComponent):
    health = entity.get(HealthComponent)
    health.damage(10)

ContentPack Protocol

Protocol defining the content pack interface.

ContentPack

Bases: Protocol

Protocol defining the interface for content packs.

Content packs provide game functionality to the MAID engine. They can include:

  • ECS systems that process entities
  • Event types for communication
  • Commands for player interaction
  • Document schemas for persistence
  • Setup/teardown hooks

Security / Trust Model: Content packs run with full privileges. There is no sandboxing, capability restriction, or isolation enforced by the engine. A content pack receives direct access to the GameEngine (via on_load / on_unload), the World, the CommandRegistry, and the DocumentStore. This means a pack can:

  • Execute arbitrary Python code, including filesystem and network I/O.
  • Inspect or mutate the state of other loaded content packs.
  • Register or override commands with no restrictions.
  • Crash or hang the server (e.g. by blocking the tick loop).

Only load content packs from sources you trust. Built-in packs (maid-stdlib, maid-classic-rpg, maid-tutorial-world) are maintained alongside the engine and are safe to use. Community or registry packs should be code-reviewed before use. Packs discovered via entry points or directory scanning are loaded and executed automatically, so ensure the Python environment and search paths are controlled.

Important

Content packs that are discovered via entry points or directory scanning MUST have a no-argument constructor (or a constructor where all parameters have default values). This is because the discovery mechanism cannot pass constructor arguments. If your pack requires initialization parameters, consider using the on_load method or environment variables instead.
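The constructor requirement above can be checked before a pack is shipped. The following is a small sketch using only the standard library; constructible_without_arguments and the two example classes are hypothetical helpers, not part of maid-engine, and the engine's own discovery code may validate packs differently.

```python
import inspect


def constructible_without_arguments(cls: type) -> bool:
    """Return True if cls() can be called with no arguments."""
    try:
        signature = inspect.signature(cls)
    except (TypeError, ValueError):
        return False  # signature not introspectable; treat as unsafe
    for parameter in signature.parameters.values():
        # *args and **kwargs never require a value.
        if parameter.kind in (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        ):
            continue
        if parameter.default is inspect.Parameter.empty:
            return False
    return True


class DiscoverablePack:
    """All parameters have defaults, so auto-discovery can instantiate it."""

    def __init__(self, data_dir: str = "data") -> None:
        self.data_dir = data_dir


class NeedsArgumentPack:
    """A required parameter makes no-argument construction fail."""

    def __init__(self, data_dir: str) -> None:
        self.data_dir = data_dir
```

A check like this fits naturally into a pack's own unit tests, so a constructor change that breaks discovery is caught early.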

Example

class MyContentPack:
    def __init__(self) -> None:
        # No required arguments - this is important for auto-discovery!
        self._initialized = False

    @property
    def manifest(self) -> ContentPackManifest:
        return ContentPackManifest(
            name="my-pack",
            version="1.0.0",
            display_name="My Pack",
            description="A custom content pack",
        )

    def get_dependencies(self) -> list[str]:
        return ["maid-stdlib"]

    def get_systems(self, world: World) -> list[System]:
        return [MySystem(world)]

    def get_events(self) -> list[type[Event]]:
        return [MyEvent]

    def register_commands(self, registry: CommandRegistry) -> None:
        registry.register(...)

    def register_document_schemas(self, store: DocumentStore) -> None:
        store.register_schema("my_collection", MyModel)

    async def on_load(self, engine: GameEngine) -> None:
        # Initialize resources - use this for configuration that
        # would otherwise require constructor arguments
        self._initialized = True

    async def on_unload(self, engine: GameEngine) -> None:
        # Cleanup resources
        pass

manifest property

Get the content pack manifest.

Returns metadata about this pack including name, version, dependencies, and capabilities.

get_dependencies

get_dependencies() -> list[str]

Get the names of content packs this pack depends on.

Dependencies must be loaded before this pack can be loaded. Returns a list of content pack names (not versions).
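Because dependencies must be loaded first, a valid load order is a topological order of the dependency graph. This is a minimal sketch with the standard library's graphlib. load_order is a hypothetical helper, the dependency lists for the two built-in packs are invented for the example, and the engine's actual resolution strategy is not shown on this page.

```python
from graphlib import TopologicalSorter


def load_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return pack names ordered so that every dependency comes first."""
    known = set(dependencies)
    missing = sorted(
        {dep for deps in dependencies.values() for dep in deps} - known
    )
    if missing:
        raise ValueError(f"unknown dependencies: {missing}")
    # TopologicalSorter maps each node to its predecessors, which is
    # exactly the shape returned by get_dependencies().
    return list(TopologicalSorter(dependencies).static_order())


# Pack name -> names returned by that pack's get_dependencies()
# (illustrative values only).
packs = {
    "my-pack": ["maid-classic-rpg"],
    "maid-classic-rpg": ["maid-stdlib"],
    "maid-stdlib": [],
}
order = load_order(packs)
```

graphlib raises CycleError (a ValueError subclass) from static_order() when packs depend on each other in a cycle, so circular dependencies surface as an error instead of an arbitrary order.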

get_systems

get_systems(world: World) -> list[System]

Get ECS systems provided by this pack.

Systems are registered with the engine's SystemManager and are updated each tick.

Parameters:

  • world (World, required): The game world to create systems for

Returns:

  • list[System]: List of System instances to register

get_events

get_events() -> list[type[Event]]

Get event types defined by this pack.

These event types can be used by the EventBus for publish/subscribe communication.

Returns:

  • list[type[Event]]: List of Event subclasses

register_commands

register_commands(
    registry: CommandRegistry | LayeredCommandRegistry,
) -> None

Register commands provided by this pack.

Commands are registered with the engine's CommandRegistry and can be executed by players.

Parameters:

  • registry (CommandRegistry | LayeredCommandRegistry, required): The command registry to register with

register_document_schemas

register_document_schemas(store: DocumentStore) -> None

Register document schemas used by this pack.

Schemas define the structure of documents stored in the document store.

Parameters:

  • store (DocumentStore, required): The document store to register schemas with

register_component_types

register_component_types(
    registry: ComponentRegistry,
) -> None

Register component types for persistence serialization/deserialization.

register_api_routes

register_api_routes(router: APIRouter) -> None

Register API routes for this pack.

Content packs can register custom API routes that will be mounted under /admin/packs/{pack_name}/. Routes registered here benefit from the admin authentication and rate limiting middleware.

Parameters:

  • router (APIRouter, required): An APIRouter scoped to this pack's prefix. Add routes directly to this router.

on_load async

on_load(engine: GameEngine) -> None

Called when the pack is loaded.

Use this hook to:

  • Initialize resources
  • Load data files
  • Subscribe to events
  • Set up background tasks

Parameters:

  • engine (GameEngine, required): The game engine

on_unload async

on_unload(engine: GameEngine) -> None

Called when the pack is unloaded.

Use this hook to:

  • Clean up resources
  • Save state
  • Unsubscribe from events
  • Cancel background tasks

Parameters:

  • engine (GameEngine, required): The game engine

LayeredCommandRegistry

Priority-based command resolution.

LayeredCommandRegistry

LayeredCommandRegistry(event_bus: EventBus | None = None)

Command registry with layered resolution for content packs.

Commands can be registered by multiple content packs. When a command is executed, the highest priority registration is used.

The registry integrates with:

  • Hook system: pre/post command hooks for logging, rate limiting, etc.
  • Lock system: permission checks using lock expressions

Example

registry = LayeredCommandRegistry()

# Set pack priorities (higher = wins)
registry.set_pack_priority("my-game", 100)
registry.set_pack_priority("stdlib", 50)
registry.set_pack_priority("engine", 0)

# Register commands from different packs
registry.register("look", look_handler, pack_name="stdlib")
registry.register("look", custom_look_handler, pack_name="my-game")

# When "look" is executed, my-game's handler is used (priority 100)

# Register hooks
registry.register_pre_hook("rate_limiter", check_rate_limit)
registry.register_post_hook("logger", log_command)

# Register custom lock functions
registry.register_lock_function("is_owner", check_ownership)
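The "highest priority wins" rule in the example above can be modelled in a few lines. LayeredSketch is a hypothetical stand-in, not the real LayeredCommandRegistry. The default priority of 0 for packs with no explicit priority and the tie-breaking by registration order are assumptions of this sketch.

```python
from typing import Callable, Optional

Handler = Callable[[], str]


class LayeredSketch:
    """Toy model of layered resolution; not the real registry."""

    def __init__(self) -> None:
        self._priorities: dict[str, int] = {}
        # command name -> {pack name -> handler}
        self._registrations: dict[str, dict[str, Handler]] = {}

    def set_pack_priority(self, pack_name: str, priority: int) -> None:
        self._priorities[pack_name] = priority

    def register(self, name: str, handler: Handler, pack_name: str) -> None:
        self._registrations.setdefault(name, {})[pack_name] = handler

    def resolve(self, name: str) -> Optional[Handler]:
        by_pack = self._registrations.get(name)
        if not by_pack:
            return None
        # Assumption: unknown packs get priority 0; ties keep the first
        # registered pack, because max() returns the first maximum.
        winner = max(by_pack, key=lambda pack: self._priorities.get(pack, 0))
        return by_pack[winner]


sketch = LayeredSketch()
sketch.set_pack_priority("my-game", 100)
sketch.set_pack_priority("stdlib", 50)
sketch.register("look", lambda: "stdlib look", pack_name="stdlib")
sketch.register("look", lambda: "my-game look", pack_name="my-game")
sketch.register("who", lambda: "stdlib who", pack_name="stdlib")
```

Resolution happens at lookup time in this model, so changing a pack's priority after registration changes which handler wins without re-registering anything.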

execute async

execute(
    context: CommandContext,
    player_access_level: AccessLevel = PLAYER,
) -> bool

Execute a command using the highest-priority handler with lock checking and hook pipeline.

Execution flow:

  1. Find the command definition
  2. Check lock expression (if defined)
  3. Check access level
  4. Run pre-hooks (can cancel execution)
  5. Execute command handler
  6. Run post-hooks (always run, even on exception)
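The six steps of the execution flow can be sketched as one coroutine. execute_sketch and its parameters are hypothetical. Two points are assumptions, not documented behavior: a failed lock check returns False here, and post-hooks only run once the handler has been started.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

Handler = Callable[[str], Awaitable[None]]
PreHook = Callable[[str], bool]  # returning False cancels execution
PostHook = Callable[[str], None]


async def execute_sketch(
    command: str,
    handlers: dict[str, Handler],
    *,
    lock_passes: bool = True,
    player_level: int = 0,
    required_level: int = 0,
    pre_hooks: Sequence[PreHook] = (),
    post_hooks: Sequence[PostHook] = (),
) -> bool:
    handler = handlers.get(command)  # 1. find the command
    if handler is None:
        return False
    if not lock_passes:  # 2. lock check (outcome is an assumption)
        return False
    if player_level < required_level:  # 3. access level
        raise PermissionError(f"{command!r} requires level {required_level}")
    for hook in pre_hooks:  # 4. pre-hooks may cancel
        if not hook(command):
            return False
    try:
        await handler(command)  # 5. run the handler
        return True
    finally:
        for post_hook in post_hooks:  # 6. runs even if the handler raised
            post_hook(command)


trace: list[str] = []


async def look(command: str) -> None:
    trace.append("handler")


def allow(command: str) -> bool:
    trace.append("pre")
    return True


def deny(command: str) -> bool:
    return False


def record(command: str) -> None:
    trace.append("post")


handlers = {"look": look}
handled = asyncio.run(
    execute_sketch("look", handlers, pre_hooks=[allow], post_hooks=[record])
)
cancelled = asyncio.run(
    execute_sketch("look", handlers, pre_hooks=[deny], post_hooks=[record])
)
unknown = asyncio.run(execute_sketch("dance", handlers))
```

The try/finally around the handler is what gives post-hooks their "even on exception" guarantee in this sketch.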

Parameters:

  • context (CommandContext, required): Command context containing session, player_id, command, args, etc.
  • player_access_level (AccessLevel, default: PLAYER): The access level of the player executing the command

Returns:

  • bool: True if the command was handled successfully; False if the command was not found or execution was cancelled

Raises:

  • PermissionError: If the player lacks the required access level


Package API

Full package API documentation:

maid_engine

MAID Engine - Core infrastructure for building MUD games.

This package provides the foundational infrastructure for building Multi-User Dungeon (MUD) games, including:

  • Entity Component System (ECS) for game entity management
  • Event-driven communication via EventBus
  • Network layer (Telnet and WebSocket support)
  • Authentication and session management
  • Storage abstraction (SQL for auth, document store for game data)
  • Plugin/content pack system for extensibility
  • AI provider abstraction
  • Internationalization (i18n) support

AccessLevel

Bases: IntEnum

Command access levels.

BaseContentPack

Base implementation of ContentPack with sensible defaults.

Extend this class to create a content pack with minimal boilerplate. Override only the methods you need.

Example

class MyPack(BaseContentPack):
    @property
    def manifest(self) -> ContentPackManifest:
        return ContentPackManifest(
            name="my-pack",
            version="1.0.0",
            display_name="My Pack",
            description="Custom content",
        )

    def get_systems(self, world: World) -> list[System]:
        return [MySystem(world)]

manifest property

Get the content pack manifest. Must be overridden.

get_dependencies

get_dependencies() -> list[str]

Get dependencies. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_dependencies(self) -> list[str]:
    """Get dependencies. Default: none."""
    return []

get_events

get_events() -> list[type[Event]]

Get events. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_events(self) -> list[type[Event]]:
    """Get events. Default: none."""
    return []

get_systems

get_systems(world: World) -> list[System]

Get systems. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_systems(self, world: World) -> list[System]:  # noqa: ARG002
    """Get systems. Default: none."""
    return []

on_load async

on_load(engine: GameEngine) -> None

On load hook. Default: no-op.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
async def on_load(self, engine: GameEngine) -> None:
    """On load hook. Default: no-op."""
    pass

on_unload async

on_unload(engine: GameEngine) -> None

On unload hook. Default: no-op.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
async def on_unload(self, engine: GameEngine) -> None:
    """On unload hook. Default: no-op."""
    pass

register_api_routes

register_api_routes(router: APIRouter) -> None

Register API routes. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_api_routes(self, router: APIRouter) -> None:
    """Register API routes. Default: none."""
    pass

register_commands

register_commands(
    registry: CommandRegistry | LayeredCommandRegistry,
) -> None

Register commands. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_commands(self, registry: CommandRegistry | LayeredCommandRegistry) -> None:
    """Register commands. Default: none."""
    pass

register_component_types

register_component_types(
    registry: ComponentRegistry,
) -> None

Register component types. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_component_types(self, registry: ComponentRegistry) -> None:
    """Register component types. Default: none."""
    pass

register_document_schemas

register_document_schemas(store: DocumentStore) -> None

Register schemas. Default: none.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_document_schemas(self, store: DocumentStore) -> None:
    """Register schemas. Default: none."""
    pass

CombatState

Bases: Protocol

Protocol for components that track combat state.

Used by the checkpoint system to capture and restore combat state during hot reload without importing CombatComponent from maid-stdlib.

CommandContext dataclass

CommandContext(
    session: Session,
    player_id: UUID,
    command: str,
    args: list[str],
    raw_input: str,
    world: World,
    document_store: DocumentStore | None = None,
    obs_registry: ObservabilityRegistry | None = None,
    metadata: dict[str, Any] = dict(),
    target: Any | None = None,
)

Context passed to command handlers.

Attributes:

  • session (Session): The player session
  • player_id (UUID): UUID of the player entity
  • command (str): The command name executed
  • args (list[str]): Command arguments
  • raw_input (str): Full raw input string
  • world (World): The game world
  • document_store (DocumentStore | None): Document store for persistence
  • metadata (dict[str, Any]): Additional metadata
  • target (Any | None): Optional target entity for lock evaluation (e.g., for owns()/holds() checks)

rest property

rest: str

Everything after the command name in the raw input.
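As a rough illustration of what rest yields, the helper below splits off the first whitespace-delimited word of the raw input. rest_of_input is a hypothetical function, and the whitespace handling is an assumption; the actual property may treat surrounding or repeated spaces differently.

```python
def rest_of_input(raw_input: str) -> str:
    """Return everything after the first word, or "" if there is nothing."""
    parts = raw_input.strip().split(maxsplit=1)
    return parts[1] if len(parts) > 1 else ""
```

Unlike args, which is already tokenized, a value like this keeps the remainder as one string, which suits free-text commands such as say or emote.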

CommandDefinition dataclass

CommandDefinition(
    name: str,
    handler: CommandHandler,
    pack_name: str,
    priority: int = 0,
    aliases: list[str] = list(),
    category: str = "general",
    description: str = "",
    usage: str = "",
    access_level: AccessLevel = PLAYER,
    hidden: bool = False,
    locks: str = "",
    metadata: dict[str, Any] = dict(),
)

Definition of a registered command.

Attributes:

  • name (str): Primary command name
  • handler (CommandHandler): Async function to handle the command
  • pack_name (str): Name of the content pack that registered this command
  • priority (int): Priority for layered resolution
  • aliases (list[str]): Alternative names for this command
  • category (str): Command category for help
  • description (str): Short description
  • usage (str): Usage string
  • access_level (AccessLevel): Minimum access level required
  • hidden (bool): Whether to hide the command from help listings
  • locks (str): Lock expression for permission checks (e.g., "perm(admin) OR owns()")
  • metadata (dict[str, Any]): Additional metadata for hooks (e.g., cooldowns, combat state)

CommandRegistry

CommandRegistry()

Simple command registry without layering.

For basic command registration without content pack support.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def __init__(self) -> None:
    self._commands: dict[str, CommandDefinition] = {}
    self._aliases: dict[str, str] = {}  # alias -> primary name

all_commands

all_commands() -> list[CommandDefinition]

Get all registered commands.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def all_commands(self) -> list[CommandDefinition]:
    """Get all registered commands."""
    return list(self._commands.values())

commands_by_category

commands_by_category() -> dict[
    str, list[CommandDefinition]
]

Get commands grouped by category.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def commands_by_category(self) -> dict[str, list[CommandDefinition]]:
    """Get commands grouped by category."""
    by_category: dict[str, list[CommandDefinition]] = defaultdict(list)
    for cmd in self._commands.values():
        if not cmd.hidden:
            by_category[cmd.category].append(cmd)
    return dict(by_category)

execute async

execute(
    context: CommandContext,
    player_access_level: AccessLevel = PLAYER,
) -> bool

Execute a command.

Parameters:

  • context (CommandContext, required): Command context
  • player_access_level (AccessLevel, default: PLAYER): The access level of the player executing the command

Returns:

  • bool: True if the command was handled successfully

Raises:

  • PermissionError: If the player lacks the required access level

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
async def execute(
    self,
    context: CommandContext,
    player_access_level: AccessLevel = AccessLevel.PLAYER,
) -> bool:
    """Execute a command.

    Args:
        context: Command context
        player_access_level: The access level of the player executing

    Returns:
        True if command was handled successfully

    Raises:
        PermissionError: If player lacks required access level
    """
    definition = self.get(context.command)
    if not definition:
        return False

    # Check access level
    if player_access_level < definition.access_level:
        raise PermissionError(
            f"Command '{definition.name}' requires {definition.access_level.name} "
            f"access (you have {player_access_level.name})"
        )

    result = await definition.handler(context)
    # Ensure we always return a bool, even if handler returns None
    return result if result is not None else True

get

get(name: str) -> CommandDefinition | None

Get a command by name, alias, or unique prefix.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get(self, name: str) -> CommandDefinition | None:
    """Get a command by name, alias, or unique prefix."""
    # Check if it's an alias
    if name in self._aliases:
        name = self._aliases[name]
    result = self._commands.get(name)
    if result:
        return result

    # Try prefix matching — find all commands/aliases starting with name
    matches: set[str] = set()
    for cmd_name in self._commands:
        if cmd_name.startswith(name):
            matches.add(cmd_name)
    for alias, cmd_name in self._aliases.items():
        if alias.startswith(name):
            matches.add(cmd_name)

    if len(matches) == 1:
        return self._commands.get(matches.pop())
    return None
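To see how the lookup above behaves on ambiguous input, here is a standalone re-statement of the same logic over plain collections. resolve is a hypothetical function that returns the primary command name instead of a CommandDefinition, and the command and alias names are invented for the example.

```python
from typing import Optional


def resolve(name: str, commands: set[str], aliases: dict[str, str]) -> Optional[str]:
    """Return the primary name for an exact name, alias, or unique prefix."""
    name = aliases.get(name, name)  # exact alias match first
    if name in commands:
        return name
    # Prefix matching over command names and aliases; aliases contribute
    # the command they point at, so "in" matching both "inventory" and
    # the alias "inv" still counts as a single match.
    matches = {cmd for cmd in commands if cmd.startswith(name)}
    matches |= {target for alias, target in aliases.items() if alias.startswith(name)}
    return matches.pop() if len(matches) == 1 else None


commands = {"look", "list", "inventory"}
aliases = {"i": "inventory", "inv": "inventory"}
```

With these names, "loo" resolves to look, "l" is ambiguous between look and list and resolves to nothing, and an empty string matches every command, so it also resolves to nothing unless only one command is registered.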

register

register(
    name: str,
    handler: CommandHandler,
    *,
    aliases: list[str] | None = None,
    category: str = "general",
    description: str = "",
    usage: str = "",
    access_level: AccessLevel = PLAYER,
    hidden: bool = False,
) -> None

Register a command.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register(
    self,
    name: str,
    handler: CommandHandler,
    *,
    aliases: list[str] | None = None,
    category: str = "general",
    description: str = "",
    usage: str = "",
    access_level: AccessLevel = AccessLevel.PLAYER,
    hidden: bool = False,
) -> None:
    """Register a command."""
    definition = CommandDefinition(
        name=name,
        handler=handler,
        pack_name="core",
        aliases=aliases or [],
        category=category,
        description=description,
        usage=usage,
        access_level=access_level,
        hidden=hidden,
    )

    self._commands[name] = definition

    for alias in definition.aliases:
        self._aliases[alias] = name

unregister

unregister(name: str) -> bool

Unregister a command.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def unregister(self, name: str) -> bool:
    """Unregister a command."""
    if name not in self._commands:
        return False

    definition = self._commands.pop(name)

    for alias in definition.aliases:
        self._aliases.pop(alias, None)

    return True

Component

Bases: BaseModel

Base class for all ECS components.

Components are pure data containers that can be attached to entities. They contain no logic - all behavior is implemented in Systems.

Example

class PositionComponent(Component):
    room_id: UUID
    x: int = 0
    y: int = 0

get_type classmethod

get_type() -> str

Get the component type identifier.

Source code in packages/maid-engine/src/maid_engine/core/ecs/component.py
@classmethod
def get_type(cls) -> str:
    """Get the component type identifier."""
    return cls.component_type or cls.__name__

notify_mutation

notify_mutation() -> None

Mark the owning entity dirty after in-place container mutation.

Source code in packages/maid-engine/src/maid_engine/core/ecs/component.py
def notify_mutation(self) -> None:
    """Mark the owning entity dirty after in-place container mutation."""
    callback = self._dirty_callback
    owner_id = self._owner_id
    if callback is not None and owner_id is not None:
        callback(owner_id)
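The reason for notify_mutation is that appending to a list or updating a dict in place does not go through attribute assignment, so nothing can observe the change. The sketch below uses a plain class instead of a Pydantic model. InventorySketch is a hypothetical stand-in, and how the real Component wires _owner_id and _dirty_callback is not shown on this page.

```python
from typing import Callable, Optional
from uuid import UUID, uuid4


class InventorySketch:
    """Stand-in component holding a mutable container."""

    def __init__(self) -> None:
        self.items: list[str] = []
        self._owner_id: Optional[UUID] = None
        self._dirty_callback: Optional[Callable[[UUID], None]] = None

    def notify_mutation(self) -> None:
        # Same logic as the source listing above.
        callback = self._dirty_callback
        owner_id = self._owner_id
        if callback is not None and owner_id is not None:
            callback(owner_id)


dirty: set[UUID] = set()
owner = uuid4()

inventory = InventorySketch()
inventory._owner_id = owner  # wiring done by hand for the demo
inventory._dirty_callback = dirty.add

inventory.items.append("sword")  # in-place mutation: nothing is notified
dirty_before = set(dirty)
inventory.notify_mutation()  # explicit signal that the entity changed

detached = InventorySketch()
detached.notify_mutation()  # no owner or callback: a safe no-op
```

The guard on both callback and owner_id is what makes the call safe on components that have not been attached to an entity.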

ComponentAddedEvent dataclass

ComponentAddedEvent(entity_id: UUID, component_type: str)

Bases: Event

Emitted when a component is added to an entity.

ComponentRemovedEvent dataclass

ComponentRemovedEvent(entity_id: UUID, component_type: str)

Bases: Event

Emitted when a component is removed from an entity.

ContentPack

Bases: Protocol

Protocol defining the interface for content packs.

Content packs provide game functionality to the MAID engine. They can include:

  • ECS systems that process entities
  • Event types for communication
  • Commands for player interaction
  • Document schemas for persistence
  • Setup/teardown hooks

Security / Trust Model: Content packs run with full privileges. There is no sandboxing, capability restriction, or isolation enforced by the engine. A content pack receives direct access to the GameEngine (via on_load / on_unload), the World, the CommandRegistry, and the DocumentStore. This means a pack can:

  • Execute arbitrary Python code, including filesystem and network I/O.
  • Inspect or mutate the state of other loaded content packs.
  • Register or override commands with no restrictions.
  • Crash or hang the server (e.g. by blocking the tick loop).

Only load content packs from sources you trust. Built-in packs (maid-stdlib, maid-classic-rpg, maid-tutorial-world) are maintained alongside the engine and are safe to use. Community or registry packs should be code-reviewed before use. Packs discovered via entry points or directory scanning are loaded and executed automatically, so ensure the Python environment and search paths are controlled.

Important

Content packs that are discovered via entry points or directory scanning MUST have a no-argument constructor (or a constructor where all parameters have default values). This is because the discovery mechanism cannot pass constructor arguments. If your pack requires initialization parameters, consider using the on_load method or environment variables instead.

Example

class MyContentPack:
    def __init__(self) -> None:
        # No required arguments - this is important for auto-discovery!
        self._initialized = False

    @property
    def manifest(self) -> ContentPackManifest:
        return ContentPackManifest(
            name="my-pack",
            version="1.0.0",
            display_name="My Pack",
            description="A custom content pack",
        )

    def get_dependencies(self) -> list[str]:
        return ["maid-stdlib"]

    def get_systems(self, world: World) -> list[System]:
        return [MySystem(world)]

    def get_events(self) -> list[type[Event]]:
        return [MyEvent]

    def register_commands(self, registry: CommandRegistry) -> None:
        registry.register(...)

    def register_document_schemas(self, store: DocumentStore) -> None:
        store.register_schema("my_collection", MyModel)

    async def on_load(self, engine: GameEngine) -> None:
        # Initialize resources - use this for configuration that
        # would otherwise require constructor arguments
        self._initialized = True

    async def on_unload(self, engine: GameEngine) -> None:
        # Cleanup resources
        pass

manifest property

Get the content pack manifest.

Returns metadata about this pack including name, version, dependencies, and capabilities.

get_dependencies

get_dependencies() -> list[str]

Get the names of content packs this pack depends on.

Dependencies must be loaded before this pack can be loaded. Returns a list of content pack names (not versions).

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_dependencies(self) -> list[str]:
    """Get the names of content packs this pack depends on.

    Dependencies must be loaded before this pack can be loaded.
    Returns a list of content pack names (not versions).
    """
    ...

get_events

get_events() -> list[type[Event]]

Get event types defined by this pack.

These event types can be used by the EventBus for publish/subscribe communication.

Returns:

  • list[type[Event]]: List of Event subclasses

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_events(self) -> list[type[Event]]:
    """Get event types defined by this pack.

    These event types can be used by the EventBus for
    publish/subscribe communication.

    Returns:
        List of Event subclasses
    """
    ...

get_systems

get_systems(world: World) -> list[System]

Get ECS systems provided by this pack.

Systems are registered with the engine's SystemManager and are updated each tick.

Parameters:

Name Type Description Default
world World

The game world to create systems for

required

Returns:

Type Description
list[System]

List of System instances to register

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def get_systems(self, world: World) -> list[System]:
    """Get ECS systems provided by this pack.

    Systems are registered with the engine's SystemManager and
    are updated each tick.

    Args:
        world: The game world to create systems for

    Returns:
        List of System instances to register
    """
    ...

on_load async

on_load(engine: GameEngine) -> None

Called when the pack is loaded.

Use this hook to:

- Initialize resources
- Load data files
- Subscribe to events
- Set up background tasks

Parameters:

Name Type Description Default
engine GameEngine

The game engine

required
Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
async def on_load(self, engine: GameEngine) -> None:
    """Called when the pack is loaded.

    Use this hook to:
    - Initialize resources
    - Load data files
    - Subscribe to events
    - Set up background tasks

    Args:
        engine: The game engine
    """
    ...

on_unload async

on_unload(engine: GameEngine) -> None

Called when the pack is unloaded.

Use this hook to:

- Clean up resources
- Save state
- Unsubscribe from events
- Cancel background tasks

Parameters:

Name Type Description Default
engine GameEngine

The game engine

required
Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
async def on_unload(self, engine: GameEngine) -> None:
    """Called when the pack is unloaded.

    Use this hook to:
    - Clean up resources
    - Save state
    - Unsubscribe from events
    - Cancel background tasks

    Args:
        engine: The game engine
    """
    ...
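The two hooks are meant to mirror each other: whatever on_load starts, on_unload should stop. The following is a minimal sketch of that pairing, not code from the package. `HeartbeatPack` and its `beats` counter are hypothetical, and `None` is passed where the engine would normally supply a GameEngine.

```python
import asyncio


class HeartbeatPack:
    """Hypothetical pack; only the two lifecycle hooks are sketched."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None
        self.beats = 0

    async def _heartbeat(self) -> None:
        # Background work started by on_load and stopped by on_unload.
        while True:
            self.beats += 1
            await asyncio.sleep(0.01)

    async def on_load(self, engine: object) -> None:
        # Set up a background task (one of the uses listed above).
        self._task = asyncio.create_task(self._heartbeat())

    async def on_unload(self, engine: object) -> None:
        # Cancel the task and wait for the cancellation to finish.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo() -> HeartbeatPack:
    pack = HeartbeatPack()
    await pack.on_load(engine=None)    # None stands in for a GameEngine
    await asyncio.sleep(0.05)
    await pack.on_unload(engine=None)
    return pack


pack = asyncio.run(demo())
```

Awaiting the cancelled task inside on_unload means the pack has fully stopped before the hook returns.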

register_api_routes

register_api_routes(router: APIRouter) -> None

Register API routes for this pack.

Content packs can register custom API routes that will be mounted under /admin/packs/{pack_name}/. Routes registered here benefit from the admin authentication and rate limiting middleware.

Parameters:

Name Type Description Default
router APIRouter

An APIRouter scoped to this pack's prefix. Add routes directly to this router.

required
Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_api_routes(self, router: APIRouter) -> None:
    """Register API routes for this pack.

    Content packs can register custom API routes that will be mounted
    under ``/admin/packs/{pack_name}/``. Routes registered here
    benefit from the admin authentication and rate limiting middleware.

    Args:
        router: An APIRouter scoped to this pack's prefix.
            Add routes directly to this router.
    """
    ...

register_commands

register_commands(
    registry: CommandRegistry | LayeredCommandRegistry,
) -> None

Register commands provided by this pack.

Commands are registered with the engine's CommandRegistry and can be executed by players.

Parameters:

Name Type Description Default
registry CommandRegistry | LayeredCommandRegistry

The command registry to register with

required
Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_commands(self, registry: CommandRegistry | LayeredCommandRegistry) -> None:
    """Register commands provided by this pack.

    Commands are registered with the engine's CommandRegistry
    and can be executed by players.

    Args:
        registry: The command registry to register with
    """
    ...

register_component_types

register_component_types(
    registry: ComponentRegistry,
) -> None

Register component types for persistence serialization/deserialization.

Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_component_types(self, registry: ComponentRegistry) -> None:
    """Register component types for persistence serialization/deserialization."""
    ...

register_document_schemas

register_document_schemas(store: DocumentStore) -> None

Register document schemas used by this pack.

Schemas define the structure of documents stored in the document store.

Parameters:

Name Type Description Default
store DocumentStore

The document store to register schemas with

required
Source code in packages/maid-engine/src/maid_engine/plugins/protocol.py
def register_document_schemas(self, store: DocumentStore) -> None:
    """Register document schemas used by this pack.

    Schemas define the structure of documents stored in the
    document store.

    Args:
        store: The document store to register schemas with
    """
    ...

ContentPackLoader

ContentPackLoader()

Manages content pack loading with dependency resolution.

Example

loader = ContentPackLoader()

# Discover and register available packs
for pack in discover_content_packs():
    loader.register(pack)

# Load packs in dependency order
ordered_packs = loader.resolve_load_order()
for pack in ordered_packs:
    engine.load_content_pack(pack)

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def __init__(self) -> None:
    self._available: dict[str, ContentPack] = {}
    self._loaded: set[str] = set()

available_packs property

available_packs: list[str]

Get names of all available packs.

loaded_packs property

loaded_packs: set[str]

Get names of loaded packs.

can_unload

can_unload(pack_name: str) -> tuple[bool, list[str]]

Check if a pack can be unloaded.

Returns:

Type Description
tuple[bool, list[str]]

Tuple of (can_unload, list of blocking dependents)

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def can_unload(self, pack_name: str) -> tuple[bool, list[str]]:
    """Check if a pack can be unloaded.

    Returns:
        Tuple of (can_unload, list of blocking dependents)
    """
    dependents = self.get_dependents(pack_name)
    loaded_dependents = [d for d in dependents if d in self._loaded]
    return len(loaded_dependents) == 0, loaded_dependents
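To see how the returned tuple behaves, the sketch below uses stand-ins rather than the real classes. `StubPack` and `MiniLoader` are hypothetical: `MiniLoader` is a trimmed copy of the loader logic on this page, and it keys packs by a plain `name` attribute, whereas the real loader uses `pack.manifest.name`.

```python
from dataclasses import dataclass, field


@dataclass
class StubPack:
    """Hypothetical stand-in for a ContentPack: just a name and dependencies."""
    name: str
    deps: list[str] = field(default_factory=list)

    def get_dependencies(self) -> list[str]:
        return self.deps


class MiniLoader:
    """Trimmed copy of the loader logic, for illustration only."""

    def __init__(self) -> None:
        self._available: dict[str, StubPack] = {}
        self._loaded: set[str] = set()

    def register(self, pack: StubPack) -> None:
        self._available[pack.name] = pack

    def mark_loaded(self, name: str) -> None:
        self._loaded.add(name)

    def mark_unloaded(self, name: str) -> None:
        self._loaded.discard(name)

    def can_unload(self, name: str) -> tuple[bool, list[str]]:
        # Only dependents that are currently loaded block the unload.
        dependents = [
            n for n, p in self._available.items() if name in p.get_dependencies()
        ]
        blocking = [d for d in dependents if d in self._loaded]
        return len(blocking) == 0, blocking


loader = MiniLoader()
loader.register(StubPack("maid-stdlib"))
loader.register(StubPack("my-pack", ["maid-stdlib"]))
loader.mark_loaded("maid-stdlib")
loader.mark_loaded("my-pack")

blocked = loader.can_unload("maid-stdlib")   # (False, ["my-pack"])
loader.mark_unloaded("my-pack")
free = loader.can_unload("maid-stdlib")      # (True, [])
```

A registered dependent that is not loaded does not block the unload, which is why the second call succeeds.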

get

get(pack_name: str) -> ContentPack | None

Get a registered pack by name.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def get(self, pack_name: str) -> ContentPack | None:
    """Get a registered pack by name."""
    return self._available.get(pack_name)

get_dependents

get_dependents(pack_name: str) -> list[str]

Get packs that depend on the given pack.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def get_dependents(self, pack_name: str) -> list[str]:
    """Get packs that depend on the given pack."""
    dependents: list[str] = []
    for name, pack in self._available.items():
        if pack_name in pack.get_dependencies():
            dependents.append(name)
    return dependents

mark_loaded

mark_loaded(pack_name: str) -> None

Mark a pack as loaded.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def mark_loaded(self, pack_name: str) -> None:
    """Mark a pack as loaded."""
    self._loaded.add(pack_name)

mark_unloaded

mark_unloaded(pack_name: str) -> None

Mark a pack as unloaded.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def mark_unloaded(self, pack_name: str) -> None:
    """Mark a pack as unloaded."""
    self._loaded.discard(pack_name)

register

register(pack: ContentPack) -> None

Register an available content pack.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def register(self, pack: ContentPack) -> None:
    """Register an available content pack."""
    self._available[pack.manifest.name] = pack

resolve_load_order

resolve_load_order(
    pack_names: list[str] | None = None,
) -> list[ContentPack]

Resolve the loading order for content packs.

Uses topological sort to order packs by dependencies.

Parameters:

Name Type Description Default
pack_names list[str] | None

Specific packs to load (default: all available)

None

Returns:

Type Description
list[ContentPack]

List of packs in dependency order

Raises:

Type Description
DependencyError

If dependencies cannot be resolved

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def resolve_load_order(
    self, pack_names: list[str] | None = None
) -> list[ContentPack]:
    """Resolve the loading order for content packs.

    Uses topological sort to order packs by dependencies.

    Args:
        pack_names: Specific packs to load (default: all available)

    Returns:
        List of packs in dependency order

    Raises:
        DependencyError: If dependencies cannot be resolved
    """
    if pack_names is None:
        pack_names = list(self._available.keys())

    # Build dependency graph - include transitive dependencies
    graph: dict[str, set[str]] = {}
    to_process = list(pack_names)
    processed = set()

    while to_process:
        name = to_process.pop(0)
        if name in processed:
            continue
        processed.add(name)

        pack = self._available.get(name)
        if not pack:
            raise DependencyError(f"Content pack '{name}' not found")

        deps = set(pack.get_dependencies())
        graph[name] = deps

        # Get version constraints from manifest
        version_constraints = pack.manifest.dependencies

        # Verify all dependencies are available, meet version constraints,
        # and queue them for processing
        for dep in deps:
            if dep not in self._available:
                raise DependencyError(
                    f"Content pack '{name}' requires '{dep}' which is not available"
                )

            # Check version constraint if specified
            dep_pack = self._available[dep]
            version_constraint = version_constraints.get(dep, "")
            if version_constraint and not dep_pack.manifest.satisfies_dependency(
                dep, version_constraint
            ):
                raise DependencyError(
                    f"Content pack '{name}' requires '{dep}' "
                    f"version {version_constraint}, but version "
                    f"{dep_pack.manifest.version} is available"
                )

            # Add transitive dependency to processing queue
            if dep not in processed and dep not in to_process:
                to_process.append(dep)

    # Topological sort
    ordered: list[str] = []
    visited: set[str] = set()
    visiting: set[str] = set()

    def visit(name: str) -> None:
        if name in visited:
            return
        if name in visiting:
            raise DependencyError(
                f"Circular dependency detected involving '{name}'"
            )

        visiting.add(name)

        for dep in graph.get(name, set()):
            visit(dep)

        visiting.remove(name)
        visited.add(name)
        ordered.append(name)

    # Visit all packs in the graph (including transitive dependencies)
    for name in graph:
        visit(name)

    return [self._available[name] for name in ordered]
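The ordering step can be tried in isolation. The sketch below repeats the depth-first topological sort on a plain dictionary, under a few assumptions: the pack names are hypothetical, `ValueError` stands in for `DependencyError`, and dependencies are visited in sorted order so the result is reproducible.

```python
def load_order(graph: dict[str, set[str]]) -> list[str]:
    """Depth-first topological sort: dependencies come before dependents."""
    ordered: list[str] = []
    visited: set[str] = set()
    visiting: set[str] = set()

    def visit(name: str) -> None:
        if name in visited:
            return
        if name in visiting:
            # Re-entering a node that is still on the stack means a cycle.
            raise ValueError(f"Circular dependency detected involving '{name}'")
        visiting.add(name)
        for dep in sorted(graph.get(name, set())):  # sorted for stable output
            visit(dep)
        visiting.remove(name)
        visited.add(name)
        ordered.append(name)

    for name in graph:
        visit(name)
    return ordered


# Hypothetical pack names; each key maps to the packs it depends on.
graph = {
    "my-pack": {"classic-rpg", "maid-stdlib"},
    "classic-rpg": {"maid-stdlib"},
    "maid-stdlib": set(),
}
order = load_order(graph)  # ["maid-stdlib", "classic-rpg", "my-pack"]

try:
    load_order({"a": {"b"}, "b": {"a"}})
    cycle_error = ""
except ValueError as exc:
    cycle_error = str(exc)
```

A name is appended only after all of its dependencies have been appended, so the list can be loaded front to back.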

unregister

unregister(pack_name: str) -> ContentPack | None

Unregister a content pack.

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def unregister(self, pack_name: str) -> ContentPack | None:
    """Unregister a content pack."""
    return self._available.pop(pack_name, None)

ContentPackManifest dataclass

ContentPackManifest(
    name: str,
    version: str,
    display_name: str = "",
    description: str = "",
    dependencies: dict[str, str] = dict(),
    provides: list[str] = list(),
    requires: list[str] = list(),
    authors: list[str] = list(),
    license: str = "",
    homepage: str = "",
    keywords: list[str] = list(),
    migration_requires: dict[str, int] = dict(),
)

Metadata about a content pack.

Attributes:

Name Type Description
name str

Unique identifier for the pack (e.g., "classic-rpg")

version str

Semantic version string (e.g., "0.1.0")

display_name str

Human-readable name (e.g., "Classic RPG")

description str

Brief description of the pack

dependencies dict[str, str]

Dict of pack_name -> version_constraint

provides list[str]

List of capability identifiers this pack provides

requires list[str]

List of capability identifiers this pack requires

authors list[str]

List of author names

license str

SPDX license identifier

homepage str

URL to pack homepage/repository

keywords list[str]

List of keywords for discovery

from_dict classmethod

from_dict(data: dict[str, Any]) -> ContentPackManifest

Create manifest from a dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with 'pack' key containing manifest data

required

Returns:

Type Description
ContentPackManifest

ContentPackManifest instance

Source code in packages/maid-engine/src/maid_engine/plugins/manifest.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> ContentPackManifest:
    """Create manifest from a dictionary.

    Args:
        data: Dictionary with 'pack' key containing manifest data

    Returns:
        ContentPackManifest instance
    """
    pack_data = data.get("pack", data)

    # Extract capabilities section
    capabilities = pack_data.get("capabilities", {})

    return cls(
        name=pack_data["name"],
        version=pack_data["version"],
        display_name=pack_data.get("display_name", ""),
        description=pack_data.get("description", ""),
        dependencies=pack_data.get("dependencies", {}),
        provides=capabilities.get("provides", []),
        requires=capabilities.get("requires", []),
        authors=pack_data.get("authors", []),
        license=pack_data.get("license", ""),
        homepage=pack_data.get("homepage", ""),
        keywords=pack_data.get("keywords", []),
        migration_requires=pack_data.get("migration_requires", {}),
    )

from_toml classmethod

from_toml(path: Path) -> ContentPackManifest

Load manifest from a TOML file.

Expected format

[pack]
name = "my-pack"
version = "1.0.0"
display_name = "My Pack"
description = "A custom content pack"

[pack.dependencies]
maid-stdlib = ">=0.1.0"

[pack.capabilities]
provides = ["combat-system", "magic-system"]
requires = ["ecs", "event-bus"]

Source code in packages/maid-engine/src/maid_engine/plugins/manifest.py
@classmethod
def from_toml(cls, path: Path) -> ContentPackManifest:
    """Load manifest from a TOML file.

    Expected format:
        [pack]
        name = "my-pack"
        version = "1.0.0"
        display_name = "My Pack"
        description = "A custom content pack"

        [pack.dependencies]
        maid-stdlib = ">=0.1.0"

        [pack.capabilities]
        provides = ["combat-system", "magic-system"]
        requires = ["ecs", "event-bus"]
    """
    with open(path, "rb") as f:
        data = tomllib.load(f)

    return cls.from_dict(data)

satisfies_dependency

satisfies_dependency(name: str, version: str) -> bool

Check if this pack satisfies a dependency requirement.

Uses PEP440 version specifiers for proper version comparison, supporting:

- Pre-release versions (e.g., 1.0.0a1, 1.0.0b2, 1.0.0rc1)
- Post-release versions (e.g., 1.0.0.post1)
- Dev versions (e.g., 1.0.0.dev1)
- Local versions (e.g., 1.0.0+local)
- Epoch (e.g., 1!1.0.0)
- All PEP440 operators (==, !=, <, <=, >, >=, ~=, ===)

Parameters:

Name Type Description Default
name str

Required pack name

required
version str

Version constraint (e.g., ">=0.1.0", "~=1.4", ">=1.0,<2.0")

required

Returns:

Type Description
bool

True if this pack satisfies the requirement

Source code in packages/maid-engine/src/maid_engine/plugins/manifest.py
def satisfies_dependency(self, name: str, version: str) -> bool:
    """Check if this pack satisfies a dependency requirement.

    Uses PEP440 version specifiers for proper version comparison, supporting:
    - Pre-release versions (e.g., 1.0.0a1, 1.0.0b2, 1.0.0rc1)
    - Post-release versions (e.g., 1.0.0.post1)
    - Dev versions (e.g., 1.0.0.dev1)
    - Local versions (e.g., 1.0.0+local)
    - Epoch (e.g., 1!1.0.0)
    - All PEP440 operators (==, !=, <, <=, >, >=, ~=, ===)

    Args:
        name: Required pack name
        version: Version constraint (e.g., ">=0.1.0", "~=1.4", ">=1.0,<2.0")

    Returns:
        True if this pack satisfies the requirement
    """
    if self.name != name:
        return False

    constraint = version.strip()

    # Empty constraint means any version is acceptable
    if not constraint:
        return True

    # Handle legacy single '=' prefix (normalize to '==')
    if constraint.startswith("=") and not constraint.startswith("=="):
        constraint = "=" + constraint

    # Handle bare versions without operators (treat as exact match)
    # PEP440 requires an operator, but for backwards compatibility
    # we treat bare version strings like "1.0.0" as "==1.0.0"
    if constraint[0].isdigit():
        constraint = f"=={constraint}"

    try:
        # Parse the version of this pack
        pack_version = Version(self.version)

        # Parse the constraint as a PEP440 specifier set
        # SpecifierSet handles comma-separated constraints like ">=1.0,<2.0"
        specifier = SpecifierSet(constraint)

        # Check if the pack version satisfies the constraint
        return pack_version in specifier
    except InvalidVersion:
        # If we can't parse our version, fall back to string comparison
        # for legacy versions that don't follow PEP440
        return self._legacy_version_match(constraint)
    except InvalidSpecifier:
        # If the constraint is invalid PEP440, treat it as "any version"
        # for backwards compatibility. The caller (e.g., hot_reload) should
        # validate constraint syntax separately and log warnings.
        return True
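The constraint is normalized before it is parsed as a specifier. The sketch below isolates those string steps in a hypothetical helper, `normalize_constraint`, which is not part of the package, so the legacy forms can be checked without installing anything.

```python
def normalize_constraint(version: str) -> str:
    """Mirror the pre-processing steps applied before PEP 440 parsing."""
    constraint = version.strip()
    if not constraint:
        return ""                       # empty: any version is acceptable
    if constraint.startswith("=") and not constraint.startswith("=="):
        constraint = "=" + constraint   # legacy "=1.0" becomes "==1.0"
    if constraint[0].isdigit():
        constraint = f"=={constraint}"  # bare "1.0.0" becomes "==1.0.0"
    return constraint


examples = {
    raw: normalize_constraint(raw)
    for raw in ["", " >=0.1.0 ", "=1.0", "1.0.0", "~=1.4"]
}
```

Constraints that already begin with a PEP 440 operator pass through unchanged apart from whitespace trimming.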

to_dict

to_dict() -> dict[str, Any]

Convert manifest to dictionary.

Source code in packages/maid-engine/src/maid_engine/plugins/manifest.py
def to_dict(self) -> dict[str, Any]:
    """Convert manifest to dictionary."""
    return {
        "pack": {
            "name": self.name,
            "version": self.version,
            "display_name": self.display_name,
            "description": self.description,
            "dependencies": self.dependencies,
            "capabilities": {
                "provides": self.provides,
                "requires": self.requires,
            },
            "authors": self.authors,
            "license": self.license,
            "homepage": self.homepage,
            "keywords": self.keywords,
            "migration_requires": self.migration_requires,
        }
    }

DocumentCollection

Bases: ABC

Abstract base class for document collections.

A collection holds documents of a specific type, identified by UUID. All operations are async for compatibility with async databases.

Example

# Get a collection
characters = store.get_collection("characters")

# CRUD operations
doc_id = await characters.create(character_data)
character = await characters.get(doc_id)
await characters.update(doc_id, updated_data)
await characters.delete(doc_id)

# Query
results = await characters.query(QueryOptions(
    filters={"level": 10},
    order_by="name",
    limit=20
))

name abstractmethod property

name: str

Get the collection name.

count abstractmethod async

count(filters: dict[str, Any] | None = None) -> int

Count documents.

Parameters:

Name Type Description Default
filters dict[str, Any] | None

Optional filters to apply

None

Returns:

Type Description
int

Number of matching documents

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def count(self, filters: dict[str, Any] | None = None) -> int:
    """Count documents.

    Args:
        filters: Optional filters to apply

    Returns:
        Number of matching documents
    """
    ...

create abstractmethod async

create(document: T, doc_id: UUID | None = None) -> UUID

Create a new document.

Parameters:

Name Type Description Default
document T

Document data

required
doc_id UUID | None

Optional specific UUID (generated if not provided)

None

Returns:

Type Description
UUID

UUID of created document

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def create(self, document: T, doc_id: UUID | None = None) -> UUID:
    """Create a new document.

    Args:
        document: Document data
        doc_id: Optional specific UUID (generated if not provided)

    Returns:
        UUID of created document
    """
    ...

delete abstractmethod async

delete(doc_id: UUID) -> bool

Delete a document.

Parameters:

Name Type Description Default
doc_id UUID

Document UUID

required

Returns:

Type Description
bool

True if document was deleted, False if not found

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def delete(self, doc_id: UUID) -> bool:
    """Delete a document.

    Args:
        doc_id: Document UUID

    Returns:
        True if document was deleted, False if not found
    """
    ...

exists abstractmethod async

exists(doc_id: UUID) -> bool

Check if a document exists.

Parameters:

Name Type Description Default
doc_id UUID

Document UUID

required

Returns:

Type Description
bool

True if document exists

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def exists(self, doc_id: UUID) -> bool:
    """Check if a document exists.

    Args:
        doc_id: Document UUID

    Returns:
        True if document exists
    """
    ...

get abstractmethod async

get(doc_id: UUID) -> T | None

Get a document by ID.

Parameters:

Name Type Description Default
doc_id UUID

Document UUID

required

Returns:

Type Description
T | None

Document if found, None otherwise

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def get(self, doc_id: UUID) -> T | None:
    """Get a document by ID.

    Args:
        doc_id: Document UUID

    Returns:
        Document if found, None otherwise
    """
    ...

get_many abstractmethod async

get_many(doc_ids: list[UUID]) -> list[T]

Get multiple documents by ID.

Parameters:

Name Type Description Default
doc_ids list[UUID]

List of document UUIDs

required

Returns:

Type Description
list[T]

List of found documents (may be shorter than doc_ids)

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def get_many(self, doc_ids: list[UUID]) -> list[T]:
    """Get multiple documents by ID.

    Args:
        doc_ids: List of document UUIDs

    Returns:
        List of found documents (may be shorter than doc_ids)
    """
    ...

query abstractmethod async

query(options: QueryOptions) -> list[T]

Query documents.

Parameters:

Name Type Description Default
options QueryOptions

Query options (filters, ordering, pagination)

required

Returns:

Type Description
list[T]

List of matching documents

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def query(self, options: QueryOptions) -> list[T]:
    """Query documents.

    Args:
        options: Query options (filters, ordering, pagination)

    Returns:
        List of matching documents
    """
    ...

update abstractmethod async

update(doc_id: UUID, document: T) -> bool

Update an existing document.

Parameters:

Name Type Description Default
doc_id UUID

Document UUID

required
document T

New document data

required

Returns:

Type Description
bool

True if document was updated, False if not found

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def update(self, doc_id: UUID, document: T) -> bool:
    """Update an existing document.

    Args:
        doc_id: Document UUID
        document: New document data

    Returns:
        True if document was updated, False if not found
    """
    ...

upsert abstractmethod async

upsert(doc_id: UUID, document: T) -> bool

Insert or update document atomically.

Parameters:

Name Type Description Default
doc_id UUID

Document UUID

required
document T

Document data

required

Returns:

Type Description
bool

True if successful

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def upsert(self, doc_id: UUID, document: T) -> bool:
    """Insert or update document atomically.

    Args:
        doc_id: Document UUID
        document: Document data

    Returns:
        True if successful
    """
    ...

upsert_many async

upsert_many(docs: list[tuple[UUID, T]]) -> list[bool]

Insert or update multiple documents.

Default implementation performs sequential upserts. Storage backends can override this method for batched operations.

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
async def upsert_many(self, docs: list[tuple[UUID, T]]) -> list[bool]:
    """Insert or update multiple documents.

    Default implementation performs sequential upserts. Storage backends can
    override this method for batched operations.
    """
    results: list[bool] = []
    for doc_id, document in docs:
        results.append(await self.upsert(doc_id, document))
    return results
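As an illustration of why a backend might override the default, the sketch below compares the sequential default with a batched override. Both classes are hypothetical in-memory stand-ins, not real storage backends, and `round_trips` is an invented counter that simulates calls to a database.

```python
import asyncio
from typing import Any
from uuid import UUID, uuid4


class SequentialCollection:
    """Stand-in with the default behaviour: one upsert call per document."""

    def __init__(self) -> None:
        self.docs: dict[UUID, Any] = {}
        self.round_trips = 0  # counts simulated backend calls

    async def upsert(self, doc_id: UUID, document: Any) -> bool:
        self.round_trips += 1
        self.docs[doc_id] = document
        return True

    async def upsert_many(self, docs: list[tuple[UUID, Any]]) -> list[bool]:
        results: list[bool] = []
        for doc_id, document in docs:
            results.append(await self.upsert(doc_id, document))
        return results


class BatchedCollection(SequentialCollection):
    """Hypothetical backend that writes the whole batch in one call."""

    async def upsert_many(self, docs: list[tuple[UUID, Any]]) -> list[bool]:
        self.round_trips += 1
        self.docs.update(dict(docs))
        return [True] * len(docs)


async def demo() -> tuple[int, int]:
    batch = [(uuid4(), {"name": f"item-{i}"}) for i in range(5)]
    sequential, batched = SequentialCollection(), BatchedCollection()
    await sequential.upsert_many(batch)
    await batched.upsert_many(batch)
    return sequential.round_trips, batched.round_trips


round_trips = asyncio.run(demo())  # (5, 1)
```

Both variants return one boolean per input document, so callers do not need to know which one they are using.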

DocumentStore

Bases: ABC

Abstract base class for the document store.

The document store manages collections of documents. Content packs register their document schemas, then use collections for persistence.

Example

# In content pack initialization
store.register_schema("characters", CharacterModel)
store.register_schema("items", ItemModel)

# Later, in game logic
characters = store.get_collection("characters")
character = await characters.get(character_id)

query_collector property

query_collector: QueryCollector | None

Get the query collector for profiling (if enabled).

close abstractmethod async

close() -> None

Close the document store.

Closes database connections and cleans up resources.

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def close(self) -> None:
    """Close the document store.

    Closes database connections and cleans up resources.
    """
    ...

collection_names abstractmethod

collection_names() -> list[str]

Get names of all registered collections.

Returns:

Type Description
list[str]

List of collection names

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
def collection_names(self) -> list[str]:
    """Get names of all registered collections.

    Returns:
        List of collection names
    """
    ...

get_collection abstractmethod

get_collection(name: str) -> DocumentCollection[Any]

Get a document collection.

Parameters:

Name Type Description Default
name str

Collection name

required

Returns:

Type Description
DocumentCollection[Any]

DocumentCollection for the given name

Raises:

Type Description
KeyError

If collection is not registered

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
def get_collection(self, name: str) -> DocumentCollection[Any]:
    """Get a document collection.

    Args:
        name: Collection name

    Returns:
        DocumentCollection for the given name

    Raises:
        KeyError: If collection is not registered
    """
    ...

has_collection abstractmethod

has_collection(name: str) -> bool

Check if a collection is registered.

Parameters:

Name Type Description Default
name str

Collection name

required

Returns:

Type Description
bool

True if collection exists

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
def has_collection(self, name: str) -> bool:
    """Check if a collection is registered.

    Args:
        name: Collection name

    Returns:
        True if collection exists
    """
    ...

initialize abstractmethod async

initialize() -> None

Initialize the document store.

Creates necessary database tables/structures. Should be called before using the store.

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
async def initialize(self) -> None:
    """Initialize the document store.

    Creates necessary database tables/structures.
    Should be called before using the store.
    """
    ...

register_schema abstractmethod

register_schema(name: str, schema: type[BaseModel]) -> None

Register a document schema.

Must be called before using a collection. The schema defines the structure of documents in the collection.

Parameters:

Name Type Description Default
name str

Collection name

required
schema type[BaseModel]

Pydantic model class for document validation

required
Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
@abstractmethod
def register_schema(self, name: str, schema: type[BaseModel]) -> None:
    """Register a document schema.

    Must be called before using a collection. The schema defines
    the structure of documents in the collection.

    Args:
        name: Collection name
        schema: Pydantic model class for document validation
    """
    ...

set_query_collector

set_query_collector(
    collector: QueryCollector | None,
) -> None

Set the query collector for profiling.

The query collector must be started externally (e.g., via ProfileManager). When set, the store will record query profiling data during operations.

Parameters:

Name Type Description Default
collector QueryCollector | None

The QueryCollector instance, or None to disable profiling.

required
Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
def set_query_collector(self, collector: QueryCollector | None) -> None:
    """Set the query collector for profiling.

    The query collector must be started externally (e.g., via ProfileManager).
    When set, the store will record query profiling data during operations.

    Args:
        collector: The QueryCollector instance, or None to disable profiling.
    """
    self._query_collector = collector

EngineState

Bases: Enum

Game engine states.

Entity

Entity(
    entity_id: UUID | None = None,
    manager: EntityManager | None = None,
)

Represents a game entity as a collection of components.

Entities are essentially just IDs that group components together. The EntityManager handles the actual component storage and queries.

Example

entity = Entity()
entity.add(PositionComponent(room_id=room.id))
entity.add(HealthComponent(current=100, maximum=100))

if entity.has(HealthComponent):
    health = entity.get(HealthComponent)
    health.damage(10)

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def __init__(
    self,
    entity_id: UUID | None = None,
    manager: EntityManager | None = None,
) -> None:
    self._id = entity_id or uuid4()
    self._components: dict[type[Component], Component] = {}
    self._tags: set[str] = set()
    self._manager = manager
    self._created_at: datetime = datetime.now(UTC)
    self._updated_at: datetime = self._created_at
    self._protocol_cache: dict[type, object | None] = {}

components property

components: Iterator[Component]

Iterate over all components.

created_at property

created_at: datetime

Get entity creation timestamp.

id property

id: UUID

Get entity ID.

tags property

tags: frozenset[str]

Get entity tags.

updated_at property

updated_at: datetime

Get entity last updated timestamp.

add

add(component: Component) -> Entity

Add a component to this entity.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def add(self, component: Component) -> Entity:
    """Add a component to this entity."""
    component_type = type(component)
    component._owner_id = self._id
    if component._dirty_callback is None:
        for existing_component in self._components.values():
            if existing_component._dirty_callback is not None:
                component._dirty_callback = existing_component._dirty_callback
                break
    self._components[component_type] = component
    # Selectively invalidate protocol cache: only clear entries where the
    # new component could satisfy the protocol (positive or previously-negative).
    self._invalidate_protocol_cache_for(component)
    self._mark_updated()
    if self._manager:
        self._manager._on_component_added(self, component_type, component.get_type())
    return self
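Because components are stored in a dictionary keyed by their concrete type, adding a second component of the same type replaces the first, and returning self allows calls to be chained. The sketch below illustrates only that behavior. MiniEntity, Health, and Position are hypothetical stand-ins, not the real maid_engine classes, and the owner ID, dirty callback, protocol cache, and manager notification steps are omitted.

```python
from dataclasses import dataclass


@dataclass
class Health:
    """Hypothetical component used only for this illustration."""

    current: int


@dataclass
class Position:
    """Hypothetical component used only for this illustration."""

    room: str


class MiniEntity:
    """Stand-in that mirrors only the type-keyed storage of Entity.add."""

    def __init__(self) -> None:
        self._components: dict = {}

    def add(self, component: object) -> "MiniEntity":
        # Keyed by concrete type, so there is one component per type.
        self._components[type(component)] = component
        return self  # returning self enables chaining


entity = MiniEntity().add(Health(100)).add(Position("hall")).add(Health(40))
print(len(entity._components))             # 2
print(entity._components[Health].current)  # 40
```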

add_tag

add_tag(tag: str) -> Entity

Add a tag to this entity.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def add_tag(self, tag: str) -> Entity:
    """Add a tag to this entity."""
    self._tags.add(tag)
    self._mark_updated()
    if self._manager:
        self._manager._on_tag_added(self, tag)
    return self

find_by_protocol

find_by_protocol(protocol: type[P]) -> P | None

Find a component that satisfies a runtime-checkable Protocol.

Uses an internal cache so that the first lookup for a given protocol iterates over all components (O(n) where n is the number of components on this entity), but subsequent lookups are O(1). The cache is selectively invalidated only for affected protocols when components are added or removed, so hot-path lookups (e.g. Positionable during movement) remain O(1) even when unrelated components change.

Parameters:

Name Type Description Default
protocol type[P]

A runtime-checkable Protocol type to match against.

required

Returns:

Type Description
P | None

The first matching component, or None if no component satisfies the protocol.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def find_by_protocol(self, protocol: type[P]) -> P | None:
    """Find a component that satisfies a runtime-checkable Protocol.

    Uses an internal cache so that the first lookup for a given protocol
    iterates over all components (O(n) where n is the number of components
    on this entity), but subsequent lookups are O(1).  The cache is
    selectively invalidated only for affected protocols when components
    are added or removed, so hot-path lookups (e.g. Positionable during
    movement) remain O(1) even when unrelated components change.

    Args:
        protocol: A runtime-checkable Protocol type to match against.

    Returns:
        The first matching component, or None if no component satisfies
        the protocol.
    """
    if protocol in self._protocol_cache:
        return self._protocol_cache[protocol]  # type: ignore[return-value]
    for comp in self._components.values():
        if isinstance(comp, protocol):
            self._protocol_cache[protocol] = comp
            return comp
    self._protocol_cache[protocol] = None
    return None
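The caching behavior described above can be sketched in isolation. This is a minimal illustration, not the real Entity: CachedLookup, Positionable, PositionComponent, and HealthComponent are hypothetical names defined here, the scans counter exists only to make the cache visible, and cache invalidation on add or remove is left out.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class Positionable(Protocol):
    """Hypothetical protocol: anything exposing a room_id attribute."""

    room_id: str


class PositionComponent:
    def __init__(self, room_id: str) -> None:
        self.room_id = room_id


class HealthComponent:
    def __init__(self, current: int) -> None:
        self.current = current


class CachedLookup:
    """Stand-in for the lookup logic only; not the real Entity."""

    def __init__(self, components: list) -> None:
        self._components = components
        self._protocol_cache: dict = {}
        self.scans = 0  # counts full scans, for illustration only

    def find_by_protocol(self, protocol: type) -> Optional[object]:
        # Cache hit (including a cached None) skips the scan entirely.
        if protocol in self._protocol_cache:
            return self._protocol_cache[protocol]
        self.scans += 1
        for comp in self._components:
            if isinstance(comp, protocol):
                self._protocol_cache[protocol] = comp
                return comp
        # Negative results are cached too.
        self._protocol_cache[protocol] = None
        return None


lookup = CachedLookup([HealthComponent(100), PositionComponent("room-1")])
first = lookup.find_by_protocol(Positionable)
second = lookup.find_by_protocol(Positionable)
print(first is second, lookup.scans)  # True 1
```

Caching the None result matters as much as caching a match: without it, every lookup for a protocol the entity does not satisfy would rescan all components.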

get

get(component_type: type[C]) -> C

Get a component by type. Raises KeyError if not found.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def get(self, component_type: type[C]) -> C:
    """Get a component by type. Raises KeyError if not found."""
    component = self._components.get(component_type)
    if component is None:
        raise KeyError(f"Entity {self._id} does not have component {component_type.__name__}")
    # Safe cast: we looked up by component_type, so result is an instance of C
    return component  # type: ignore[return-value]

has

has(*component_types: type[Component]) -> bool

Check if entity has all specified component types.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def has(self, *component_types: type[Component]) -> bool:
    """Check if entity has all specified component types."""
    return all(ct in self._components for ct in component_types)

has_any

has_any(*component_types: type[Component]) -> bool

Check if entity has any of the specified component types.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def has_any(self, *component_types: type[Component]) -> bool:
    """Check if entity has any of the specified component types."""
    return any(ct in self._components for ct in component_types)
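The all/any distinction between has and has_any also determines what happens when no types are passed. The following sketch reproduces the two expressions against a plain dictionary keyed by name. The dictionary and its keys are hypothetical stand-ins for the entity's component storage.

```python
# Hypothetical stand-in for an entity's component storage.
components = {"Position": object(), "Health": object()}


def has(*names: str) -> bool:
    return all(name in components for name in names)


def has_any(*names: str) -> bool:
    return any(name in components for name in names)


print(has("Position", "Health"))         # True
print(has("Position", "Inventory"))      # False
print(has_any("Position", "Inventory"))  # True
print(has())      # True: all() of an empty iterable
print(has_any())  # False: any() of an empty iterable
```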

has_tag

has_tag(tag: str) -> bool

Check if entity has a specific tag.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def has_tag(self, tag: str) -> bool:
    """Check if entity has a specific tag."""
    return tag in self._tags

remove

remove(component_type: type[C]) -> C | None

Remove and return a component from this entity.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def remove(self, component_type: type[C]) -> C | None:
    """Remove and return a component from this entity."""
    component = self._components.get(component_type)
    if component:
        if self._manager:
            self._manager._on_component_removed(
                self,
                component_type,
                component.get_type(),
            )
        self._components.pop(component_type, None)
        component._owner_id = None
        # Selectively invalidate protocol cache: only clear entries where
        # the removed component was the cached match, or could have been.
        self._invalidate_protocol_cache_for(component)
        self._mark_updated()
    # Safe cast: component is None or matches component_type since we used it as dict key
    return component  # type: ignore[return-value]

remove_tag

remove_tag(tag: str) -> bool

Remove a tag from this entity. Returns True if tag was present.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def remove_tag(self, tag: str) -> bool:
    """Remove a tag from this entity. Returns True if tag was present."""
    if tag in self._tags:
        self._tags.discard(tag)
        self._mark_updated()
        if self._manager:
            self._manager._emit_tag_removed(self, tag)
            self._manager._on_tag_removed(self, tag, emit_event=False)
        return True
    return False

to_dict

to_dict() -> dict[str, Any]

Serialize entity to dictionary.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def to_dict(self) -> dict[str, Any]:
    """Serialize entity to dictionary."""
    return {
        "id": str(self._id),
        "tags": list(self._tags),
        "components": {
            comp_type.__name__: comp.model_dump()
            for comp_type, comp in self._components.items()
        },
        "created_at": self._created_at.isoformat(),
        "updated_at": self._updated_at.isoformat(),
    }
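The shape of the resulting dictionary can be sketched with the standard library alone. This is an illustration under assumptions: entity_to_dict and Health are hypothetical, dataclasses.asdict stands in for the components' model_dump() call, and tags are sorted here only to make the output deterministic (the listing above uses list(self._tags)).

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from uuid import uuid4


@dataclass
class Health:
    """Hypothetical component; asdict() stands in for model_dump()."""

    current: int
    maximum: int


def entity_to_dict(entity_id, tags, components, created_at, updated_at) -> dict:
    return {
        "id": str(entity_id),
        "tags": sorted(tags),
        # Components are keyed by class name, as in the listing above.
        "components": {type(c).__name__: asdict(c) for c in components},
        "created_at": created_at.isoformat(),
        "updated_at": updated_at.isoformat(),
    }


now = datetime.now(timezone.utc)
payload = entity_to_dict(uuid4(), {"npc", "hostile"}, [Health(80, 100)], now, now)
print(payload["components"])  # {'Health': {'current': 80, 'maximum': 100}}
round_tripped = json.loads(json.dumps(payload))
```

Every value in the result is a string, list, or dictionary of plain values, so the payload can pass through json.dumps unchanged in this sketch.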

try_get

try_get(component_type: type[C]) -> C | None

Get a component by type, or None if not found.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def try_get(self, component_type: type[C]) -> C | None:
    """Get a component by type, or None if not found."""
    # Safe cast: we looked up by component_type, so result is C | None
    return self._components.get(component_type)  # type: ignore[return-value]
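The difference between get and try_get is only in how a missing component is reported. A minimal sketch, using hypothetical MiniEntity, Health, and Mana classes rather than the real ones:

```python
class Health:
    def __init__(self, current: int) -> None:
        self.current = current


class Mana:
    def __init__(self, current: int) -> None:
        self.current = current


class MiniEntity:
    """Stand-in that mirrors only the two lookup methods."""

    def __init__(self, *components: object) -> None:
        self._components = {type(c): c for c in components}

    def get(self, component_type: type):
        component = self._components.get(component_type)
        if component is None:
            raise KeyError(f"missing component {component_type.__name__}")
        return component

    def try_get(self, component_type: type):
        return self._components.get(component_type)


entity = MiniEntity(Health(100))
health = entity.try_get(Health)  # the Health instance
mana = entity.try_get(Mana)      # None, no exception

try:
    entity.get(Mana)
    raised = False
except KeyError:
    raised = True
print(health.current, mana, raised)  # 100 None True
```

get suits code paths where the component is required and its absence is a bug, while try_get suits optional components that the caller checks for None.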

EntityCreatedEvent dataclass

EntityCreatedEvent(entity_id: UUID)

Bases: Event

Emitted when an entity is created.

EntityDestroyedEvent dataclass

EntityDestroyedEvent(entity_id: UUID)

Bases: Event

Emitted when an entity is destroyed.

EntityManager

EntityManager()

Manages all entities and provides efficient component queries.

The EntityManager maintains indexes for fast component-based queries and handles entity lifecycle events.

Example

manager = EntityManager()

# Create entities
player = manager.create()
player.add(PositionComponent(room_id=room_id))
player.add(HealthComponent(current=100, maximum=100))

# Query entities with specific components
for entity in manager.with_components(PositionComponent, HealthComponent):
    pos = entity.get(PositionComponent)
    health = entity.get(HealthComponent)

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def __init__(self) -> None:
    self._entities: dict[UUID, Entity] = {}
    self._by_component: dict[type[Component], set[UUID]] = defaultdict(set)
    self._by_tag: dict[str, set[UUID]] = defaultdict(set)
    self._event_bus: EventBus | None = None

entity_count property

entity_count: int

Get total number of entities.

add_entity

add_entity(entity: Entity) -> None

Add an existing entity to the manager.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def add_entity(self, entity: Entity) -> None:
    """Add an existing entity to the manager."""
    if entity.id in self._entities:
        self.remove_entity(entity.id)

    entity._manager = self
    self._entities[entity.id] = entity

    for component_type in entity._components:
        entity._components[component_type]._owner_id = entity.id
        self._by_component[component_type].add(entity.id)

    for tag in entity._tags:
        self._by_tag[tag].add(entity.id)

adopt

adopt(entity: Entity) -> None

Adopt an externally created entity into this manager.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def adopt(self, entity: Entity) -> None:
    """Adopt an externally created entity into this manager."""
    if entity.id in self._entities:
        self.destroy(entity.id)

    entity._manager = self
    self._entities[entity.id] = entity

    for component_type in entity._components:
        entity._components[component_type]._owner_id = entity.id
        self._by_component[component_type].add(entity.id)
    for tag in entity._tags:
        self._by_tag[tag].add(entity.id)

all

all() -> Iterator[Entity]

Iterate over all entities.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def all(self) -> Iterator[Entity]:
    """Iterate over all entities."""
    return iter(self._entities.values())

clear

clear() -> None

Remove all entities.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def clear(self) -> None:
    """Remove all entities."""
    for entity in list(self._entities.values()):
        entity._manager = None
    self._entities.clear()
    self._by_component.clear()
    self._by_tag.clear()

count

count() -> int

Get total number of entities.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def count(self) -> int:
    """Get total number of entities."""
    return len(self._entities)

count_with_tag

count_with_tag(tag: str) -> int

Get the number of entities with a specific tag in O(1) time.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def count_with_tag(self, tag: str) -> int:
    """Get the number of entities with a specific tag in O(1) time."""
    return len(self._by_tag.get(tag, set()))

create

create(entity_id: UUID | None = None) -> Entity

Create a new entity.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def create(self, entity_id: UUID | None = None) -> Entity:
    """Create a new entity."""
    entity = Entity(entity_id=entity_id, manager=self)
    if entity.id in self._entities:
        self.remove_entity(entity.id)
    self._entities[entity.id] = entity
    if self._event_bus is not None:
        from maid_engine.core.events import EntityCreatedEvent

        self._event_bus.emit_sync(EntityCreatedEvent(entity_id=entity.id))
    return entity

destroy

destroy(entity_id: UUID) -> bool

Destroy an entity. Returns True if entity existed.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def destroy(self, entity_id: UUID) -> bool:
    """Destroy an entity. Returns True if entity existed."""
    entity = self._entities.get(entity_id)
    if entity is None:
        return False

    if self._event_bus is not None:
        from maid_engine.core.events import EntityDestroyedEvent

        self._event_bus.emit_sync(EntityDestroyedEvent(entity_id=entity_id))

    self._entities.pop(entity_id, None)

    # Clean up indexes
    for component_type in list(entity._components.keys()):
        self._by_component[component_type].discard(entity_id)
        entity._components[component_type]._owner_id = None

    for tag in list(entity._tags):
        self._by_tag[tag].discard(entity_id)

    entity._manager = None
    return True

get

get(entity_id: UUID) -> Entity | None

Get an entity by ID.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def get(self, entity_id: UUID) -> Entity | None:
    """Get an entity by ID."""
    return self._entities.get(entity_id)

get_all_entities

get_all_entities() -> Iterator[Entity]

Iterate over all tracked entities.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def get_all_entities(self) -> Iterator[Entity]:
    """Iterate over all tracked entities."""
    return iter(self._entities.values())

get_or_raise

get_or_raise(entity_id: UUID) -> Entity

Get an entity by ID, raising KeyError if not found.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def get_or_raise(self, entity_id: UUID) -> Entity:
    """Get an entity by ID, raising KeyError if not found."""
    entity = self._entities.get(entity_id)
    if entity is None:
        raise KeyError(f"Entity {entity_id} not found")
    return entity

remove_entity

remove_entity(entity_id: UUID) -> bool

Remove an entity from tracking without emitting destroy semantics.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def remove_entity(self, entity_id: UUID) -> bool:
    """Remove an entity from tracking without emitting destroy semantics."""
    entity = self._entities.pop(entity_id, None)
    if entity is None:
        return False

    for component_type in list(entity._components.keys()):
        self._by_component[component_type].discard(entity_id)
        entity._components[component_type]._owner_id = None

    for tag in list(entity._tags):
        self._by_tag[tag].discard(entity_id)

    entity._manager = None
    return True
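Comparing this listing with destroy, the practical difference is that destroy queues an EntityDestroyedEvent when an event bus is set, while remove_entity detaches the entity silently. The sketch below models only that difference. MiniManager is a hypothetical stand-in, and a plain list takes the place of the event bus.

```python
from uuid import UUID, uuid4


class MiniManager:
    """Stand-in showing only the event difference between the two calls."""

    def __init__(self) -> None:
        self._entities: dict = {}
        self.queued_events: list = []  # stands in for the event bus queue

    def create(self, name: str) -> UUID:
        entity_id = uuid4()
        self._entities[entity_id] = name
        return entity_id

    def destroy(self, entity_id: UUID) -> bool:
        if entity_id not in self._entities:
            return False
        # Lifecycle event is queued before the entity is dropped.
        self.queued_events.append(("EntityDestroyedEvent", entity_id))
        del self._entities[entity_id]
        return True

    def remove_entity(self, entity_id: UUID) -> bool:
        # No event: the entity simply stops being tracked.
        return self._entities.pop(entity_id, None) is not None


manager = MiniManager()
goblin = manager.create("goblin")
crate = manager.create("crate")

manager.destroy(goblin)
manager.remove_entity(crate)
print(len(manager.queued_events))  # 1
print(manager.destroy(goblin))     # False, already gone
```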

set_event_bus

set_event_bus(event_bus: EventBus) -> None

Set the event bus for lifecycle event emission.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def set_event_bus(self, event_bus: EventBus) -> None:
    """Set the event bus for lifecycle event emission."""
    self._event_bus = event_bus

with_components

with_components(
    *component_types: type[Component],
) -> Iterator[Entity]

Get all entities that have all specified component types.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def with_components(self, *component_types: type[Component]) -> Iterator[Entity]:
    """Get all entities that have all specified component types."""
    if not component_types:
        yield from self._entities.values()
        return

    # Start with the smallest set for efficiency
    sets = [self._by_component.get(ct, set()) for ct in component_types]
    if not all(sets):
        return

    smallest = min(sets, key=len)
    candidates = smallest.intersection(*sets)

    for entity_id in list(candidates):
        entity = self._entities.get(entity_id)
        if entity:
            yield entity
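The query strategy in the listing above is a set intersection over a per-component index, starting from the smallest set. A self-contained sketch of that idea follows. The index contents, the string keys, and the ids_with helper are hypothetical, and strings replace component classes to keep the example short.

```python
from collections import defaultdict
from uuid import UUID, uuid4

a, b, c = uuid4(), uuid4(), uuid4()
all_ids = {a, b, c}

# Hypothetical index: component name -> IDs of entities that have it.
by_component: dict = defaultdict(set)
by_component["Position"].update({a, b, c})
by_component["Health"].update({a, b})
by_component["Inventory"].update({b})


def ids_with(*names: str) -> set:
    if not names:
        return set(all_ids)  # no filter: every entity matches
    # .get() avoids creating empty entries in the defaultdict.
    sets = [by_component.get(name, set()) for name in names]
    if not all(sets):
        return set()  # some component has no entities at all
    smallest = min(sets, key=len)
    return smallest.intersection(*sets)


print(ids_with("Position", "Health") == {a, b})         # True
print(ids_with("Position", "Health", "Inventory") == {b})  # True
print(ids_with("Position", "Missing"))                  # set()
```

Starting from the smallest set bounds the work by the rarest component, which helps when one component (such as a position) is present on nearly every entity.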

with_tag

with_tag(tag: str) -> Iterator[Entity]

Get all entities with a specific tag.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def with_tag(self, tag: str) -> Iterator[Entity]:
    """Get all entities with a specific tag."""
    for entity_id in list(self._by_tag.get(tag, set())):
        entity = self._entities.get(entity_id)
        if entity:
            yield entity

with_tags

with_tags(*tags: str) -> Iterator[Entity]

Get all entities that have all specified tags.

Source code in packages/maid-engine/src/maid_engine/core/ecs/entity.py
def with_tags(self, *tags: str) -> Iterator[Entity]:
    """Get all entities that have all specified tags."""
    if not tags:
        yield from self._entities.values()
        return

    sets = [self._by_tag.get(tag, set()) for tag in tags]
    if not all(sets):
        return

    smallest = min(sets, key=len)
    candidates = smallest.intersection(*sets)

    for entity_id in list(candidates):
        entity = self._entities.get(entity_id)
        if entity:
            yield entity

Event dataclass

Event()

Base class for all events.

Events are messages that can be emitted and handled by subscribers. They carry data about something that happened in the game.

Example

@dataclass
class PlayerDamagedEvent(Event):
    player_id: UUID
    damage: int
    source_id: UUID | None = None

cancel

cancel() -> None

Cancel this event to prevent further processing.

Source code in packages/maid-engine/src/maid_engine/core/events.py
def cancel(self) -> None:
    """Cancel this event to prevent further processing."""
    self.cancelled = True

EventBus

EventBus(obs_registry: ObservabilityRegistry | None = None)

Publish/subscribe event system.

The EventBus allows systems to communicate through events without direct coupling. Handlers can be async or sync.

Supports wildcard subscriptions via subscribe_all() to receive all events.

Example

bus = EventBus()

async def on_damage(event: DamageDealtEvent):
    print(f"Damage dealt: {event.damage}")

bus.subscribe(DamageDealtEvent, on_damage)

# Subscribe to ALL events (wildcard)
def log_event(event: Event):
    print(f"Event: {event.event_type}")

bus.subscribe_all(log_event)

await bus.emit(DamageDealtEvent(
    source_id=attacker.id,
    target_id=defender.id,
    damage=25,
    damage_type="physical"
))

Source code in packages/maid-engine/src/maid_engine/core/events.py
def __init__(self, obs_registry: ObservabilityRegistry | None = None) -> None:
    self._handlers: dict[str, list[HandlerRegistration]] = defaultdict(list)
    self._pending_events: asyncio.Queue[tuple[Event, MaidContext] | Event] = asyncio.Queue(maxsize=10000)
    # Track handler->pack mapping for hot reload support
    self._handler_sources: dict[UUID, str] = {}  # handler_id -> pack_name
    # Track handlers by pack for efficient bulk unsubscription
    self._handlers_by_pack: dict[str, set[UUID]] = defaultdict(set)
    # Lock for safe handler removal during dispatch
    self._dispatch_lock = RLock()
    self._dispatch_depth = 0  # Track nested dispatches
    # Queue for deferred removals during dispatch
    self._pending_removals: set[UUID] = set()
    self._pending_pack_removals: set[str] = set()
    # Track total number of events processed for metrics
    self._processed_count: int = 0
    # Track total number of handler errors for metrics
    self._error_count: int = 0
    # Track error counts by exception type for diagnostics
    self._error_counts_by_type: dict[str, int] = {}
    # Optional error callback invoked when a handler fails
    self._on_error: Callable[[Event, Exception, Any], None] | None = None
    self.obs_registry = obs_registry

error_count property

error_count: int

Get the total number of handler errors that have occurred.

This count includes all handler exceptions caught during event dispatch since the EventBus was created or last cleared.

Returns:

Type Description
int

Total number of handler errors

error_counts_by_type property

error_counts_by_type: dict[str, int]

Get handler error counts keyed by exception class name.

Returns a copy of the internal error count dictionary. Each key is the class name of an exception that occurred during event handler execution, and each value is the number of times that exception type occurred.

Returns:

Type Description
dict[str, int]

Dictionary mapping exception type names to occurrence counts.

on_error property writable

on_error: Callable[[Event, Exception, Any], None] | None

Get the current error callback.

Returns:

Type Description
Callable[[Event, Exception, Any], None] | None

The error callback, or None if not set.

processed_count property

processed_count: int

Get the total number of events that have been processed.

This count includes all events emitted via emit() since the EventBus was created or last cleared. It does not include events queued via emit_sync() that haven't been processed yet.

Returns:

Type Description
int

Total number of events processed

clear

clear() -> None

Remove all handlers, pending events, and reset counters.

Source code in packages/maid-engine/src/maid_engine/core/events.py
def clear(self) -> None:
    """Remove all handlers, pending events, and reset counters."""
    self._handlers.clear()
    self._handler_sources.clear()
    self._handlers_by_pack.clear()
    self._pending_removals.clear()
    self._pending_pack_removals.clear()
    self._processed_count = 0
    self._error_count = 0
    self._error_counts_by_type.clear()
    # Clear pending events queue
    while not self._pending_events.empty():
        try:
            self._pending_events.get_nowait()
        except asyncio.QueueEmpty:
            break

emit async

emit(event: Event) -> None

Emit an event immediately, invoking all handlers.

Handlers are invoked in priority order. If an event is cancelled, remaining handlers are not called.

Both type-specific handlers and wildcard handlers (registered via subscribe_all) are invoked.

Handler removals during dispatch are deferred until dispatch completes.

Source code in packages/maid-engine/src/maid_engine/core/events.py
async def emit(self, event: Event) -> None:
    """Emit an event immediately, invoking all handlers.

    Handlers are invoked in priority order. If an event is
    cancelled, remaining handlers are not called.

    Both type-specific handlers and wildcard handlers (registered via
    subscribe_all) are invoked.

    Handler removals during dispatch are deferred until dispatch completes.
    """
    type_name = event.event_type
    type_handlers = list(self._handlers.get(type_name, []))
    wildcard_handlers = list(self._handlers.get(self.WILDCARD_KEY, []))
    all_handlers = type_handlers + wildcard_handlers

    span_context: Any = NoOpSpan()
    tracing_settings = (
        getattr(self.obs_registry, "settings", None) if self.obs_registry is not None else None
    )
    if (
        self.obs_registry is not None
        and tracing_settings is not None
        and tracing_mode_enabled(tracing_settings, TracingMode.VERBOSE)
    ):
        get_tracer = getattr(self.obs_registry, "get_tracer", None)
        if callable(get_tracer):
            span_context = start_span_context(
                get_tracer(__name__),
                f"event:{type_name}",
            )

    if self.obs_registry is not None:
        with safe_observe("event_emit", registry=self.obs_registry):
            counter = self.obs_registry.get_meter().create_counter(
                "events_total",
                "Total events emitted",
                labels=("event_domain",),
            )
            counter.labels(event_domain=get_event_domain(type_name)).inc()

    # Enter dispatch mode
    with self._dispatch_lock:
        self._dispatch_depth += 1

    with span_context as span:
        span.set_attribute("event_type", type_name)
        span.set_attribute("handler_count", len(all_handlers))
        span.set_attribute("correlation_id", get_context().correlation_id)

        try:
            # Combine and sort all handlers by priority
            all_handlers.sort(key=lambda h: h.priority.value)

            to_remove: list[UUID] = []

            for registration in all_handlers:
                # Skip handlers that are pending removal
                if registration.handler_id in self._pending_removals:
                    continue

                # Skip handlers from packs pending removal
                if (
                    registration.source_pack
                    and registration.source_pack in self._pending_pack_removals
                ):
                    continue

                if event.cancelled:
                    break

                try:
                    await registration.handler(event)
                except Exception as exc:
                    # Log but don't stop other handlers
                    handler_name = getattr(
                        registration.handler, "__qualname__",
                        getattr(registration.handler, "__name__", repr(registration.handler)),
                    )
                    _logger.error(
                        "Error in event handler '%s' for event '%s': %s",
                        handler_name,
                        event.event_type,
                        exc,
                        exc_info=True,
                    )
                    self._error_count += 1
                    exc_type_name = type(exc).__name__
                    self._error_counts_by_type[exc_type_name] = (
                        self._error_counts_by_type.get(exc_type_name, 0) + 1
                    )
                    if self._on_error is not None:
                        try:
                            self._on_error(event, exc, registration.handler)
                        except Exception:
                            _logger.error(
                                "Error in on_error callback while handling '%s'",
                                event.event_type,
                                exc_info=True,
                            )
                    # Emit SystemErrorEvent for monitoring (avoid recursion
                    # by not emitting if we are already handling a SystemErrorEvent)
                    if event.event_type != "SystemErrorEvent":
                        try:
                            error_event = SystemErrorEvent(
                                error_type=exc_type_name,
                                error_message=str(exc),
                                error_source="event_handler",
                                handler_name=handler_name,
                                event_type_name=event.event_type,
                            )
                            # Queue for later processing to avoid dispatch-in-dispatch issues
                            self._pending_events.put_nowait(
                                (error_event, get_context())
                            )
                        except asyncio.QueueFull:
                            _logger.warning(
                                "Event queue full, dropping SystemErrorEvent for %s",
                                handler_name,
                            )
                        except Exception:
                            # Best-effort: don't let error event emission break dispatch
                            _logger.debug(
                                "Failed to queue SystemErrorEvent for handler error",
                                exc_info=True,
                            )

                if registration.once:
                    to_remove.append(registration.handler_id)

            # Queue one-time handlers for removal (will be processed after dispatch)
            for handler_id in to_remove:
                self._pending_removals.add(handler_id)

            # Increment processed count for successfully emitted events
            self._processed_count += 1
        finally:
            # Exit dispatch mode
            with self._dispatch_lock:
                self._dispatch_depth -= 1

                # Process deferred removals if we're back to top level
                if self._dispatch_depth == 0:
                    self._process_pending_removals()
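Three behaviors of the dispatch loop above are easy to miss in the full listing: handlers are sorted by ascending priority.value, a cancelled event stops the loop, and an exception in one handler is counted without stopping the others. The sketch below isolates them. Priority, DemoEvent, dispatch, and the three handlers are hypothetical, and the numeric priority values are assumptions chosen for illustration rather than the real EventPriority members.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum


class Priority(Enum):
    """Hypothetical values; lower values dispatch first in this sketch."""

    HIGH = 0
    NORMAL = 1
    LOW = 2


@dataclass
class DemoEvent:
    cancelled: bool = False
    log: list = field(default_factory=list)

    def cancel(self) -> None:
        self.cancelled = True


async def dispatch(event: DemoEvent, handlers: list) -> int:
    """Call handlers in ascending priority order; return the error count."""
    errors = 0
    for _, handler in sorted(handlers, key=lambda pair: pair[0].value):
        if event.cancelled:
            break  # remaining handlers are skipped
        try:
            await handler(event)
        except Exception:
            errors += 1  # the real bus also logs and queues a SystemErrorEvent
    return errors


async def broken(event: DemoEvent) -> None:
    event.log.append("broken")
    raise RuntimeError("boom")


async def veto(event: DemoEvent) -> None:
    event.log.append("veto")
    event.cancel()


async def audit(event: DemoEvent) -> None:
    event.log.append("audit")


async def main():
    event = DemoEvent()
    handlers = [(Priority.LOW, audit), (Priority.NORMAL, veto), (Priority.HIGH, broken)]
    errors = await dispatch(event, handlers)
    return event.log, errors


log, errors = asyncio.run(main())
print(log, errors)  # ['broken', 'veto'] 1
```

The failing handler does not prevent veto from running, but veto's cancellation does prevent audit from running.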

emit_sync

emit_sync(event: Event) -> None

Queue an event for later processing.

Use this when you need to emit from a sync context. Call process_pending() to handle queued events.

Source code in packages/maid-engine/src/maid_engine/core/events.py
def emit_sync(self, event: Event) -> None:
    """Queue an event for later processing.

    Use this when you need to emit from a sync context.
    Call process_pending() to handle queued events.
    """
    try:
        self._pending_events.put_nowait((event, get_context()))
    except asyncio.QueueFull:
        _logger.warning(
            "Event queue full, dropping event: %s", event.event_type
        )
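Since the pending queue is bounded, emit_sync drops events once it is full instead of blocking or raising. The sketch below shows the underlying asyncio.Queue behavior with a deliberately tiny limit. The limit of 2 and the event names are illustrative assumptions, and a list replaces the warning log.

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=2)  # tiny bound, for illustration only
    dropped = []
    for name in ["a", "b", "c"]:
        try:
            queue.put_nowait(name)  # never blocks
        except asyncio.QueueFull:
            dropped.append(name)    # the real bus logs a warning here
    return queue.qsize(), dropped


queued, dropped = asyncio.run(main())
print(queued, dropped)  # 2 ['c']
```

Callers that cannot tolerate dropped events would need to drain the queue regularly via process_pending() or use the awaitable emit() instead.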

get_handler_ids_for_pack

get_handler_ids_for_pack(pack_name: str) -> set[UUID]

Get all handler IDs registered by a content pack.

Parameters:

Name Type Description Default
pack_name str

Name of the content pack

required

Returns:

Type Description
set[UUID]

Set of handler UUIDs registered by the pack

Source code in packages/maid-engine/src/maid_engine/core/events.py
def get_handler_ids_for_pack(self, pack_name: str) -> set[UUID]:
    """Get all handler IDs registered by a content pack.

    Args:
        pack_name: Name of the content pack

    Returns:
        Set of handler UUIDs registered by the pack
    """
    return self._handlers_by_pack.get(pack_name, set()).copy()

get_handler_source

get_handler_source(handler_id: UUID) -> str | None

Get the source pack for a handler.

Parameters:

Name Type Description Default
handler_id UUID

The handler's UUID

required

Returns:

Type Description
str | None

The pack name that registered this handler, or None if unknown

Source code in packages/maid-engine/src/maid_engine/core/events.py
def get_handler_source(self, handler_id: UUID) -> str | None:
    """Get the source pack for a handler.

    Args:
        handler_id: The handler's UUID

    Returns:
        The pack name that registered this handler, or None if unknown
    """
    return self._handler_sources.get(handler_id)

get_handlers_for_pack

get_handlers_for_pack(
    pack_name: str,
) -> dict[str, list[EventHandler]]

Get all handlers registered by a content pack.

Parameters:

Name Type Description Default
pack_name str

Name of the content pack

required

Returns:

Type Description
dict[str, list[EventHandler]]

Dictionary mapping event type names (strings) to lists of handlers

Source code in packages/maid-engine/src/maid_engine/core/events.py
def get_handlers_for_pack(
    self, pack_name: str
) -> dict[str, list[EventHandler]]:
    """Get all handlers registered by a content pack.

    Args:
        pack_name: Name of the content pack

    Returns:
        Dictionary mapping event type names (strings) to lists of handlers
    """
    handler_ids = self._handlers_by_pack.get(pack_name, set())
    if not handler_ids:
        return {}

    result: dict[str, list[EventHandler]] = {}

    for type_name, handlers in self._handlers.items():
        pack_handlers = [
            reg.handler for reg in handlers if reg.handler_id in handler_ids
        ]
        if pack_handlers:
            result[type_name] = pack_handlers

    return result

handler_count

handler_count(event_type: type[Event] | None = None) -> int

Get the number of handlers, optionally filtered by event type.

Source code in packages/maid-engine/src/maid_engine/core/events.py
def handler_count(self, event_type: type[Event] | None = None) -> int:
    """Get the number of handlers, optionally filtered by event type."""
    if event_type:
        type_name = event_type.__name__
        return len(self._handlers.get(type_name, []))
    return sum(len(handlers) for handlers in self._handlers.values())

has_handlers

has_handlers(event_type: type[Event]) -> bool

Check if any handlers are registered for an event type.

Source code in packages/maid-engine/src/maid_engine/core/events.py
def has_handlers(self, event_type: type[Event]) -> bool:
    """Check if any handlers are registered for an event type."""
    type_name = event_type.__name__
    return bool(self._handlers.get(type_name))

is_dispatching

is_dispatching() -> bool

Check if currently dispatching events.

Returns:

Type Description
bool

True if in the middle of an event dispatch

Source code in packages/maid-engine/src/maid_engine/core/events.py
def is_dispatching(self) -> bool:
    """Check if currently dispatching events.

    Returns:
        True if in the middle of an event dispatch
    """
    with self._dispatch_lock:
        return self._dispatch_depth > 0

process_pending async

process_pending(max_iterations: int = 1000) -> int

Process all pending events.

Parameters:

Name Type Description Default
max_iterations int

Maximum number of events to process per call to prevent infinite loops from handlers re-emitting events.

1000

Returns:

Type Description
int

Number of events processed

Source code in packages/maid-engine/src/maid_engine/core/events.py
async def process_pending(self, max_iterations: int = 1000) -> int:
    """Process all pending events.

    Args:
        max_iterations: Maximum number of events to process per call
            to prevent infinite loops from handlers re-emitting events.

    Returns:
        Number of events processed
    """
    count = 0
    while not self._pending_events.empty():
        if count >= max_iterations:
            _logger.warning(
                "process_pending hit max iterations (%d), "
                "breaking to prevent infinite loop",
                max_iterations,
            )
            break
        queued = await self._pending_events.get()
        if isinstance(queued, tuple):
            event, context = queued
        else:
            event, context = queued, get_context()
        token = set_context(context)
        try:
            await self.emit(event)
        finally:
            restore_context(token)
        count += 1
    return count
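
The effect of the `max_iterations` cap can be seen in a small standalone model. This is a sketch under assumptions, not the EventBus itself: `drain` and `re_emit` are hypothetical names, and the context save/restore step from the listing is left out.

```python
import asyncio


async def drain(queue: asyncio.Queue, handle, max_iterations: int = 1000) -> int:
    """Process queued items until the queue is empty or the cap is reached."""
    count = 0
    while not queue.empty():
        if count >= max_iterations:
            break  # stop here; whatever is left stays queued
        item = await queue.get()
        await handle(item)
        count += 1
    return count


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()

    async def re_emit(item: int) -> None:
        # A misbehaving handler: every processed event queues another one.
        await queue.put(item + 1)

    await queue.put(0)
    done = await drain(queue, re_emit, max_iterations=5)
    return done, queue.qsize()


processed, left_over = asyncio.run(main())
print(processed, left_over)
```

Without the cap this loop would never finish, because the queue is never empty when the `while` condition is checked.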

subscribe

subscribe(
    event_type: type[E],
    handler: EventHandler | SyncEventHandler,
    priority: EventPriority = NORMAL,
    once: bool = False,
    source_pack: str | None = None,
) -> UUID

Subscribe to an event type.

Parameters:

Name Type Description Default
event_type type[E]

The event class to subscribe to

required
handler EventHandler | SyncEventHandler

Async or sync function to handle events

required
priority EventPriority

Handler priority (higher priority runs first)

NORMAL
once bool

If True, handler is removed after first invocation

False
source_pack str | None

Name of the content pack registering this handler (for hot reload)

None

Returns:

Type Description
UUID

Handler ID for unsubscribing

Source code in packages/maid-engine/src/maid_engine/core/events.py
def subscribe(
    self,
    event_type: type[E],
    handler: EventHandler | SyncEventHandler,
    priority: EventPriority = EventPriority.NORMAL,
    once: bool = False,
    source_pack: str | None = None,
) -> UUID:
    """Subscribe to an event type.

    Args:
        event_type: The event class to subscribe to
        handler: Async or sync function to handle events
        priority: Handler priority (higher priority runs first)
        once: If True, handler is removed after first invocation
        source_pack: Name of the content pack registering this handler (for hot reload)

    Returns:
        Handler ID for unsubscribing
    """
    type_name = event_type.__name__

    # Wrap sync handlers
    async_handler: EventHandler
    if asyncio.iscoroutinefunction(handler):
        async_handler = handler
    else:
        # Use default argument to capture handler by value, not by reference
        # This prevents closure bugs when registering handlers in loops
        # We've verified it's not a coroutine, so it must be a sync handler
        # Store in a typed variable for the closure
        captured_handler = handler

        async def _wrapper(
            event: Event,
            _handler: EventHandler | SyncEventHandler = captured_handler,
        ) -> None:
            # At runtime, we know this is a SyncEventHandler
            result = _handler(event)
            if asyncio.iscoroutine(result):
                await result

        async_handler = _wrapper

    registration = HandlerRegistration(
        handler_id=uuid4(),
        event_type=type_name,
        handler=async_handler,
        priority=priority,
        once=once,
        source_pack=source_pack,
    )

    handlers = self._handlers[type_name]
    handlers.append(registration)

    # Sort by priority (HIGHEST first)
    handlers.sort(key=lambda h: h.priority.value)

    # Track handler source for hot reload support
    if source_pack is not None:
        self._handler_sources[registration.handler_id] = source_pack
        self._handlers_by_pack[source_pack].add(registration.handler_id)

    return registration.handler_id
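
Two details of the listing above are easy to miss: plain functions are wrapped so that every stored handler can be awaited, and the ascending sort on `priority.value` only puts HIGHEST first if HIGHEST has the smallest numeric value. The sketch below models both. The `Priority` values, `Registration`, and the module-level `subscribe` are assumptions for illustration, and `inspect.iscoroutinefunction` stands in for the `asyncio` variant used in the source.

```python
import asyncio
import inspect
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable


class Priority(Enum):
    # Hypothetical values, chosen so an ascending sort runs HIGHEST first.
    HIGHEST = 0
    NORMAL = 50
    LOWEST = 100


@dataclass
class Registration:
    handler: Callable[[Any], Awaitable[None]]
    priority: Priority


def as_async(handler: Callable[[Any], Any]) -> Callable[[Any], Awaitable[None]]:
    """Return coroutine functions unchanged and wrap plain callables."""
    if inspect.iscoroutinefunction(handler):
        return handler

    # The default argument binds the handler at definition time.
    async def _wrapper(event: Any, _handler: Callable[[Any], Any] = handler) -> None:
        result = _handler(event)
        if inspect.iscoroutine(result):
            await result

    return _wrapper


calls: list = []
registrations: list = []


def subscribe(handler: Callable[[Any], Any], priority: Priority) -> None:
    registrations.append(Registration(as_async(handler), priority))
    # list.sort is stable, so equal priorities keep registration order.
    registrations.sort(key=lambda reg: reg.priority.value)


def sync_logger(event: Any) -> None:
    calls.append("sync_logger")


async def async_guard(event: Any) -> None:
    calls.append("async_guard")


subscribe(sync_logger, Priority.NORMAL)
subscribe(async_guard, Priority.HIGHEST)


async def dispatch(event: Any) -> None:
    for reg in registrations:
        await reg.handler(event)


asyncio.run(dispatch({"type": "demo"}))
print(calls)
```

The handler registered second runs first here because its priority sorts ahead of NORMAL.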

subscribe_all

subscribe_all(
    handler: EventHandler | SyncEventHandler,
    priority: EventPriority = NORMAL,
    once: bool = False,
    source_pack: str | None = None,
) -> UUID

Subscribe to ALL events (wildcard subscription).

This handler will be invoked for every event emitted, regardless of type. Useful for logging, metrics collection, or debugging.

Parameters:

Name Type Description Default
handler EventHandler | SyncEventHandler

Async or sync function to handle events

required
priority EventPriority

Handler priority (higher priority runs first)

NORMAL
once bool

If True, handler is removed after first invocation

False
source_pack str | None

Name of the content pack registering this handler (for hot reload)

None

Returns:

Type Description
UUID

Handler ID for unsubscribing

Source code in packages/maid-engine/src/maid_engine/core/events.py
def subscribe_all(
    self,
    handler: EventHandler | SyncEventHandler,
    priority: EventPriority = EventPriority.NORMAL,
    once: bool = False,
    source_pack: str | None = None,
) -> UUID:
    """Subscribe to ALL events (wildcard subscription).

    This handler will be invoked for every event emitted, regardless of type.
    Useful for logging, metrics collection, or debugging.

    Args:
        handler: Async or sync function to handle events
        priority: Handler priority (higher priority runs first)
        once: If True, handler is removed after first invocation
        source_pack: Name of the content pack registering this handler (for hot reload)

    Returns:
        Handler ID for unsubscribing
    """
    # Wrap sync handlers
    async_handler: EventHandler
    if asyncio.iscoroutinefunction(handler):
        async_handler = handler
    else:
        # Use default argument to capture handler by value, not by reference
        # This prevents closure bugs when registering handlers in loops
        # Store in a typed variable for the closure
        captured_handler = handler

        async def _wrapper(
            event: Event,
            _handler: EventHandler | SyncEventHandler = captured_handler,
        ) -> None:
            # At runtime, we know this is a SyncEventHandler
            result = _handler(event)
            if asyncio.iscoroutine(result):
                await result

        async_handler = _wrapper

    registration = HandlerRegistration(
        handler_id=uuid4(),
        event_type=self.WILDCARD_KEY,
        handler=async_handler,
        priority=priority,
        once=once,
        source_pack=source_pack,
    )

    handlers = self._handlers[self.WILDCARD_KEY]
    handlers.append(registration)

    # Sort by priority (HIGHEST first)
    handlers.sort(key=lambda h: h.priority.value)

    # Track handler source for hot reload support
    if source_pack is not None:
        self._handler_sources[registration.handler_id] = source_pack
        self._handlers_by_pack[source_pack].add(registration.handler_id)

    return registration.handler_id
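
A wildcard subscription suits cross-cutting concerns such as metrics. The sketch below is a standalone model: the `"*"` value for `WILDCARD_KEY`, the event classes, and the way `emit` combines typed and wildcard handlers are all assumptions, since the dispatch path is not shown in this section.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass

WILDCARD_KEY = "*"  # hypothetical value; the real constant is not shown here


@dataclass
class RoomEntered:
    room: str


@dataclass
class ItemDropped:
    item: str


handlers = defaultdict(list)


def subscribe(event_type, handler):
    handlers[event_type.__name__].append(handler)


def subscribe_all(handler):
    handlers[WILDCARD_KEY].append(handler)


def emit(event):
    # Assumed dispatch order: typed handlers first, then wildcard handlers.
    typed = handlers.get(type(event).__name__, [])
    wildcard = handlers.get(WILDCARD_KEY, [])
    for handler in typed + wildcard:
        handler(event)


seen = Counter()
entered = []

subscribe_all(lambda event: seen.update([type(event).__name__]))
subscribe(RoomEntered, lambda event: entered.append(event.room))

emit(RoomEntered("tavern"))
emit(ItemDropped("lamp"))
emit(RoomEntered("cellar"))
print(dict(seen), entered)
```

The wildcard handler sees all three events, while the typed handler only sees the two `RoomEntered` events.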

unsubscribe

unsubscribe(handler_id: UUID) -> bool

Unsubscribe a handler by ID.

If called during event dispatch, the removal is deferred until the dispatch completes to ensure safe iteration.

Returns:

Type Description
bool

True if handler was found and removed (or queued for removal)

Source code in packages/maid-engine/src/maid_engine/core/events.py
def unsubscribe(self, handler_id: UUID) -> bool:
    """Unsubscribe a handler by ID.

    If called during event dispatch, the removal is deferred until
    the dispatch completes to ensure safe iteration.

    Returns:
        True if handler was found and removed (or queued for removal)
    """
    with self._dispatch_lock:
        # If we're in a dispatch, defer the removal
        if self._dispatch_depth > 0:
            # Check if handler exists first
            for handlers in self._handlers.values():
                for reg in handlers:
                    if reg.handler_id == handler_id:
                        self._pending_removals.add(handler_id)
                        return True
            return False

        # Safe to remove immediately
        return self._do_unsubscribe(handler_id)
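
Deferred removal keeps the handler list stable while it is being iterated. The following toy model illustrates the idea; `MiniBus` is hypothetical, and the flush step that applies queued removals when the dispatch depth returns to zero is an assumption about how the real bus finishes a dispatch.

```python
import threading
from uuid import UUID, uuid4


class MiniBus:
    """Toy model of deferred removal; not the real EventBus."""

    def __init__(self) -> None:
        self._handlers: dict = {}
        self._pending_removals: set = set()
        self._dispatch_depth = 0
        self._lock = threading.RLock()

    def subscribe(self, name: str) -> UUID:
        handler_id = uuid4()
        self._handlers[handler_id] = name
        return handler_id

    def unsubscribe(self, handler_id: UUID) -> bool:
        with self._lock:
            if handler_id not in self._handlers:
                return False
            if self._dispatch_depth > 0:
                self._pending_removals.add(handler_id)  # defer until dispatch ends
            else:
                del self._handlers[handler_id]
            return True

    def dispatch(self, during) -> list:
        """Call `during` mid-dispatch, then report which handlers were visible."""
        with self._lock:
            self._dispatch_depth += 1
        try:
            during()
            visible = list(self._handlers.values())
        finally:
            with self._lock:
                self._dispatch_depth -= 1
                if self._dispatch_depth == 0:
                    # Assumed flush step: apply removals queued during dispatch.
                    for handler_id in self._pending_removals:
                        self._handlers.pop(handler_id, None)
                    self._pending_removals.clear()
        return visible


bus = MiniBus()
first = bus.subscribe("first")
bus.subscribe("second")

visible = bus.dispatch(lambda: bus.unsubscribe(first))
remaining = list(bus._handlers.values())
unknown = bus.unsubscribe(uuid4())
print(visible, remaining, unknown)
```

The handler removed mid-dispatch is still visible for the rest of that dispatch and is gone afterwards.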

unsubscribe_all

unsubscribe_all(event_type: type[Event]) -> int

Unsubscribe all handlers for an event type.

Returns:

Type Description
int

Number of handlers removed

Source code in packages/maid-engine/src/maid_engine/core/events.py
def unsubscribe_all(self, event_type: type[Event]) -> int:
    """Unsubscribe all handlers for an event type.

    Returns:
        Number of handlers removed
    """
    type_name = event_type.__name__
    handlers = self._handlers.get(type_name, [])
    count = len(handlers)

    # Clean up source tracking for all handlers being removed
    for reg in handlers:
        pack_name = self._handler_sources.pop(reg.handler_id, None)
        if pack_name and reg.handler_id in self._handlers_by_pack.get(pack_name, set()):
            self._handlers_by_pack[pack_name].discard(reg.handler_id)

    self._handlers.pop(type_name, None)
    return count

unsubscribe_pack

unsubscribe_pack(pack_name: str) -> int

Unsubscribe all handlers registered by a content pack.

If called during event dispatch, the removal is deferred until the dispatch completes to ensure safe iteration.

Parameters:

Name Type Description Default
pack_name str

Name of the content pack whose handlers should be removed

required

Returns:

Type Description
int

Number of handlers removed (or queued for removal)

Source code in packages/maid-engine/src/maid_engine/core/events.py
def unsubscribe_pack(self, pack_name: str) -> int:
    """Unsubscribe all handlers registered by a content pack.

    If called during event dispatch, the removal is deferred until
    the dispatch completes to ensure safe iteration.

    Args:
        pack_name: Name of the content pack whose handlers should be removed

    Returns:
        Number of handlers removed (or queued for removal)
    """
    with self._dispatch_lock:
        # If we're in a dispatch, defer the removal
        if self._dispatch_depth > 0:
            handler_ids = self._handlers_by_pack.get(pack_name, set())
            if handler_ids:
                self._pending_pack_removals.add(pack_name)
                return len(handler_ids)
            return 0

        # Safe to remove immediately
        return self._do_unsubscribe_pack(pack_name)
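
Pack-level removal relies on a reverse index from pack name to handler IDs, which `subscribe` fills in when `source_pack` is given. A minimal standalone sketch of that index follows; the module-level dicts and simplified functions are assumptions for illustration and omit the dispatch-time deferral.

```python
from collections import defaultdict
from typing import Optional
from uuid import UUID, uuid4

handlers: dict = {}                    # handler_id -> handler name
handler_sources: dict = {}             # handler_id -> pack name
handlers_by_pack = defaultdict(set)    # pack name -> handler ids


def subscribe(name: str, source_pack: Optional[str] = None) -> UUID:
    handler_id = uuid4()
    handlers[handler_id] = name
    if source_pack is not None:
        # Only handlers registered with a pack name are tracked.
        handler_sources[handler_id] = source_pack
        handlers_by_pack[source_pack].add(handler_id)
    return handler_id


def unsubscribe_pack(pack_name: str) -> int:
    handler_ids = handlers_by_pack.pop(pack_name, set())
    for handler_id in handler_ids:
        handlers.pop(handler_id, None)
        handler_sources.pop(handler_id, None)
    return len(handler_ids)


subscribe("combat.on_hit", source_pack="combat")
subscribe("combat.on_death", source_pack="combat")
subscribe("core.logger")  # no source pack, so a pack unload leaves it alone

removed = unsubscribe_pack("combat")
removed_again = unsubscribe_pack("combat")
remaining = list(handlers.values())
print(removed, removed_again, remaining)
```

Handlers subscribed without `source_pack` are invisible to this index, so they have to be unsubscribed by ID.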

GameEngine

GameEngine(
    settings: Settings | SettingsProxy | None = None,
    document_store: DocumentStore | None = None,
    template_registry: Any | None = None,
)

Main game engine that runs the tick loop.

The GameEngine manages the game world, networking, and the main tick loop that drives all game systems. It is content-agnostic - all game-specific functionality comes from content packs.

Protocol conformance: GameEngine structurally satisfies the maid_engine.core.protocols.EngineServices protocol. The World stores a reference to this engine typed as EngineServices (not GameEngine) so that systems only depend on the narrow protocol interface. See World.engine for details.

Example

engine = GameEngine()

# Load content packs
engine.load_content_pack(my_content_pack)

await engine.start()
# Engine runs until stopped

# In another context:
await engine.stop()

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def __init__(
    self,
    settings: Settings | SettingsProxy | None = None,
    document_store: DocumentStore | None = None,
    template_registry: Any | None = None,
) -> None:
    self._settings: Settings | SettingsProxy = settings or get_settings()

    obs_settings = self._settings.observability
    if obs_settings.enabled:
        self._obs_registry: ObservabilityRegistry = DefaultObservabilityRegistry(
            obs_settings
        )
    else:
        self._obs_registry = NullObservabilityRegistry(obs_settings)
    configure_sentry(obs_settings)

    self._world = World(self._settings)
    self._world.set_engine(self)  # EngineServices back-reference for systems (see World.engine docs)
    self._world.events.obs_registry = self._obs_registry
    self._server: MAIDServer | None = None
    self._state = EngineState.STOPPED
    self._tick_task: asyncio.Task[None] | None = None
    self._gauge_export_task: asyncio.Task[None] | None = None
    self._stop_event = asyncio.Event()
    self._shutdown_callbacks: list[Callable[[], None]] = []

    # Command registry for layered command resolution
    # Pass the world's event bus so PlayerCommandEvent can be emitted
    self._command_registry = LayeredCommandRegistry(event_bus=self._world._events)

    # Document store for persistence (default to in-memory for testing)
    self._document_store = document_store or InMemoryDocumentStore(
        obs_registry=self._obs_registry
    )
    from maid_engine.loader.resolver import ReferenceRegistry

    self.reference_registry = ReferenceRegistry()
    self.template_registry = template_registry

    # Content packs
    self._content_packs: dict[str, ContentPack] = {}
    self._content_pack_order: list[str] = []
    self._component_registry = ComponentRegistry()
    self._dirty_tracker: DirtyTracker | None = None
    self._persistence_manager: EntityPersistenceManager | None = None
    self._save_scheduler: SaveScheduler | None = None
    self._player_disconnect_handler_id: UUID | None = None

    # Timing
    self._tick_rate = self._settings.game.tick_rate
    self._tick_interval = 1.0 / self._tick_rate
    self._last_tick_time = 0.0
    self._last_tick_monotonic = 0.0
    self._tick_count = 0

    # Statistics
    self._start_time = 0.0
    self._total_tick_time = 0.0
    self._max_tick_time = 0.0
    self._tick_overruns = 0
    self._tick_error_counts: dict[str, int] = {}

    # Profiling (opt-in)
    self._tick_collector: TickCollector | None = None

    # Bridge manager for external service integration
    self._bridge_manager = BridgeManager()
    self._rss_feed_manager: RSSFeedManager | None = None

    # Hot reload support
    self._hot_reload_manager: HotReloadManager | None = None
    self._hot_reload_pause = asyncio.Event()
    # Acknowledgment event: set by tick loop when it has actually paused
    self._hot_reload_paused_ack = asyncio.Event()

    histogram_profile = (
        HistogramProfile.DETAILED
        if self._settings.observability.histogram_profile.lower() == "detailed"
        else HistogramProfile.COMPACT
    )
    meter = self._obs_registry.get_meter()
    self._tick_duration_histogram = meter.create_histogram(
        "tick_duration_seconds",
        "Duration of a single game tick in seconds",
        buckets=TICK_BUCKETS[histogram_profile],
    )
    self._tick_counter_metric = meter.create_counter(
        "ticks_total",
        "Total number of ticks processed",
    )
    self._tick_loop_healthy_gauge = meter.create_gauge(
        "tick_loop_healthy",
        "Tick loop health state",
    )
    self._system_tick_aggregator = SystemTickAggregator(
        meter=meter,
        mode=self._settings.observability.system_tick_detail,
        profile=histogram_profile,
    )
    self._internal_server: InternalServer | None = None

    # Initialize LLM provider registry from settings
    # The registry is owned by the engine and accessible via engine.provider_registry.
    # Consumers should obtain it via dependency injection rather than the deprecated
    # module-level get_registry() / set_registry() functions.
    self._provider_registry = create_registry_from_settings(
        self._settings,
        obs_registry=self._obs_registry,
        event_bus=self._world.events,
    )
    self._ai_cost_tracker = AICostTracker(
        event_bus=self._world.events,
        obs_registry=self._obs_registry,
    )

    # Initialize bridges from settings
    self._setup_bridges()

ai_cost_tracker property

ai_cost_tracker: AICostTracker

Get the AI cost tracker.

average_tick_time property

average_tick_time: float

Get average tick processing time.

bridge_manager property

bridge_manager: BridgeManager

Get the bridge manager for external service integration.

characters property

characters: Any

Get character manager.

Note: This is a placeholder. Content packs should attach their managers to the engine or world.

command_registry property

command_registry: LayeredCommandRegistry

Get the command registry.

component_registry property

component_registry: ComponentRegistry

Get the persistence component registry.

content_pack_order property

content_pack_order: list[str]

Get content pack load order.

content_packs property

content_packs: dict[str, ContentPack]

Get loaded content packs.

document_store property

document_store: DocumentStore

Get the document store.

hot_reload property

hot_reload: HotReloadManager

Get the hot reload manager (lazy-initialized).

The hot reload manager is created on first access. This allows hot reload operations to be performed on the engine at runtime, including loading, unloading, and reloading content packs while the engine is running.

Returns:

Type Description
HotReloadManager

The HotReloadManager instance for this engine.

is_running property

is_running: bool

Check if engine is running.

items property

items: Any

Get item manager.

last_tick_monotonic property

last_tick_monotonic: float

Get monotonic timestamp of the last tick start.

obs_registry property

obs_registry: ObservabilityRegistry

Get the observability registry.

persistence_manager property

persistence_manager: EntityPersistenceManager | None

Get the entity persistence manager.

provider_registry property

provider_registry: LLMProviderRegistry

Get the LLM provider registry.

The provider registry manages AI/LLM providers (Anthropic, OpenAI, Ollama, etc.) for use by AI-powered features like NPC dialogue. Providers are initialized from settings during engine construction.

Returns:

Type Description
LLMProviderRegistry

The LLMProviderRegistry instance for this engine.

quest_manager property

quest_manager: Any

Get quest manager.

Note: This is set by content packs that provide quest functionality.

reward_distributor property

reward_distributor: Any

Get reward distributor.

Note: This is set by content packs that provide quest functionality.

rooms property

rooms: Any

Get room manager.

rss_feed_manager property

rss_feed_manager: RSSFeedManager | None

Get the RSS feed manager (if enabled).

save_scheduler property

save_scheduler: SaveScheduler | None

Get the save scheduler.

server property

server: MAIDServer | None

Get the network server.

settings property

settings: Settings | SettingsProxy

Get engine settings.

state property

state: EngineState

Get current engine state.

tick_collector property

tick_collector: TickCollector | None

Get the tick collector for profiling (if enabled).

tick_count property

tick_count: int

Get total tick count.

tick_error_counts property

tick_error_counts: dict[str, int]

Get tick error counts keyed by exception class name.

Returns a copy of the internal error count dictionary. Each key is the class name of an exception that occurred during tick processing, and each value is the number of times that exception type occurred.
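
Returning a copy means callers can inspect or modify the result without touching the engine's own counters. A small sketch of that pattern, using a hypothetical `TickErrorLog` class rather than the engine itself:

```python
class TickErrorLog:
    """Toy counter keyed by exception class name."""

    def __init__(self) -> None:
        self._counts: dict = {}

    def record(self, exc: BaseException) -> None:
        name = type(exc).__name__
        self._counts[name] = self._counts.get(name, 0) + 1

    @property
    def counts(self) -> dict:
        # Hand out a copy so callers cannot mutate internal state.
        return dict(self._counts)


log = TickErrorLog()
log.record(ValueError("bad input"))
log.record(ValueError("bad input again"))
log.record(KeyError("missing"))

snapshot = log.counts
snapshot["ValueError"] = 99  # only changes the caller's copy

print(log.counts)
```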

uptime property

uptime: float

Get engine uptime in seconds.

world property

world: World

Get the game world.

add_shutdown_callback

add_shutdown_callback(callback: Callable[[], None]) -> None

Add a callback to run on shutdown.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def add_shutdown_callback(self, callback: Callable[[], None]) -> None:
    """Add a callback to run on shutdown."""
    self._shutdown_callbacks.append(callback)

create_data_pipeline

create_data_pipeline(config: Any | None = None) -> Any

Create a configured world data pipeline.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def create_data_pipeline(self, config: Any | None = None) -> Any:
    """Create a configured world data pipeline."""
    from maid_engine.loader.entity_types import STANDARD_ENTITY_CONFIGS
    from maid_engine.loader.models import LoaderConfig, LoaderContext
    from maid_engine.loader.pipeline import DEFAULT_PHASES, Pipeline
    from maid_engine.loader.protocols import DataLoaderPack
    from maid_engine.loader.rules.builtin import BUILTIN_RULES

    loader_config = config if config is not None else LoaderConfig()
    entity_type_configs = dict(STANDARD_ENTITY_CONFIGS)
    semantic_rules = list(BUILTIN_RULES)

    for pack in self._content_packs.values():
        if isinstance(pack, DataLoaderPack):
            for entity_config in pack.get_entity_type_configs():
                entity_type_configs[entity_config.type_name] = entity_config
            semantic_rules.extend(pack.get_semantic_rules())

    context = LoaderContext(
        world=self._world,
        config=loader_config,
        reference_registry=self.reference_registry,
        template_registry=self.template_registry,
        pack_name="engine",
    )
    context.entity_type_configs.update(entity_type_configs)
    context.semantic_rules.extend(semantic_rules)

    return Pipeline(
        phases=list(DEFAULT_PHASES),
        timeout=loader_config.pipeline_timeout,
        default_context=context,
    )

get_stats

get_stats() -> dict[str, float | int]

Get engine statistics.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def get_stats(self) -> dict[str, float | int]:
    """Get engine statistics."""
    total_tick_errors = sum(self._tick_error_counts.values())
    stats: dict[str, float | int] = {
        "uptime": self.uptime,
        "tick_count": self._tick_count,
        "tick_rate": self._tick_rate,
        "average_tick_time": self.average_tick_time,
        "max_tick_time": self._max_tick_time,
        "tick_overruns": self._tick_overruns,
        "tick_errors": total_tick_errors,
        "event_handler_errors": self._world.events.error_count,
        "entity_count": len(self._world.entities),
        "room_count": self._world.room_count(),
        "content_packs": len(self._content_packs),
    }
    if self._save_scheduler is not None:
        save_stats = self._save_scheduler.get_stats()
        stats["persistence_dirty_count"] = int(save_stats["dirty_count"])
        stats["persistence_save_errors"] = int(save_stats["save_error_count"])
    return stats
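
The returned dictionary lends itself to simple health checks. The sketch below derives two ratios from the keys listed above; `summarize` is a hypothetical helper, the sample numbers are made up, and it assumes `tick_rate` is in ticks per second and `average_tick_time` is in seconds.

```python
from __future__ import annotations


def summarize(stats: dict[str, float | int]) -> dict[str, float]:
    """Derive ratios from a get_stats()-style dictionary."""
    tick_count = stats["tick_count"]
    return {
        # Fraction of each tick interval (1 / tick_rate) spent processing.
        "budget_used": stats["average_tick_time"] * stats["tick_rate"],
        # Share of ticks that overran, guarding against division by zero.
        "overrun_ratio": stats["tick_overruns"] / tick_count if tick_count else 0.0,
    }


# Made-up sample values, using key names from the listing above.
sample = {
    "tick_rate": 10,
    "tick_count": 1000,
    "average_tick_time": 0.02,
    "tick_overruns": 5,
}

summary = summarize(sample)
print(summary)
```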

has_pack_entities async

has_pack_entities(
    pack_name: str, world_id: str = "default"
) -> bool

Return True if persisted entities exist for a content pack.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
async def has_pack_entities(self, pack_name: str, world_id: str = "default") -> bool:
    """Return True if persisted entities exist for a content pack."""
    if self._persistence_manager is None:
        return False
    return await self._persistence_manager.has_pack_entities(pack_name, world_id=world_id)

load_content_pack

load_content_pack(
    pack: ContentPack, use_hot_reload: bool = False
) -> None

Load a content pack.

When the engine is stopped, content packs are loaded synchronously. While the engine is running this method always raises RuntimeError, even with use_hot_reload=True, because hot reload is asynchronous; use engine.hot_reload.load_pack() for runtime loading.

Parameters:

Name Type Description Default
pack ContentPack

The content pack to load.

required
use_hot_reload bool

Does not switch this method to hot reload. If the engine is running, RuntimeError is raised for either value; with True, the error message points to engine.hot_reload.load_pack().

False

Raises:

Type Description
RuntimeError

If the engine is not stopped, regardless of use_hot_reload.

ValueError

If pack is already loaded or dependencies are missing.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def load_content_pack(self, pack: "ContentPack", use_hot_reload: bool = False) -> None:
    """Load a content pack.

    When the engine is stopped, content packs are loaded synchronously.
    When the engine is running, set use_hot_reload=True to use the
    hot reload manager for safe runtime loading.

    Args:
        pack: The content pack to load.
        use_hot_reload: If True and engine is running, use hot reload.
                       If False and engine is running, raises RuntimeError.

    Raises:
        RuntimeError: If engine is running and use_hot_reload is False.
        ValueError: If pack is already loaded or dependencies are missing.
    """
    if self._state != EngineState.STOPPED:
        if use_hot_reload:
            # Delegate to hot reload for runtime loading
            # Note: This is a synchronous method, but hot reload is async.
            # The caller should use hot_reload.load_pack() directly for
            # async operation with full result feedback.
            raise RuntimeError(
                "Cannot use synchronous load_content_pack() with use_hot_reload=True. "
                "Use engine.hot_reload.load_pack() for runtime loading."
            )
        raise RuntimeError("Cannot load content packs while engine is running")

    pack_name = pack.manifest.name
    if pack_name in self._content_packs:
        raise ValueError(f"Content pack '{pack_name}' is already loaded")

    # Verify dependencies are loaded
    for dep in pack.get_dependencies():
        if dep not in self._content_packs:
            raise ValueError(
                f"Content pack '{pack_name}' depends on '{dep}' which is not loaded"
            )

    self._content_packs[pack_name] = pack
    self._content_pack_order.append(pack_name)
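
Because dependencies must already be loaded, packs have to be loaded in dependency order, and that order later determines command priority (start() assigns each pack its load index times 10). A standalone sketch of both rules, with a hypothetical `PackRegistry` and made-up pack names:

```python
class PackRegistry:
    """Toy stand-in for the engine's pack bookkeeping."""

    def __init__(self) -> None:
        self.packs: dict = {}
        self.order: list = []

    def load(self, name: str, dependencies: list) -> None:
        if name in self.packs:
            raise ValueError(f"Content pack '{name}' is already loaded")
        for dep in dependencies:
            if dep not in self.packs:
                raise ValueError(
                    f"Content pack '{name}' depends on '{dep}' which is not loaded"
                )
        self.packs[name] = list(dependencies)
        self.order.append(name)


registry = PackRegistry()
registry.load("core", [])
registry.load("combat", ["core"])

error = ""
try:
    registry.load("quests", ["economy"])  # "economy" was never loaded
except ValueError as exc:
    error = str(exc)

# Later packs get higher priority, mirroring the enumerate() loop in start().
priorities = {name: index * 10 for index, name in enumerate(registry.order)}
print(registry.order, priorities, error)
```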

register_pack_runtime

register_pack_runtime(
    pack_name: str, pack: ContentPack
) -> None

Register a content pack at runtime (for hot reload use).

Unlike load_content_pack(), this does not check engine state or dependencies. The caller is responsible for those checks.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def register_pack_runtime(self, pack_name: str, pack: "ContentPack") -> None:
    """Register a content pack at runtime (for hot reload use).

    Unlike load_content_pack(), this does not check engine state or
    dependencies. The caller is responsible for those checks.
    """
    self._content_packs[pack_name] = pack
    if pack_name not in self._content_pack_order:
        self._content_pack_order.append(pack_name)

relay_to_external async

relay_to_external(
    game_channel: str, sender_name: str, message: str
) -> None

Relay a game message to mapped external channels.

This is a convenience method that wraps bridge_manager.relay_to_external(). Content packs can call this to send game channel messages to external services like Discord and IRC.

Parameters:

Name Type Description Default
game_channel str

The game channel name (e.g., "ooc", "chat", "newbie")

required
sender_name str

The name of the sender in the game

required
message str

The message content

required

Source code in packages/maid-engine/src/maid_engine/core/engine.py
async def relay_to_external(
    self, game_channel: str, sender_name: str, message: str
) -> None:
    """Relay a game message to mapped external channels.

    This is a convenience method that wraps bridge_manager.relay_to_external().
    Content packs can call this to send game channel messages to external
    services like Discord and IRC.

    Args:
        game_channel: The game channel name (e.g., "ooc", "chat", "newbie")
        sender_name: The name of the sender in the game
        message: The message content
    """
    await self._bridge_manager.relay_to_external(game_channel, sender_name, message)

run async

run() -> None

Start and run until stopped.

This is a convenience method that starts the engine and waits for it to be stopped.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
async def run(self) -> None:
    """Start and run until stopped.

    This is a convenience method that starts the engine and
    waits for it to be stopped.
    """
    await self.start()
    await self._stop_event.wait()
    await self.stop()
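
The start, wait, stop sequence in run() is a common asyncio pattern built on `asyncio.Event`. The sketch below models it with a hypothetical `MiniEngine`; the `request_stop` method is an assumption, since this section does not show what sets the real engine's stop event.

```python
import asyncio


class MiniEngine:
    """Toy engine that only records lifecycle transitions."""

    def __init__(self) -> None:
        self._stop_event = asyncio.Event()
        self.log: list = []

    async def start(self) -> None:
        self._stop_event.clear()
        self.log.append("started")

    async def stop(self) -> None:
        self.log.append("stopped")

    def request_stop(self) -> None:
        # Hypothetical trigger; a signal handler could do the same thing.
        self._stop_event.set()

    async def run(self) -> None:
        await self.start()
        await self._stop_event.wait()  # park here until a stop is requested
        await self.stop()


async def main() -> list:
    engine = MiniEngine()
    runner = asyncio.create_task(engine.run())
    await asyncio.sleep(0)  # give run() a chance to reach the wait
    engine.request_stop()
    await runner
    return engine.log


lifecycle = asyncio.run(main())
print(lifecycle)
```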

set_server

set_server(server: MAIDServer) -> None

Set the network server.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def set_server(self, server: "MAIDServer") -> None:
    """Set the network server."""
    self._server = server

set_tick_collector

set_tick_collector(collector: TickCollector | None) -> None

Set the tick collector for profiling.

The tick collector must be started externally (e.g., via ProfileManager). When set, the engine will record tick profiling data during each tick.

Parameters:

Name Type Description Default
collector TickCollector | None

The TickCollector instance, or None to disable profiling.

required

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def set_tick_collector(self, collector: "TickCollector | None") -> None:
    """Set the tick collector for profiling.

    The tick collector must be started externally (e.g., via ProfileManager).
    When set, the engine will record tick profiling data during each tick.

    Args:
        collector: The TickCollector instance, or None to disable profiling.
    """
    self._tick_collector = collector

start async

start() -> None

Start the game engine.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
async def start(self) -> None:
    """Start the game engine."""
    if self._state != EngineState.STOPPED:
        return

    setup_logging(self._settings.observability)
    self._state = EngineState.STARTING
    self._stop_event.clear()

    # Set up signal handlers
    self._setup_signals()

    # Initialize document store
    await self._document_store.initialize()
    self._component_registry = ComponentRegistry()
    self._dirty_tracker = None
    self._persistence_manager = None
    self._save_scheduler = None
    if self._settings.persistence.enabled and not self._document_store.has_collection(
        "entities"
    ):
        self._document_store.register_schema("entities", EntityDocument)

    # Initialize content packs (in dependency order)
    # Set pack priorities based on load order (later = higher priority)
    for priority, pack_name in enumerate(self._content_pack_order):
        self._command_registry.set_pack_priority(pack_name, priority * 10)

    for pack_name in self._content_pack_order:
        pack = self._content_packs[pack_name]

        # Register document schemas from content pack
        pack.register_document_schemas(self._document_store)

        # Register events from content pack
        # (Events are used for documentation/discovery, emitting is dynamic)
        _events = pack.get_events()  # noqa: F841 - events for future use

        register_component_types = getattr(pack, "register_component_types", None)
        if callable(register_component_types):
            register_component_types(self._component_registry)

        # Register commands from content pack
        pack.register_commands(self._command_registry)

        # Register systems from content pack
        for system in pack.get_systems(self._world):
            self._world.systems.register(system)

            # Auto-inject storage into systems that implement StorageAware
            if isinstance(system, StorageAware):
                system.set_storage(self._document_store)

    for issue in self._component_registry.validate_migration_chains():
        _logger.warning("Persistence migration chain issue: %s", issue)

    # Register world persistence schemas and load saved state
    # This is done here after content packs have loaded so that
    # grid/wilderness managers are set up on the world
    register_world_persistence_schemas(self._document_store)
    loaded = await load_world_systems_state(
        self._document_store,
        self._world,
        world_id="default",
    )
    if loaded:
        _logger.info("Loaded world systems state from storage")
    else:
        _logger.debug("No saved world systems state found")

    has_registered_components = bool(self._component_registry.get_registered_types())
    if self._settings.persistence.enabled and has_registered_components:
        self._component_registry.register(
            QuarantineComponent,
            pack_name="maid-engine",
            allow_overwrite=True,
        )
        self._persistence_manager = EntityPersistenceManager(
            self._world,
            self._document_store,
            self._component_registry,
            self._settings.persistence,
        )
        load_result = await self._persistence_manager.load_all_entities(
            world_id="default"
        )
        try:
            _logger.info(
                "Loaded persisted entities",
                extra={
                    "entities_loaded": load_result.entities_loaded,
                    "entities_failed": load_result.entities_failed,
                },
            )
        except BaseException:  # noqa: BLE001
            _logger.debug(
                "Loaded persisted entities (%s loaded, %s failed)",
                load_result.entities_loaded,
                load_result.entities_failed,
            )

    for pack_name in self._content_pack_order:
        pack = self._content_packs[pack_name]
        await pack.on_load(self)

    # Start the world
    await self._world.startup()

    if self._persistence_manager is not None:
        self._dirty_tracker = DirtyTracker(self._world.events)
        for entity in self._world.get_all_entities():
            bind_entity_dirty_tracking(entity, self._dirty_tracker)
        self._dirty_tracker.start_tracking()
        self._save_scheduler = SaveScheduler(
            self._persistence_manager,
            self._dirty_tracker,
            self._world.events,
            obs_registry=self._obs_registry,
            save_interval=self._settings.persistence.save_interval,
            cycle_timeout=self._settings.persistence.cycle_timeout,
        )
        self._save_scheduler.start()
        self._player_disconnect_handler_id = self._world.events.subscribe(
            PlayerDisconnectedEvent,
            self._on_player_disconnected,
        )

    health_checker = self._obs_registry.health_checker()
    health_checker.bind_engine(self, tick_interval=self._tick_interval)
    await health_checker.start()

    if self._settings.observability.metrics_enabled and not self._settings.debug:
        self._internal_server = InternalServer(
            host=self._settings.observability.internal_host,
            port=self._settings.observability.internal_port,
            scrape_cache=ScrapeCache(
                ttl=self._settings.observability.scrape_cache_ttl
            ),
            health_checker=health_checker,
            metrics_token=self._settings.observability.metrics_token,
        )
        try:
            await self._internal_server.start()
        except BaseException as exc:  # noqa: BLE001
            _logger.warning("Failed to start internal metrics server: %s", exc)
            self._internal_server = None

    # Start the network server if configured
    if self._server:
        await self._server.start()

        # Register RSS router with web server if enabled
        if self._rss_feed_manager and self._settings.bridges.rss.enabled:
            self._register_rss_router()

        # Wire up events WebSocket to game events
        from maid_engine.api.v1.events_ws import setup_event_hooks

        setup_event_hooks(self)
        _logger.info("Events WebSocket hooks configured")

        # Wire up admin WebSocket to game events
        from maid_engine.api.admin.websocket import setup_admin_event_hooks

        setup_admin_event_hooks(self)
        _logger.info("Admin WebSocket hooks configured")

    # Start external bridges
    await self._bridge_manager.start_all()

    # Wire up bridge message handler for external -> game messages
    self._setup_bridge_message_handler()

    # Start the tick loop
    self._start_time = time.monotonic()
    self._last_tick_time = time.monotonic()
    self._state = EngineState.RUNNING
    self._tick_task = asyncio.create_task(self._tick_loop())
    if self._settings.observability.metrics_enabled:
        self._gauge_export_task = asyncio.create_task(
            gauge_export_loop(
                self,
                interval=self._settings.observability.gauge_export_interval,
            )
        )

stop async

stop() -> None

Stop the game engine.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
async def stop(self) -> None:
    """Stop the game engine."""
    if self._state not in (EngineState.RUNNING, EngineState.STARTING):
        return

    self._state = EngineState.STOPPING
    self._stop_event.set()

    # Wait for tick loop to finish
    if self._tick_task:
        try:
            await asyncio.wait_for(self._tick_task, timeout=5.0)
        except TimeoutError:
            self._tick_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._tick_task
    if self._gauge_export_task:
        self._gauge_export_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._gauge_export_task
        self._gauge_export_task = None

    # Stop external bridges
    await self._bridge_manager.stop_all()

    # Stop the network server
    if self._server:
        await self._server.stop()

    if self._internal_server is not None:
        await self._internal_server.stop()
        self._internal_server = None

    await self._obs_registry.health_checker().stop()

    if self._save_scheduler is not None:
        await self._save_scheduler.stop()
        self._save_scheduler = None
    if self._player_disconnect_handler_id is not None:
        self._world.events.unsubscribe(self._player_disconnect_handler_id)
        self._player_disconnect_handler_id = None
    if self._dirty_tracker is not None:
        self._dirty_tracker.stop_tracking()
        self._dirty_tracker = None

    # Save world systems state before shutdown
    # This preserves grid and wilderness state across server restarts
    try:
        await save_world_systems_state(
            self._document_store,
            self._world,
            world_id="default",
        )
        _logger.info("Saved world systems state to storage")
    except Exception as e:
        _logger.error("Failed to save world systems state: %s", e)

    # Shut down the world
    await self._world.shutdown()
    self._persistence_manager = None

    # Unload content packs (in reverse order)
    for pack_name in reversed(self._content_pack_order):
        pack = self._content_packs[pack_name]
        # Unregister commands from this pack
        self._command_registry.unregister_pack(pack_name)
        await pack.on_unload(self)

    # Close AI providers
    await self._provider_registry.close_all()

    # Close document store
    await self._document_store.close()

    # Run shutdown callbacks
    for callback in self._shutdown_callbacks:
        try:
            callback()
        except Exception as e:
            # Log but don't let callbacks prevent shutdown
            _logger.warning("Shutdown callback %s raised exception: %s", callback, e)

    self._state = EngineState.STOPPED

unload_content_pack

unload_content_pack(pack_name: str) -> bool

Unload a content pack.

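Can only be done when the engine is stopped; calling it in any other state raises RuntimeError. Returns True if the pack was unloaded and False if no pack with that name is loaded. Raises ValueError if another loaded pack depends on it.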
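The dependency check means packs have to be unloaded dependents-first. The following is a minimal sketch of that rule, not the engine's own code: `PackTable` is a hypothetical stand-in that stores each pack's dependency list directly instead of calling `get_dependencies()` on real pack objects.

```python
class PackTable:
    """Hypothetical stand-in for the engine's content pack bookkeeping."""

    def __init__(self, dependencies: dict[str, list[str]]) -> None:
        self._deps = dict(dependencies)   # pack name -> names it depends on
        self._order = list(dependencies)  # load order

    def unload(self, pack_name: str) -> bool:
        if pack_name not in self._deps:
            return False
        # Refuse to unload a pack that another loaded pack still needs.
        for other, deps in self._deps.items():
            if other != pack_name and pack_name in deps:
                raise ValueError(
                    f"Cannot unload '{pack_name}': '{other}' depends on it"
                )
        del self._deps[pack_name]
        self._order.remove(pack_name)
        return True


packs = PackTable({"stdlib": [], "my-game": ["stdlib"]})

try:
    packs.unload("stdlib")  # my-game still depends on stdlib
    blocked = False
except ValueError:
    blocked = True

# Unloading the dependent first succeeds; a second unload of the same
# pack reports False because it is no longer loaded.
unloaded = [packs.unload("my-game"), packs.unload("stdlib"), packs.unload("stdlib")]
```

Unloading in the reverse of load order, as stop() does for its on_unload calls, should satisfy this check as long as dependencies were loaded before their dependents.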

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def unload_content_pack(self, pack_name: str) -> bool:
    """Unload a content pack.

    Can only be done when engine is stopped.
    Returns True if pack was unloaded.
    """
    if self._state != EngineState.STOPPED:
        raise RuntimeError("Cannot unload content packs while engine is running")

    if pack_name not in self._content_packs:
        return False

    # Check if other packs depend on this one
    for other_name, other_pack in self._content_packs.items():
        if other_name != pack_name and pack_name in other_pack.get_dependencies():
            raise ValueError(
                f"Cannot unload '{pack_name}': "
                f"'{other_name}' depends on it"
            )

    del self._content_packs[pack_name]
    self._content_pack_order.remove(pack_name)
    return True

unregister_pack_runtime

unregister_pack_runtime(pack_name: str) -> None

Unregister a content pack at runtime (for hot reload use).

Unlike unload_content_pack(), this does not check engine state or dependent packs. The caller is responsible for those checks.

Source code in packages/maid-engine/src/maid_engine/core/engine.py
def unregister_pack_runtime(self, pack_name: str) -> None:
    """Unregister a content pack at runtime (for hot reload use).

    Unlike unload_content_pack(), this does not check engine state or
    dependent packs. The caller is responsible for those checks.
    """
    self._content_packs.pop(pack_name, None)
    if pack_name in self._content_pack_order:
        self._content_pack_order.remove(pack_name)

InMemoryDocumentStore

InMemoryDocumentStore(
    obs_registry: ObservabilityRegistry | None = None,
)

Bases: DocumentStore

In-memory document store for testing.

Uses dictionaries. Not suitable for production.

Source code in packages/maid-engine/src/maid_engine/storage/document_store.py
def __init__(self, obs_registry: ObservabilityRegistry | None = None) -> None:
    self._schemas: dict[str, type[BaseModel]] = {}
    self._collections: dict[str, InMemoryDocumentCollection[Any]] = {}
    self._query_collector: QueryCollector | None = None
    self._obs_registry = obs_registry

LayeredCommandRegistry

LayeredCommandRegistry(event_bus: EventBus | None = None)

Command registry with layered resolution for content packs.

Commands can be registered by multiple content packs. When a command is executed, the highest-priority registration is used.

The registry now integrates with:

  • Hook system: Pre/post command hooks for logging, rate limiting, etc.
  • Lock system: Permission checks using lock expressions

Example

registry = LayeredCommandRegistry()

# Set pack priorities (higher = wins)
registry.set_pack_priority("my-game", 100)
registry.set_pack_priority("stdlib", 50)
registry.set_pack_priority("engine", 0)

# Register commands from different packs
registry.register("look", look_handler, pack_name="stdlib")
registry.register("look", custom_look_handler, pack_name="my-game")

# When "look" is executed, my-game's handler is used (priority 100)

# Register hooks
registry.register_pre_hook("rate_limiter", check_rate_limit)
registry.register_post_hook("logger", log_command)

# Register custom lock functions
registry.register_lock_function("is_owner", check_ownership)

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def __init__(self, event_bus: EventBus | None = None) -> None:
    # command_name -> [(priority, pack_name, definition)]
    self._layers: dict[str, list[tuple[int, str, CommandDefinition]]] = defaultdict(list)
    self._pack_priorities: dict[str, int] = {}
    self._aliases: dict[str, str] = {}  # alias -> primary name

    # Event bus for emitting PlayerCommandEvent
    self._event_bus = event_bus

    # Hook system
    self._hook_registry = HookRegistry()
    self._command_executor = CommandExecutor(self._hook_registry)

    # Lock system
    self._lock_evaluator = LockEvaluator()

event_bus property writable

event_bus: EventBus | None

Get the event bus for PlayerCommandEvent emission.

hook_registry property

hook_registry: HookRegistry

Get the hook registry for advanced hook management.

lock_evaluator property

lock_evaluator: LockEvaluator

Get the lock evaluator for advanced lock management.

all_commands

all_commands() -> list[CommandDefinition]

Get highest-priority definitions for all commands.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def all_commands(self) -> list[CommandDefinition]:
    """Get highest-priority definitions for all commands."""
    commands = []
    for name in self._layers:
        defn = self.get(name)
        if defn:
            commands.append(defn)
    return commands

commands_by_category

commands_by_category() -> dict[
    str, list[CommandDefinition]
]

Get commands grouped by category. Hidden commands are omitted.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def commands_by_category(self) -> dict[str, list[CommandDefinition]]:
    """Get commands grouped by category."""
    by_category: dict[str, list[CommandDefinition]] = defaultdict(list)
    for cmd in self.all_commands():
        if not cmd.hidden:
            by_category[cmd.category].append(cmd)
    return dict(by_category)

execute async

execute(
    context: CommandContext,
    player_access_level: AccessLevel = PLAYER,
) -> bool

Execute a command using the highest-priority handler with lock checking and hook pipeline.

Execution flow:

  1. Find the command definition
  2. Check lock expression (if defined)
  3. Check access level
  4. Run pre-hooks (can cancel execution)
  5. Execute command handler
  6. Run post-hooks (always runs, even on exception)
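The ordering of these steps can be sketched in a few lines. This is an illustrative model, not the registry's implementation: `run_command` and its parameters are hypothetical, step 1 (lookup) is omitted, and the sketch assumes post-hooks are skipped when a pre-hook cancels, which the documentation above does not specify. Unlike the real execute(), which catches handler errors and reports them to the session, the sketch lets them propagate.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


async def run_command(
    handler: Callable[[], Awaitable[bool | None]],
    *,
    lock_ok: bool,
    player_level: int,
    required_level: int,
    pre_hooks: list[Callable[[], bool]],
    post_hooks: list[Callable[[bool], None]],
) -> bool:
    """Hypothetical pipeline mirroring steps 2-6."""
    if not lock_ok:                            # step 2: lock check
        return False
    if player_level < required_level:          # step 3: access level
        raise PermissionError("insufficient access level")
    if not all(hook() for hook in pre_hooks):  # step 4: pre-hooks may cancel
        return False
    success = False
    try:
        result = await handler()               # step 5: run the handler
        success = True if result is None else bool(result)
        return success
    finally:
        for hook in post_hooks:                # step 6: runs even on exception
            hook(success)


log: list[str] = []


async def ok() -> None:
    log.append("handler")


async def boom() -> bool:
    raise RuntimeError("handler failed")


async def demo() -> tuple[bool, bool]:
    common = dict(lock_ok=True, player_level=1, required_level=1)
    post = [lambda success: log.append(f"post:{success}")]
    handled = await run_command(ok, pre_hooks=[], post_hooks=post, **common)
    cancelled = await run_command(ok, pre_hooks=[lambda: False], post_hooks=post, **common)
    try:
        await run_command(boom, pre_hooks=[], post_hooks=post, **common)
    except RuntimeError:
        log.append("raised")
    return handled, cancelled


handled, cancelled = asyncio.run(demo())
```

Placing the post-hooks in a `finally` block is what lets them observe failed runs as well as successful ones.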

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | CommandContext | Command context containing session, player_id, command, args, etc. | required |
| player_access_level | AccessLevel | The access level of the player executing the command | PLAYER |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the command was handled successfully; False if the command was not found, a lock check failed, pre-hooks cancelled execution, or the handler raised an exception |

Raises:

| Type | Description |
| --- | --- |
| PermissionError | If player lacks required access level |

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
async def execute(
    self,
    context: CommandContext,
    player_access_level: AccessLevel = AccessLevel.PLAYER,
) -> bool:
    """Execute a command using the highest-priority handler with lock checking and hook pipeline.

    Execution flow:
    1. Find the command definition
    2. Check lock expression (if defined)
    3. Check access level
    4. Run pre-hooks (can cancel execution)
    5. Execute command handler
    6. Run post-hooks (always runs, even on exception)

    Args:
        context: Command context containing session, player_id, command, args, etc.
        player_access_level: The access level of the player executing

    Returns:
        True if command was handled successfully, False if command not found
        or execution was cancelled

    Raises:
        PermissionError: If player lacks required access level
    """
    definition = self.get(context.command)
    if not definition:
        return False

    obs_registry = context.obs_registry
    engine = context.world.engine
    if obs_registry is None and engine is not None:
        maybe_obs_registry = getattr(engine, "obs_registry", None)
        if maybe_obs_registry is not None:
            obs_registry = maybe_obs_registry

    context_token = bind_context(
        command=definition.name,
        player_id=str(context.player_id),
        correlation_id=uuid4().hex[:16],
    )
    started = time.monotonic()
    status = "ok"
    try:
        correlation_id = get_context().correlation_id
        span_context: Any = NoOpSpan()
        tracing_settings = (
            getattr(obs_registry, "settings", None) if obs_registry is not None else None
        )
        if (
            obs_registry is not None
            and tracing_settings is not None
            and tracing_mode_enabled(tracing_settings, TracingMode.MINIMAL)
        ):
            get_tracer = getattr(obs_registry, "get_tracer", None)
            if callable(get_tracer):
                span_context = start_span_context(
                    get_tracer(__name__),
                    f"command:{definition.name}",
                )

        with span_context as span:
            span.set_attribute("command", definition.name)
            span.set_attribute("player_id", str(context.player_id))
            span.set_attribute("correlation_id", correlation_id)

            # Check locks BEFORE hooks and access level
            if definition.locks:
                # Determine target for lock evaluation
                # Use explicit target if provided, otherwise try to resolve from first argument
                target = context.target
                if target is None and context.args:
                    target = self._resolve_target_for_lock(
                        context.args[0], context.world, context.player_id
                    )

                lock_ctx = LockContext(
                    player_entity_id=context.player_id,
                    world=context.world,
                    command_name=context.command,
                    session=context.session,
                    target=target,
                )
                if not self._lock_evaluator.check(definition.locks, lock_ctx):
                    await send_to_session(context.session, "You don't have permission to do that.")
                    status = "error"
                    return False

            # Check access level
            if player_access_level < definition.access_level:
                raise PermissionError(
                    f"Command '{definition.name}' requires {definition.access_level.name} "
                    f"access (you have {player_access_level.name})"
                )

            # Create handler wrapper that calls the actual command
            # Returns True if handler returns None (implicit success)
            async def handler_wrapper() -> bool:
                result = await definition.handler(context)
                # Convert None to True (handler succeeded but didn't return a value)
                return result if result is not None else True

            # Execute with hook pipeline
            # Note: CommandExecutor returns None when pre-hooks cancel execution
            success = False
            try:
                result = await self._command_executor.execute(
                    handler=handler_wrapper,
                    command_name=definition.name,
                    category=definition.category,
                    args=context.args,
                    raw_input=context.raw_input,
                    session=context.session,
                    player_entity_id=context.player_id,
                    world=context.world,
                    metadata=definition.metadata,
                )
                # If result is None, pre-hooks cancelled execution
                # If result is True/False, that's the handler result
                success = False if result is None else bool(result)
                return success
            except Exception as e:
                # Catch command execution errors and convert to user-friendly message.
                # We don't re-raise because commands should fail gracefully with feedback.
                logger.exception("Error executing command %r", context.command)
                await send_to_session(context.session, f"Error executing command: {e}")
                success = False
                status = "error"
                span.set_attribute("error", True)
                return False
            finally:
                duration = time.monotonic() - started
                if not success:
                    status = "error"
                span.set_attribute("duration_ms", duration * 1000.0)
                span.set_attribute("status", status)
                span.set_attribute("success", success)

                if obs_registry is not None:
                    with safe_observe("command_execute", registry=obs_registry):
                        meter = obs_registry.get_meter()
                        duration_histogram = meter.create_histogram(
                            "command_duration_seconds",
                            "Command execution duration in seconds",
                            labels=("command",),
                        )
                        command_counter = meter.create_counter(
                            "commands_total",
                            "Total number of commands processed",
                            labels=("command", "status"),
                        )
                        duration_histogram.labels(command=definition.name).observe(duration)
                        command_counter.labels(command=definition.name, status=status).inc()

                # Emit PlayerCommandEvent after execution completes (success or failure).
                # This is intentionally in the finally block to ensure the event is ALWAYS
                # emitted, even when exceptions occur. The event includes success=False for
                # failures, allowing observers (logging, analytics, WebSocket streaming) to
                # track all command executions. The event is emitted before the function
                # returns, which is correct - observers need notification regardless of outcome.
                # Import locally to avoid circular imports.
                if self._event_bus:
                    from maid_engine.core.events import PlayerCommandEvent

                    event = PlayerCommandEvent(
                        player_id=context.player_id,
                        command=definition.name,
                        args=context.args,
                        success=success,
                        raw_input=context.raw_input,
                    )
                    await self._event_bus.emit(event)
    finally:
        restore_context(context_token)

get

get(name: str) -> CommandDefinition | None

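Get the highest-priority command definition by name, alias, or prefix. An exact name or alias match is tried first; a prefix resolves only when it matches exactly one command, otherwise None is returned.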
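The prefix rule is easiest to see on a small example. The sketch below is a simplified model, assuming a flat set of command names and an alias map; `resolve` is a hypothetical helper that returns the resolved command name rather than a CommandDefinition.

```python
from __future__ import annotations


def resolve(name: str, commands: set[str], aliases: dict[str, str]) -> str | None:
    """Resolve user input to a command name: exact, alias, then unique prefix."""
    # An alias maps straight to its primary command name.
    name = aliases.get(name, name)
    if name in commands:
        return name

    # Collect every command reachable by prefix, via its name or an alias.
    matches = {cmd for cmd in commands if cmd.startswith(name)}
    matches |= {target for alias, target in aliases.items() if alias.startswith(name)}

    # Only an unambiguous prefix resolves.
    return matches.pop() if len(matches) == 1 else None


commands = {"look", "lock", "inventory"}
aliases = {"i": "inventory", "inv": "inventory"}

exact = resolve("look", commands, aliases)     # "look"
by_alias = resolve("i", commands, aliases)     # "inventory"
by_prefix = resolve("in", commands, aliases)   # "inventory"
ambiguous = resolve("lo", commands, aliases)   # None: "look" and "lock" both match
```

Collecting matches into a set is what keeps "in" unambiguous here: the command name and its alias "inv" both match the prefix, but they point at the same command.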

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get(self, name: str) -> CommandDefinition | None:
    """Get the highest-priority command definition by name, alias, or prefix."""
    # Check if it's an alias
    if name in self._aliases:
        name = self._aliases[name]

    layers = self._layers.get(name, [])
    if layers:
        return layers[0][2]

    # Try prefix matching — find all commands/aliases starting with name
    matches: set[str] = set()
    for cmd_name in self._layers:
        if cmd_name.startswith(name):
            matches.add(cmd_name)
    for alias, cmd_name in self._aliases.items():
        if alias.startswith(name):
            matches.add(cmd_name)

    if len(matches) == 1:
        resolved = matches.pop()
        result_layers = self._layers.get(resolved, [])
        if result_layers:
            return result_layers[0][2]
    return None

get_all_layers

get_all_layers(name: str) -> list[CommandDefinition]

Get all definitions for a command across all packs.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get_all_layers(self, name: str) -> list[CommandDefinition]:
    """Get all definitions for a command across all packs."""
    if name in self._aliases:
        name = self._aliases[name]

    return [layer[2] for layer in self._layers.get(name, [])]

get_commands_for_pack

get_commands_for_pack(pack_name: str) -> list[str]

Get all command names registered by a content pack.

This is useful for hot reload inspection to see what commands a pack has registered.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pack_name | str | Name of the content pack | required |

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of command names registered by the pack |

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get_commands_for_pack(self, pack_name: str) -> list[str]:
    """Get all command names registered by a content pack.

    This is useful for hot reload inspection to see what commands
    a pack has registered.

    Args:
        pack_name: Name of the content pack

    Returns:
        List of command names registered by the pack
    """
    commands: list[str] = []
    for name, layers in self._layers.items():
        for _, pname, _ in layers:
            if pname == pack_name:
                commands.append(name)
                break  # Only count each command once per pack
    return commands

get_pack_command_definitions

get_pack_command_definitions(
    pack_name: str,
) -> list[CommandDefinition]

Get all command definitions registered by a content pack.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pack_name | str | Name of the content pack | required |

Returns:

| Type | Description |
| --- | --- |
| list[CommandDefinition] | List of CommandDefinition objects registered by the pack |

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get_pack_command_definitions(
    self, pack_name: str
) -> list[CommandDefinition]:
    """Get all command definitions registered by a content pack.

    Args:
        pack_name: Name of the content pack

    Returns:
        List of CommandDefinition objects registered by the pack
    """
    definitions: list[CommandDefinition] = []
    for layers in self._layers.values():
        for _, pname, definition in layers:
            if pname == pack_name:
                definitions.append(definition)
    return definitions

get_pack_priority

get_pack_priority(pack_name: str) -> int

Get the priority for a content pack. Returns 0 if no priority has been set for the pack.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def get_pack_priority(self, pack_name: str) -> int:
    """Get the priority for a content pack."""
    return self._pack_priorities.get(pack_name, 0)

register

register(
    name: str,
    handler: CommandHandler,
    pack_name: str,
    *,
    priority: int | None = None,
    aliases: list[str] | None = None,
    category: str = "general",
    description: str = "",
    usage: str = "",
    access_level: AccessLevel = PLAYER,
    hidden: bool = False,
    locks: str = "",
    metadata: dict[str, Any] | None = None,
) -> None

Register a command from a content pack.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Primary command name | required |
| handler | CommandHandler | Async function to handle the command | required |
| pack_name | str | Name of the content pack registering this command | required |
| priority | int \| None | Priority for layered resolution (defaults to pack priority) | None |
| aliases | list[str] \| None | Alternative names for this command | None |
| category | str | Command category for help | 'general' |
| description | str | Short description | '' |
| usage | str | Usage string | '' |
| access_level | AccessLevel | Minimum access level required | PLAYER |
| hidden | bool | Whether to hide from help listings | False |
| locks | str | Lock expression for permission checks (e.g., "perm(admin) OR owns()") | '' |
| metadata | dict[str, Any] \| None | Additional metadata for hooks (e.g., cooldowns, combat state) | None |

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register(
    self,
    name: str,
    handler: CommandHandler,
    pack_name: str,
    *,
    priority: int | None = None,
    aliases: list[str] | None = None,
    category: str = "general",
    description: str = "",
    usage: str = "",
    access_level: AccessLevel = AccessLevel.PLAYER,
    hidden: bool = False,
    locks: str = "",
    metadata: dict[str, Any] | None = None,
) -> None:
    """Register a command from a content pack.

    Args:
        name: Primary command name
        handler: Async function to handle the command
        pack_name: Name of the content pack registering this command
        priority: Priority for layered resolution (defaults to pack priority)
        aliases: Alternative names for this command
        category: Command category for help
        description: Short description
        usage: Usage string
        access_level: Minimum access level required
        hidden: Whether to hide from help listings
        locks: Lock expression for permission checks (e.g., "perm(admin) OR owns()")
        metadata: Additional metadata for hooks (e.g., cooldowns, combat state)
    """
    # Use pack priority if not explicitly specified
    if priority is None:
        priority = self.get_pack_priority(pack_name)

    definition = CommandDefinition(
        name=name,
        handler=handler,
        pack_name=pack_name,
        priority=priority,
        aliases=aliases or [],
        category=category,
        description=description,
        usage=usage,
        access_level=access_level,
        hidden=hidden,
        locks=locks,
        metadata=metadata or {},
    )

    # Add to layers
    layers = self._layers[name]
    layers.append((priority, pack_name, definition))
    # Keep sorted by priority (highest first)
    layers.sort(key=lambda x: x[0], reverse=True)

    # Register aliases
    for alias in definition.aliases:
        self._aliases[alias] = name

register_cooldown_hooks

register_cooldown_hooks(
    cooldown_hook: CooldownHook,
    *,
    pre_hook_name: str = "cooldown_check",
    post_hook_name: str = "cooldown_record",
    pre_priority: HookPriority = HIGH,
    post_priority: HookPriority = NORMAL,
    record_on_failure: bool = False,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> CooldownRecordHook

Register both CooldownHook and CooldownRecordHook together.

This is the recommended way to set up cooldown tracking. It ensures that both the pre-hook (for checking) and post-hook (for recording) are properly registered with shared state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cooldown_hook | CooldownHook | The CooldownHook instance to register | required |
| pre_hook_name | str | Name for the pre-hook (default: "cooldown_check") | 'cooldown_check' |
| post_hook_name | str | Name for the post-hook (default: "cooldown_record") | 'cooldown_record' |
| pre_priority | HookPriority | Priority for the pre-hook (default: HIGH) | HIGH |
| post_priority | HookPriority | Priority for the post-hook (default: NORMAL) | NORMAL |
| record_on_failure | bool | If True, record cooldown even on command failure | False |
| commands | list[str] \| None | Filter to specific command names (None = all) | None |
| categories | list[str] \| None | Filter to specific categories (None = all) | None |

Returns:

| Type | Description |
| --- | --- |
| CooldownRecordHook | The created CooldownRecordHook instance |

Example

cooldown_hook = CooldownHook(document_store=store)
await cooldown_hook.load_cooldowns()

record_hook = registry.register_cooldown_hooks(cooldown_hook)

# Before shutdown:
await cooldown_hook.save_cooldowns()
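The reason the two hooks are registered as a pair is that the check and the record must read and write the same state. A minimal sketch of that pairing follows; `CooldownState` and `make_hooks` are hypothetical names, the hook signatures are simplified, and the injectable clock exists only to make the example deterministic. It is not a description of how CooldownHook or CooldownRecordHook are implemented.

```python
import time
from typing import Callable


class CooldownState:
    """Hypothetical shared state: (player, command) -> time of last use."""

    def __init__(self, seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.seconds = seconds
        self.clock = clock
        self.last_used: dict[tuple[str, str], float] = {}


def make_hooks(state: CooldownState, *, record_on_failure: bool = False):
    def pre_hook(player: str, command: str) -> bool:
        # Allow the command if it was never used or the cooldown has elapsed.
        last = state.last_used.get((player, command))
        return last is None or state.clock() - last >= state.seconds

    def post_hook(player: str, command: str, success: bool) -> None:
        # Start the cooldown only after a run that should count.
        if success or record_on_failure:
            state.last_used[(player, command)] = state.clock()

    return pre_hook, post_hook


now = [0.0]  # fake clock value so the example does not depend on real time
state = CooldownState(seconds=5.0, clock=lambda: now[0])
check, record = make_hooks(state)

first = check("p1", "fireball")       # nothing recorded yet
record("p1", "fireball", True)        # cooldown starts at t=0
now[0] = 2.0
second = check("p1", "fireball")      # still cooling down
now[0] = 5.0
third = check("p1", "fireball")       # cooldown elapsed

record("p1", "heal", False)           # failed run, not recorded by default
after_failure = check("p1", "heal")
```

With `record_on_failure=True`, the failed "heal" would start a cooldown too, which matches the intent described for that parameter.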

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register_cooldown_hooks(
    self,
    cooldown_hook: CooldownHook,
    *,
    pre_hook_name: str = "cooldown_check",
    post_hook_name: str = "cooldown_record",
    pre_priority: HookPriority = HookPriority.HIGH,
    post_priority: HookPriority = HookPriority.NORMAL,
    record_on_failure: bool = False,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> CooldownRecordHook:
    """Register both CooldownHook and CooldownRecordHook together.

    This is the recommended way to set up cooldown tracking. It ensures
    that both the pre-hook (for checking) and post-hook (for recording)
    are properly registered with shared state.

    Args:
        cooldown_hook: The CooldownHook instance to register
        pre_hook_name: Name for the pre-hook (default: "cooldown_check")
        post_hook_name: Name for the post-hook (default: "cooldown_record")
        pre_priority: Priority for the pre-hook (default: HIGH)
        post_priority: Priority for the post-hook (default: NORMAL)
        record_on_failure: If True, record cooldown even on command failure
        commands: Filter to specific command names (None = all)
        categories: Filter to specific categories (None = all)

    Returns:
        The created CooldownRecordHook instance

    Example:
        cooldown_hook = CooldownHook(document_store=store)
        await cooldown_hook.load_cooldowns()

        record_hook = registry.register_cooldown_hooks(cooldown_hook)

        # Before shutdown:
        await cooldown_hook.save_cooldowns()
    """
    return self._hook_registry.register_cooldown_hooks(
        cooldown_hook,
        pre_hook_name=pre_hook_name,
        post_hook_name=post_hook_name,
        pre_priority=pre_priority,
        post_priority=post_priority,
        record_on_failure=record_on_failure,
        commands=commands,
        categories=categories,
    )

register_lock_function

register_lock_function(
    name: str,
    func: Callable[[LockContext, list[str]], bool],
) -> None

Register a custom lock function.

Lock functions are named predicates that can be used in lock expressions. Each function receives a LockContext and a list of string arguments, and returns a boolean.

Parameters:

  • name (str, required) - The function name used in expressions (e.g., "is_owner")
  • func (Callable[[LockContext, list[str]], bool], required) - The function to call; receives (context, args) and returns bool

Example

def check_gold(ctx: LockContext, args: list[str]) -> bool:
    player = ctx.get_player()
    if not player:
        return False
    required = int(args[0]) if args else 0
    return player.gold >= required

registry.register_lock_function("has_gold", check_gold)
# Now you can use "has_gold(100)" in lock expressions

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register_lock_function(
    self, name: str, func: Callable[[LockContext, list[str]], bool]
) -> None:
    """Register a custom lock function.

    Lock functions are named predicates that can be used in lock expressions.
    Each function receives a LockContext and a list of string arguments,
    and returns a boolean.

    Args:
        name: The function name used in expressions (e.g., "is_owner")
        func: The function to call, receives (context, args) and returns bool

    Example:
        def check_gold(ctx: LockContext, args: list[str]) -> bool:
            player = ctx.get_player()
            if not player:
                return False
            required = int(args[0]) if args else 0
            return player.gold >= required

        registry.register_lock_function("has_gold", check_gold)
        # Now you can use "has_gold(100)" in lock expressions
    """
    self._lock_evaluator.register_function(name, func)

register_post_hook

register_post_hook(
    name: str,
    handler: Callable[..., Any],
    priority: HookPriority = NORMAL,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> None

Register a post-command hook.

Post-hooks run after the command handler completes.

Parameters:

  • name (str, required) - Unique identifier for the hook
  • handler (Callable[..., Any], required) - Async or sync function accepting PostHookContext
  • priority (HookPriority, default: NORMAL) - Execution priority (lower runs first)
  • commands (list[str] | None, default: None) - Filter to specific command names (None = all)
  • categories (list[str] | None, default: None) - Filter to specific categories (None = all)

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register_post_hook(
    self,
    name: str,
    handler: Callable[..., Any],
    priority: HookPriority = HookPriority.NORMAL,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> None:
    """Register a post-command hook.

    Post-hooks run after the command handler completes.

    Args:
        name: Unique identifier for the hook
        handler: Async or sync function accepting PostHookContext
        priority: Execution priority (lower runs first)
        commands: Filter to specific command names (None = all)
        categories: Filter to specific categories (None = all)
    """
    self._hook_registry.register_post_hook(name, handler, priority, commands, categories)

register_pre_hook

register_pre_hook(
    name: str,
    handler: Callable[..., Any],
    priority: HookPriority = NORMAL,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> None

Register a pre-command hook.

Pre-hooks run before the command handler and can cancel execution.

Parameters:

  • name (str, required) - Unique identifier for the hook
  • handler (Callable[..., Any], required) - Async or sync function accepting PreHookContext
  • priority (HookPriority, default: NORMAL) - Execution priority (lower runs first)
  • commands (list[str] | None, default: None) - Filter to specific command names (None = all)
  • categories (list[str] | None, default: None) - Filter to specific categories (None = all)

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def register_pre_hook(
    self,
    name: str,
    handler: Callable[..., Any],
    priority: HookPriority = HookPriority.NORMAL,
    commands: list[str] | None = None,
    categories: list[str] | None = None,
) -> None:
    """Register a pre-command hook.

    Pre-hooks run before the command handler and can cancel execution.

    Args:
        name: Unique identifier for the hook
        handler: Async or sync function accepting PreHookContext
        priority: Execution priority (lower runs first)
        commands: Filter to specific command names (None = all)
        categories: Filter to specific categories (None = all)
    """
    self._hook_registry.register_pre_hook(name, handler, priority, commands, categories)
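
To see how priority ordering, command filters, and cancellation fit together, the following minimal sketch models pre-hooks with a plain sorted list. ToyHookRegistry, run_pre_hooks, the numeric priorities, and the convention that a handler returns False to cancel are all assumptions made for illustration; the real hook registry and PreHookContext may behave differently.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass(order=True)
class _Hook:
    priority: int  # the only field used for ordering
    name: str = field(compare=False)
    handler: Callable[[str], bool] = field(compare=False)
    commands: Optional[list] = field(default=None, compare=False)


class ToyHookRegistry:
    """Hypothetical stand-in: lower priority values run first."""

    def __init__(self) -> None:
        self._pre_hooks = []

    def register_pre_hook(self, name, handler, priority=50, commands=None) -> None:
        self._pre_hooks.append(_Hook(priority, name, handler, commands))
        self._pre_hooks.sort()  # stable sort keeps registration order on ties

    def run_pre_hooks(self, command: str):
        """Return (allowed, names of the hooks that ran)."""
        ran = []
        for hook in self._pre_hooks:
            # A commands filter of None means the hook applies to everything
            if hook.commands is not None and command not in hook.commands:
                continue
            ran.append(hook.name)
            if not hook.handler(command):
                return False, ran  # assumed cancel convention: stop here
        return True, ran


registry = ToyHookRegistry()
registry.register_pre_hook("audit", lambda cmd: True, priority=100)
registry.register_pre_hook(
    "cooldown_check", lambda cmd: False, priority=10, commands=["fireball"]
)

fireball_result = registry.run_pre_hooks("fireball")
look_result = registry.run_pre_hooks("look")
```

In this model the cooldown hook runs before the audit hook because its priority value is lower, and it only sees the "fireball" command because of its commands filter.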

set_pack_priority

set_pack_priority(pack_name: str, priority: int) -> None

Set the priority for a content pack.

Higher priority packs override lower priority ones.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def set_pack_priority(self, pack_name: str, priority: int) -> None:
    """Set the priority for a content pack.

    Higher priority packs override lower priority ones.
    """
    self._pack_priorities[pack_name] = priority
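
The effect of pack priorities on command resolution can be sketched with a small stand-in registry. ToyLayeredRegistry, its register() and resolve() methods, and the default priority of 0 for unknown packs are hypothetical; only set_pack_priority mirrors the listing above.

```python
class ToyLayeredRegistry:
    """Hypothetical model of layered commands; not the real registry."""

    def __init__(self) -> None:
        self._pack_priorities = {}  # pack name -> priority
        self._layers = {}           # command name -> [(pack name, handler label)]

    def set_pack_priority(self, pack_name: str, priority: int) -> None:
        self._pack_priorities[pack_name] = priority

    def register(self, name: str, pack_name: str, handler: str) -> None:
        self._layers.setdefault(name, []).append((pack_name, handler))

    def resolve(self, name: str):
        layers = self._layers.get(name)
        if not layers:
            return None
        # The layer whose pack has the highest priority wins
        # (packs without a priority are treated as 0: an assumption).
        _, handler = max(
            layers, key=lambda layer: self._pack_priorities.get(layer[0], 0)
        )
        return handler


registry = ToyLayeredRegistry()
registry.set_pack_priority("maid-stdlib", 10)
registry.set_pack_priority("maid-classic-rpg", 50)
registry.register("attack", "maid-stdlib", "stdlib attack")
registry.register("attack", "maid-classic-rpg", "rpg attack")
registry.register("look", "maid-stdlib", "stdlib look")

winner = registry.resolve("attack")
```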

unregister

unregister(name: str, pack_name: str) -> bool

Unregister a command from a specific pack.

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def unregister(self, name: str, pack_name: str) -> bool:
    """Unregister a command from a specific pack."""
    if name not in self._layers:
        return False

    layers = self._layers[name]
    original_len = len(layers)

    # Collect aliases that the removed pack defined
    removed_aliases: set[str] = set()
    for _, pname, definition in layers:
        if pname == pack_name:
            removed_aliases.update(definition.aliases)

    # Remove entries for this pack
    self._layers[name] = [
        layer for layer in layers if layer[1] != pack_name
    ]

    # Collect aliases still defined by remaining layers
    remaining_aliases: set[str] = set()
    for _, _, definition in self._layers.get(name, []):
        remaining_aliases.update(definition.aliases)

    # Only delete aliases that are no longer defined by any layer
    for alias in removed_aliases:
        if alias not in remaining_aliases and self._aliases.get(alias) == name:
            del self._aliases[alias]

    if len(self._layers[name]) == 0:
        del self._layers[name]

    return len(self._layers.get(name, [])) < original_len
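
The alias handling is the subtle part of unregister: an alias is dropped only when no remaining layer still defines it. The sketch below copies that logic into a stand-in class so it can run on its own. ToyRegistry, ToyDefinition, and the register() helper are hypothetical, and the first element of each layer tuple is assumed to be a priority.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDefinition:
    aliases: list = field(default_factory=list)


class ToyRegistry:
    def __init__(self) -> None:
        self._layers = {}   # command name -> [(priority, pack, definition)]
        self._aliases = {}  # alias -> command name

    def register(self, name, pack_name, aliases, priority=0) -> None:
        definition = ToyDefinition(list(aliases))
        self._layers.setdefault(name, []).append((priority, pack_name, definition))
        for alias in aliases:
            self._aliases[alias] = name

    def unregister(self, name: str, pack_name: str) -> bool:
        if name not in self._layers:
            return False
        layers = self._layers[name]
        original_len = len(layers)

        # Aliases defined by the pack being removed
        removed = set()
        for _, pname, definition in layers:
            if pname == pack_name:
                removed.update(definition.aliases)

        self._layers[name] = [layer for layer in layers if layer[1] != pack_name]

        # Aliases that some other pack still defines for this command
        remaining = set()
        for _, _, definition in self._layers[name]:
            remaining.update(definition.aliases)

        for alias in removed:
            if alias not in remaining and self._aliases.get(alias) == name:
                del self._aliases[alias]

        if not self._layers[name]:
            del self._layers[name]
        return len(self._layers.get(name, [])) < original_len


registry = ToyRegistry()
registry.register("look", "maid-stdlib", ["l"])
registry.register("look", "maid-classic-rpg", ["l", "examine"])

removed_rpg = registry.unregister("look", "maid-classic-rpg")
aliases_after_rpg = dict(registry._aliases)
```

After the RPG pack's layer is removed, "examine" disappears but "l" survives because the stdlib layer still defines it.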

unregister_hook

unregister_hook(name: str) -> bool

Unregister a hook by name.

Parameters:

  • name (str, required) - The unique identifier of the hook to remove

Returns:

  • bool - True if a hook was found and removed, False otherwise

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def unregister_hook(self, name: str) -> bool:
    """Unregister a hook by name.

    Args:
        name: The unique identifier of the hook to remove

    Returns:
        True if a hook was found and removed, False otherwise
    """
    return self._hook_registry.unregister(name)

unregister_pack

unregister_pack(pack_name: str) -> int

Unregister all commands from a pack.

Returns:

  • int - Number of commands unregistered

Source code in packages/maid-engine/src/maid_engine/commands/registry.py
def unregister_pack(self, pack_name: str) -> int:
    """Unregister all commands from a pack.

    Returns:
        Number of commands unregistered
    """
    count = 0
    for name in list(self._layers.keys()):
        if self.unregister(name, pack_name):
            count += 1
    return count

Positionable

Bases: Protocol

Protocol for components that track an entity's room location.

Any component with a mutable room_id attribute satisfies this protocol. The engine uses it in World.move_entity() and World.place_entity_in_room() to update position data without importing concrete PositionComponent types from maid-stdlib.

Example

class PositionComponent(Component, Positionable):
    room_id: UUID
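
Because the protocol is structural, a component does not have to inherit from it to qualify. The sketch below shows one way code could update a position without knowing the concrete type. It redefines a local Positionable marked runtime_checkable, which is an assumption (the engine's protocol may not support isinstance checks), and relocate() is a hypothetical helper.

```python
from typing import Protocol, runtime_checkable
from uuid import UUID, uuid4


@runtime_checkable
class Positionable(Protocol):
    """Local stand-in for the engine protocol (assumed runtime-checkable)."""

    room_id: UUID


class PositionComponent:
    """Satisfies the protocol purely by having a room_id attribute."""

    def __init__(self, room_id: UUID) -> None:
        self.room_id = room_id


class NameComponent:
    def __init__(self, name: str) -> None:
        self.name = name


def relocate(component: object, new_room: UUID) -> bool:
    """Update room_id on anything position-like; report whether it applied."""
    if isinstance(component, Positionable):
        component.room_id = new_room
        return True
    return False


target_room = uuid4()
position = PositionComponent(uuid4())
moved = relocate(position, target_room)
ignored = relocate(NameComponent("torch"), target_room)
```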

QueryOptions dataclass

QueryOptions(
    filters: dict[str, Any] = dict(),
    order_by: str | None = None,
    order: SortOrder = ASC,
    limit: int | None = None,
    offset: int = 0,
)

Options for querying documents.

Attributes:

  • filters (dict[str, Any]) - Dict of field_name -> value for exact matches
  • order_by (str | None) - Field name to sort by
  • order (SortOrder) - Sort direction
  • limit (int | None) - Maximum number of results
  • offset (int) - Number of results to skip
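
As a rough guide to how these options combine, the sketch below applies them to an in-memory list in the order filter, sort, then offset and limit. The local QueryOptions and SortOrder are stand-ins (the DESC member and the enum values are assumptions), and apply_query() is a hypothetical helper, not how any DocumentStore backend necessarily evaluates a query.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class SortOrder(Enum):
    ASC = "asc"
    DESC = "desc"  # assumed counterpart to ASC


@dataclass
class QueryOptions:
    filters: dict = field(default_factory=dict)
    order_by: Optional[str] = None
    order: SortOrder = SortOrder.ASC
    limit: Optional[int] = None
    offset: int = 0


def apply_query(docs: list, opts: QueryOptions) -> list:
    # 1. Exact-match filters: every listed field must equal its value
    rows = [d for d in docs if all(d.get(k) == v for k, v in opts.filters.items())]
    # 2. Optional sort (assumes every remaining doc has the order_by field)
    if opts.order_by is not None:
        rows.sort(key=lambda d: d[opts.order_by], reverse=opts.order is SortOrder.DESC)
    # 3. Skip `offset` rows, then keep at most `limit`
    end = None if opts.limit is None else opts.offset + opts.limit
    return rows[opts.offset:end]


players = [
    {"name": "a", "level": 3, "guild": "x"},
    {"name": "b", "level": 1, "guild": "x"},
    {"name": "c", "level": 2, "guild": "y"},
    {"name": "d", "level": 5, "guild": "x"},
]
opts = QueryOptions(
    filters={"guild": "x"}, order_by="level", order=SortOrder.DESC, limit=2, offset=1
)
page = [doc["name"] for doc in apply_query(players, opts)]
```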

RoomEnterEvent dataclass

RoomEnterEvent(
    entity_id: UUID,
    room_id: UUID,
    from_room_id: UUID | None = None,
)

Bases: Event

Emitted when an entity enters a room.

RoomIndex

RoomIndex()

Index for efficient room-based entity lookups.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def __init__(self) -> None:
    self._entities_by_room: dict[UUID, set[UUID]] = defaultdict(set)
    self._room_by_entity: dict[UUID, UUID] = {}

add

add(entity_id: UUID, room_id: UUID) -> None

Add entity to room index.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def add(self, entity_id: UUID, room_id: UUID) -> None:
    """Add entity to room index."""
    # Remove from previous room if present
    if entity_id in self._room_by_entity:
        old_room = self._room_by_entity[entity_id]
        self._entities_by_room[old_room].discard(entity_id)

    self._entities_by_room[room_id].add(entity_id)
    self._room_by_entity[entity_id] = room_id

clear

clear() -> None

Clear all indexes.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def clear(self) -> None:
    """Clear all indexes."""
    self._entities_by_room.clear()
    self._room_by_entity.clear()

count_in_room

count_in_room(room_id: UUID) -> int

Get count of entities in a room.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def count_in_room(self, room_id: UUID) -> int:
    """Get count of entities in a room."""
    return len(self._entities_by_room.get(room_id, set()))

get_entities

get_entities(room_id: UUID) -> set[UUID]

Get all entities in a room.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_entities(self, room_id: UUID) -> set[UUID]:
    """Get all entities in a room."""
    return self._entities_by_room.get(room_id, set()).copy()

get_room

get_room(entity_id: UUID) -> UUID | None

Get the room an entity is in.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_room(self, entity_id: UUID) -> UUID | None:
    """Get the room an entity is in."""
    return self._room_by_entity.get(entity_id)

move

move(entity_id: UUID, new_room_id: UUID) -> UUID | None

Move entity to new room. Returns previous room ID.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def move(self, entity_id: UUID, new_room_id: UUID) -> UUID | None:
    """Move entity to new room. Returns previous room ID."""
    old_room = self._room_by_entity.get(entity_id)
    if old_room == new_room_id:
        return old_room  # Already in this room

    if old_room:
        self._entities_by_room[old_room].discard(entity_id)

    self._entities_by_room[new_room_id].add(entity_id)
    self._room_by_entity[entity_id] = new_room_id
    return old_room

remove

remove(entity_id: UUID) -> UUID | None

Remove entity from index. Returns previous room ID.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def remove(self, entity_id: UUID) -> UUID | None:
    """Remove entity from index. Returns previous room ID."""
    room_id = self._room_by_entity.pop(entity_id, None)
    if room_id:
        self._entities_by_room[room_id].discard(entity_id)
    return room_id
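
A short usage sketch may help tie the methods together. So that it runs without maid-engine installed, the block repeats a trimmed copy of the class from the listings above; the room and entity IDs are made up for the example.

```python
from collections import defaultdict
from uuid import UUID, uuid4


class RoomIndex:
    """Trimmed copy of the listings above, for a standalone demo."""

    def __init__(self) -> None:
        self._entities_by_room = defaultdict(set)
        self._room_by_entity = {}

    def add(self, entity_id: UUID, room_id: UUID) -> None:
        if entity_id in self._room_by_entity:
            old_room = self._room_by_entity[entity_id]
            self._entities_by_room[old_room].discard(entity_id)
        self._entities_by_room[room_id].add(entity_id)
        self._room_by_entity[entity_id] = room_id

    def get_entities(self, room_id: UUID) -> set:
        return self._entities_by_room.get(room_id, set()).copy()

    def get_room(self, entity_id: UUID):
        return self._room_by_entity.get(entity_id)

    def move(self, entity_id: UUID, new_room_id: UUID):
        old_room = self._room_by_entity.get(entity_id)
        if old_room == new_room_id:
            return old_room
        if old_room:
            self._entities_by_room[old_room].discard(entity_id)
        self._entities_by_room[new_room_id].add(entity_id)
        self._room_by_entity[entity_id] = new_room_id
        return old_room

    def remove(self, entity_id: UUID):
        room_id = self._room_by_entity.pop(entity_id, None)
        if room_id:
            self._entities_by_room[room_id].discard(entity_id)
        return room_id


index = RoomIndex()
hall, cellar = uuid4(), uuid4()
player = uuid4()

index.add(player, hall)
previous = index.move(player, cellar)  # returns the room the player left

snapshot = index.get_entities(cellar)
snapshot.clear()                       # a copy, so the index is unaffected
still_there = index.get_entities(cellar)
```

Note that get_entities returns a copy, so callers can mutate the result freely without corrupting the index.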

RoomLeaveEvent dataclass

RoomLeaveEvent(
    entity_id: UUID,
    room_id: UUID,
    to_room_id: UUID | None = None,
)

Bases: Event

Emitted when an entity leaves a room.

SortOrder

Bases: Enum

Sort order for queries.

StatusEffectProvider

Bases: Protocol

Protocol for systems that manage status effects.

Used by the checkpoint system to capture active status effects during hot reload without importing StatusEffectManager by name.

get_effects

get_effects(entity_id: UUID) -> list[Any]

Get active status effects for an entity.

Source code in packages/maid-engine/src/maid_engine/core/protocols.py
def get_effects(self, entity_id: UUID) -> list[Any]:
    """Get active status effects for an entity."""
    ...

StorageAware

Bases: Protocol

Protocol for systems that need document store access.

Systems implementing this protocol will automatically receive the document store during registration with the engine.

Example

class GuildSystem(System, StorageAware):
    def __init__(self, world: World) -> None:
        super().__init__(world)
        self._storage: DocumentStore | None = None

    def set_storage(self, store: DocumentStore) -> None:
        self._storage = store

    async def startup(self) -> None:
        if self._storage:
            # Load from database
            ...

set_storage

set_storage(store: DocumentStore) -> None

Receive the document store for persistence.

Called automatically by the engine when the system is registered.

Parameters:

  • store (DocumentStore, required) - The document store instance

Source code in packages/maid-engine/src/maid_engine/storage/protocols.py
def set_storage(self, store: "DocumentStore") -> None:
    """Receive the document store for persistence.

    Called automatically by the engine when the system is registered.

    Args:
        store: The document store instance
    """
    ...
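
How the engine detects StorageAware systems is not shown in this section. One plausible approach, sketched below, is an isinstance check against a runtime-checkable protocol at registration time. The local StorageAware definition, register_system(), and both toy systems are assumptions for illustration only.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StorageAware(Protocol):
    """Local stand-in (assumed runtime-checkable) for the engine protocol."""

    def set_storage(self, store: Any) -> None: ...


class GuildSystem:
    def __init__(self) -> None:
        self._storage = None

    def set_storage(self, store: Any) -> None:
        self._storage = store


class WeatherSystem:
    """Has no set_storage method, so it is left alone."""


def register_system(system: object, store: Any, registry: list) -> None:
    # Hypothetical registration step: inject storage only where it is wanted
    if isinstance(system, StorageAware):
        system.set_storage(store)
    registry.append(system)


store = object()  # placeholder for a real document store
systems = []
guild, weather = GuildSystem(), WeatherSystem()
register_system(guild, store, systems)
register_system(weather, store, systems)
```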

System

System(world: World)

Bases: ABC

Base class for all ECS systems.

Systems contain the logic that operates on entities with specific components. They are executed each tick by the SystemManager.

Example

class MovementSystem(System):
    priority = 10  # Lower = runs earlier

    async def update(self, delta: float) -> None:
        for entity in self.entities.with_components(
            PositionComponent, MovementComponent
        ):
            pos = entity.get(PositionComponent)
            mov = entity.get(MovementComponent)
            # Process movement...

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def __init__(self, world: World) -> None:
    self._world = world

entities property

entities: EntityManager

Get the entity manager.

events property

events: EventBus

Get the event bus.

world property

world: World

Get the world instance.

shutdown async

shutdown() -> None

Called when the system shuts down. Override in subclasses.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def shutdown(self) -> None:  # noqa: B027
    """Called when the system shuts down. Override in subclasses."""
    pass

startup async

startup() -> None

Called when the system starts up. Override in subclasses.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def startup(self) -> None:  # noqa: B027
    """Called when the system starts up. Override in subclasses."""
    pass

update abstractmethod async

update(delta: float) -> None

Update the system.

Parameters:

  • delta (float, required) - Time since last update in seconds

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
@abstractmethod
async def update(self, delta: float) -> None:
    """Update the system.

    Args:
        delta: Time since last update in seconds
    """
    pass

SystemErrorEvent dataclass

SystemErrorEvent(
    error_type: str = "",
    error_message: str = "",
    error_source: str = "",
    handler_name: str | None = None,
    event_type_name: str | None = None,
    tick_number: int | None = None,
)

Bases: Event

Emitted when an error occurs in the tick loop or event dispatch.

Monitoring systems can subscribe to this event to track errors, trigger alerts, or collect diagnostics. The error_source field indicates where the error originated (e.g., "tick_loop" or "event_handler").

Note: This event is emitted on a best-effort basis. If the EventBus itself is in a broken state, emission may be skipped to avoid cascading failures.

SystemManager

SystemManager(world: World)

Manages and runs all ECS systems.

The SystemManager handles system lifecycle and executes systems in priority order each tick. It also tracks which content pack registered each system to support hot reloading.

Example

manager = SystemManager(world)
manager.register(MovementSystem(world), source_pack="maid-stdlib")
manager.register(CombatSystem(world), source_pack="maid-classic-rpg")

# In game loop
await manager.update(delta_time)

# Hot reload: unregister all systems from a pack
removed = await manager.unregister_systems_for_pack("maid-classic-rpg")

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def __init__(self, world: World) -> None:
    self._world = world
    self._systems: list[System] = []
    self._enabled_systems: list[System] = []
    self._systems_by_type: dict[type[System], System] = {}
    self._started = False

    # Track which content pack registered each system (system class name -> pack name)
    self._system_sources: dict[str, str] = {}

    # Track systems by pack for efficient lookup (pack name -> set of system class names)
    self._systems_by_pack: dict[str, set[str]] = {}

    # Track system dependencies (system class name -> set of dependency class names)
    self._system_dependencies: dict[str, set[str]] = {}

    # Tick pause control for safe system removal
    self._removal_pause_event: asyncio.Event | None = None
    self._tick_in_progress = False
    self._tick_complete_event: asyncio.Event = asyncio.Event()
    self._tick_complete_event.set()  # Initially not in a tick

is_tick_in_progress property

is_tick_in_progress: bool

Check if a tick is currently in progress.

is_tick_paused property

is_tick_paused: bool

Check if ticks are currently paused for removal.

systems property

systems: list[System]

Get all registered systems (sorted by priority).

disable

disable(system_type: type[System]) -> bool

Disable a system. Returns True if system exists.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def disable(self, system_type: type[System]) -> bool:
    """Disable a system. Returns True if system exists."""
    system = self._systems_by_type.get(system_type)
    if system:
        system.enabled = False
        self._rebuild_enabled_systems()
        return True
    return False

enable

enable(system_type: type[System]) -> bool

Enable a system. Returns True if system exists.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def enable(self, system_type: type[System]) -> bool:
    """Enable a system. Returns True if system exists."""
    system = self._systems_by_type.get(system_type)
    if system:
        system.enabled = True
        self._rebuild_enabled_systems()
        return True
    return False

get

get(system_type: type[System]) -> System | None

Get a registered system by type.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get(self, system_type: type[System]) -> System | None:
    """Get a registered system by type."""
    return self._systems_by_type.get(system_type)

get_or_raise

get_or_raise(system_type: type[System]) -> System

Get a registered system by type, raising KeyError if not found.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get_or_raise(self, system_type: type[System]) -> System:
    """Get a registered system by type, raising KeyError if not found."""
    system = self._systems_by_type.get(system_type)
    if system is None:
        raise KeyError(f"System {system_type.__name__} not registered")
    return system

get_source_pack

get_source_pack(system_type: type[System]) -> str | None

Get the source pack name for a system.

Parameters:

  • system_type (type[System], required) - The type of system to look up

Returns:

  • str | None - The pack name or None if no source pack was specified

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get_source_pack(self, system_type: type[System]) -> str | None:
    """Get the source pack name for a system.

    Args:
        system_type: The type of system to look up

    Returns:
        The pack name or None if no source pack was specified
    """
    return self._system_sources.get(system_type.__name__)

get_system_dependents

get_system_dependents(
    system_type: type[System],
) -> list[type[System]]

Get systems that depend on the given system type.

Parameters:

  • system_type (type[System], required) - The system type to check

Returns:

  • list[type[System]] - List of system types that depend on this system

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get_system_dependents(
    self, system_type: type[System]
) -> list[type[System]]:
    """Get systems that depend on the given system type.

    Args:
        system_type: The system type to check

    Returns:
        List of system types that depend on this system
    """
    system_name = system_type.__name__
    dependents: list[type[System]] = []

    for other_type in self._systems_by_type:
        other_name = other_type.__name__
        deps = self._system_dependencies.get(other_name, set())
        if system_name in deps:
            dependents.append(other_type)

    return dependents

get_system_types_for_pack

get_system_types_for_pack(
    pack_name: str,
) -> list[type[System]]

Get all system types registered by a specific content pack.

Parameters:

  • pack_name (str, required) - The name of the content pack

Returns:

  • list[type[System]] - List of system types registered by this pack

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get_system_types_for_pack(self, pack_name: str) -> list[type[System]]:
    """Get all system types registered by a specific content pack.

    Args:
        pack_name: The name of the content pack

    Returns:
        List of system types registered by this pack
    """
    system_names = self._systems_by_pack.get(pack_name, set())
    return [
        system_type
        for system_type in self._systems_by_type
        if system_type.__name__ in system_names
    ]

get_systems_for_pack

get_systems_for_pack(pack_name: str) -> list[System]

Get all systems registered by a specific content pack.

Parameters:

  • pack_name (str, required) - The name of the content pack

Returns:

  • list[System] - List of systems registered by this pack

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def get_systems_for_pack(self, pack_name: str) -> list[System]:
    """Get all systems registered by a specific content pack.

    Args:
        pack_name: The name of the content pack

    Returns:
        List of systems registered by this pack
    """
    system_names = self._systems_by_pack.get(pack_name, set())
    result: list[System] = []
    for system in self._systems:
        if type(system).__name__ in system_names:
            result.append(system)
    return result

pause_for_removal async

pause_for_removal() -> None

Pause tick processing for safe system removal.

This method waits for any in-progress tick to complete and prevents new ticks from starting until resume_after_removal is called.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def pause_for_removal(self) -> None:
    """Pause tick processing for safe system removal.

    This method waits for any in-progress tick to complete and
    prevents new ticks from starting until resume_after_removal is called.
    """
    # Wait for current tick to complete if one is in progress
    if self._tick_in_progress:
        _logger.debug("Waiting for current tick to complete...")
        await self._tick_complete_event.wait()

    # Set pause event to prevent new ticks
    self._removal_pause_event = asyncio.Event()
    _logger.debug("System removal pause activated")
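
The pause handshake relies on two asyncio.Event objects: one that signals "the current tick has finished" and one that holds back new ticks. The update loop that consumes these events is not shown in this section, so the sketch below models it with a hypothetical ToyTickGate whose tick() method is assumed to wait on the pause event before starting.

```python
import asyncio


class ToyTickGate:
    """Hypothetical model of the pause/resume handshake."""

    def __init__(self) -> None:
        self._pause_event = None
        self._tick_in_progress = False
        self._tick_complete = asyncio.Event()
        self._tick_complete.set()  # no tick is running initially
        self.log = []

    async def tick(self, label: str) -> None:
        pause = self._pause_event
        if pause is not None:
            await pause.wait()  # assumed: ticks block while a pause is active
        self._tick_in_progress = True
        self._tick_complete.clear()
        try:
            self.log.append(f"tick {label} start")
            await asyncio.sleep(0)  # pretend to do work, yielding control
            self.log.append(f"tick {label} end")
        finally:
            self._tick_in_progress = False
            self._tick_complete.set()

    async def pause(self) -> None:
        if self._tick_in_progress:
            await self._tick_complete.wait()  # let the running tick finish
        self._pause_event = asyncio.Event()

    async def resume(self) -> None:
        if self._pause_event is not None:
            self._pause_event.set()  # release any tick waiting on the pause
            self._pause_event = None


async def demo() -> list:
    gate = ToyTickGate()
    first = asyncio.create_task(gate.tick("A"))
    await asyncio.sleep(0)   # let tick A start
    await gate.pause()       # returns only after tick A has finished
    gate.log.append("removal")
    second = asyncio.create_task(gate.tick("B"))
    await asyncio.sleep(0)   # tick B is now blocked on the pause event
    gate.log.append("still paused")
    await gate.resume()
    await asyncio.gather(first, second)
    return gate.log
```

Running demo() with asyncio.run shows the removal step landing strictly between tick A and tick B.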

register

register(
    system: System,
    source_pack: str | None = None,
    dependencies: list[type[System]] | None = None,
) -> None

Register a system.

Note: If the SystemManager is already started (i.e., startup() has been called), this method does NOT call startup() on the newly registered system. Use register_and_start() for hot reload scenarios where you need the system to be started immediately after registration.

Parameters:

  • system (System, required) - The system instance to register
  • source_pack (str | None, default: None) - Optional name of the content pack that registered this system
  • dependencies (list[type[System]] | None, default: None) - Optional list of system types this system depends on

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def register(
    self,
    system: System,
    source_pack: str | None = None,
    dependencies: list[type[System]] | None = None,
) -> None:
    """Register a system.

    Note: If the SystemManager is already started (i.e., startup() has been called),
    this method does NOT call startup() on the newly registered system. Use
    register_and_start() for hot reload scenarios where you need the system
    to be started immediately after registration.

    Args:
        system: The system instance to register
        source_pack: Optional name of the content pack that registered this system
        dependencies: Optional list of system types this system depends on
    """
    system_type = type(system)
    system_name = system_type.__name__

    if system_type in self._systems_by_type:
        raise ValueError(f"System {system_name} already registered")

    self._systems.append(system)
    self._systems_by_type[system_type] = system

    # Track source pack
    if source_pack is not None:
        self._system_sources[system_name] = source_pack
        if source_pack not in self._systems_by_pack:
            self._systems_by_pack[source_pack] = set()
        self._systems_by_pack[source_pack].add(system_name)

    # Track dependencies
    if dependencies:
        self._system_dependencies[system_name] = {
            dep.__name__ for dep in dependencies
        }

    # Keep systems sorted by priority
    self._systems.sort(key=lambda s: s.priority)

    # Rebuild the enabled systems cache
    self._rebuild_enabled_systems()

    _logger.debug(
        f"Registered system {system_name}"
        + (f" from pack {source_pack}" if source_pack else "")
    )

register_and_start async

register_and_start(
    system: System,
    source_pack: str | None = None,
    dependencies: list[type[System]] | None = None,
) -> None

Register a system and start it if the manager is already running.

This method should be used for hot reload scenarios where systems are being registered after the engine has started. It ensures the newly registered system's startup() method is called.

Parameters:

  • system (System, required) - The system instance to register
  • source_pack (str | None, default: None) - Optional name of the content pack that registered this system
  • dependencies (list[type[System]] | None, default: None) - Optional list of system types this system depends on

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def register_and_start(
    self,
    system: System,
    source_pack: str | None = None,
    dependencies: list[type[System]] | None = None,
) -> None:
    """Register a system and start it if the manager is already running.

    This method should be used for hot reload scenarios where systems are
    being registered after the engine has started. It ensures the newly
    registered system's startup() method is called.

    Args:
        system: The system instance to register
        source_pack: Optional name of the content pack that registered this system
        dependencies: Optional list of system types this system depends on
    """
    self.register(system, source_pack=source_pack, dependencies=dependencies)

    # If the manager is already started, call startup on the new system
    if self._started:
        system_name = type(system).__name__
        try:
            await system.startup()
            _logger.debug(f"Started hot-reloaded system {system_name}")
        except Exception as e:
            _logger.error(f"Error starting hot-reloaded system {system_name}: {e}")
            raise
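
The difference between register() and register_and_start() only matters once the manager is running. The sketch below reproduces just that part of the behaviour with stand-in classes; ToySystem and ToyManager are hypothetical and omit pack tracking, dependencies, and type-based lookup.

```python
import asyncio


class ToySystem:
    priority = 50

    def __init__(self) -> None:
        self.started = False

    async def startup(self) -> None:
        self.started = True


class ToyManager:
    """Hypothetical stand-in covering only the startup-related logic."""

    def __init__(self) -> None:
        self._systems = []
        self._started = False

    def register(self, system: ToySystem) -> None:
        self._systems.append(system)
        self._systems.sort(key=lambda s: s.priority)
        # Note: no startup() call here, even if the manager is running

    async def register_and_start(self, system: ToySystem) -> None:
        self.register(system)
        if self._started:
            await system.startup()

    async def startup(self) -> None:
        if self._started:
            return
        for system in self._systems:
            await system.startup()
        self._started = True


async def demo() -> tuple:
    manager = ToyManager()
    early, late, hot = ToySystem(), ToySystem(), ToySystem()

    manager.register(early)
    await manager.startup()                # starts `early`

    manager.register(late)                 # registered, but never started
    await manager.register_and_start(hot)  # registered and started
    return early.started, late.started, hot.started
```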

remove_system_safely async

remove_system_safely(
    system_type: type[System],
    check_dependencies: bool = True,
) -> System | None

Remove a single system safely with tick pause.

This is a convenience method that handles pausing ticks, removing the system, and resuming ticks.

Parameters:

  • system_type (type[System], required) - The type of system to remove
  • check_dependencies (bool, default: True) - Whether to check for dependent systems

Returns:

  • System | None - The removed system or None if not found

Raises:

  • SystemDependencyError - If other systems depend on this one

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def remove_system_safely(
    self,
    system_type: type[System],
    check_dependencies: bool = True,
) -> System | None:
    """Remove a single system safely with tick pause.

    This is a convenience method that handles pausing ticks,
    removing the system, and resuming ticks.

    Args:
        system_type: The type of system to remove
        check_dependencies: Whether to check for dependent systems

    Returns:
        The removed system or None if not found

    Raises:
        SystemDependencyError: If other systems depend on this one
    """
    async with self.safe_removal_context() as ctx:
        return await ctx.remove_system(
            system_type, check_dependencies=check_dependencies
        )

resume_after_removal async

resume_after_removal() -> None

Resume tick processing after system removal.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def resume_after_removal(self) -> None:
    """Resume tick processing after system removal."""
    if self._removal_pause_event is not None:
        self._removal_pause_event.set()
        self._removal_pause_event = None
        _logger.debug("System removal pause deactivated")

safe_removal_context async

safe_removal_context() -> AsyncIterator[
    SystemRemovalContext
]

Context manager for safe system removal.

This provides a convenient way to safely remove systems while ensuring ticks are paused and properly resumed.

Example

async with manager.safe_removal_context() as ctx:
    await ctx.remove_system(MovementSystem)
    await ctx.remove_system(CombatSystem)

Yields:

  • AsyncIterator[SystemRemovalContext] - SystemRemovalContext for performing removals

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
@asynccontextmanager
async def safe_removal_context(self) -> AsyncIterator[SystemRemovalContext]:
    """Context manager for safe system removal.

    This provides a convenient way to safely remove systems while
    ensuring ticks are paused and properly resumed.

    Example:
        async with manager.safe_removal_context() as ctx:
            await ctx.remove_system(MovementSystem)
            await ctx.remove_system(CombatSystem)

    Yields:
        SystemRemovalContext for performing removals
    """
    ctx = SystemRemovalContext(self)
    async with ctx:
        yield ctx
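
The value of wrapping removal in a context manager is that ticks resume even when a removal fails partway through. The internals of SystemRemovalContext are not shown here, so the sketch below assumes it pauses on entry and resumes on exit, and models that with a try/finally inside an asynccontextmanager. ToyManager and its name-based system set are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class ToyManager:
    """Hypothetical stand-in; systems are tracked by name only."""

    def __init__(self) -> None:
        self.paused = False
        self.systems = {"MovementSystem", "CombatSystem"}

    @asynccontextmanager
    async def safe_removal_context(self) -> AsyncIterator["ToyManager"]:
        self.paused = True  # stands in for pause_for_removal()
        try:
            yield self
        finally:
            self.paused = False  # stands in for resume_after_removal()

    def remove_system(self, name: str) -> None:
        if not self.paused:
            raise RuntimeError("removal attempted while ticks are running")
        self.systems.remove(name)  # raises KeyError for an unknown system


async def demo() -> tuple:
    manager = ToyManager()
    try:
        async with manager.safe_removal_context() as ctx:
            ctx.remove_system("MovementSystem")
            ctx.remove_system("WeatherSystem")  # not registered: KeyError
    except KeyError:
        pass
    # The pause is lifted even though the block exited with an error
    return manager.paused, manager.systems
```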

shutdown async

shutdown() -> None

Shut down all systems.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def shutdown(self) -> None:
    """Shut down all systems."""
    if not self._started:
        return

    # Copy list to avoid concurrent modification if systems unregister during shutdown
    systems = list(self._systems)

    # Shut down in reverse order
    for system in reversed(systems):
        try:
            await system.shutdown()
        except Exception:
            _logger.error(
                "Error shutting down system %s",
                getattr(system, "name", type(system).__name__),
                exc_info=True,
            )

    self._started = False
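As a rough illustration of the reverse-order, error-tolerant shutdown shown above, the following sketch uses a hypothetical `FakeSystem` class (not part of MAID) that records the order in which systems stop. One system is made to fail on purpose to show that the remaining ones still shut down.

```python
import asyncio
import logging

_logger = logging.getLogger("shutdown_sketch")


class FakeSystem:
    """Hypothetical stand-in for a System; records shutdown order."""

    def __init__(self, name: str, log: list[str], fail: bool = False) -> None:
        self.name = name
        self._log = log
        self._fail = fail

    async def shutdown(self) -> None:
        if self._fail:
            raise RuntimeError(f"{self.name} failed to shut down")
        self._log.append(self.name)


async def shutdown_all(systems: list[FakeSystem]) -> None:
    # Iterate over a reversed copy so later-registered systems stop first
    # and one failure does not prevent the rest from shutting down.
    for system in reversed(list(systems)):
        try:
            await system.shutdown()
        except Exception:
            _logger.error("Error shutting down system %s", system.name, exc_info=True)


order: list[str] = []
systems = [
    FakeSystem("movement", order),
    FakeSystem("combat", order, fail=True),
    FakeSystem("weather", order),
]
asyncio.run(shutdown_all(systems))
print(order)
```

The failing system is logged and skipped, so `order` contains only the two systems that shut down cleanly, last-registered first.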

startup async

startup() -> None

Start all systems.

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def startup(self) -> None:
    """Start all systems."""
    if self._started:
        return

    for system in self._systems:
        await system.startup()

    self._started = True

unregister

unregister(system_type: type[System]) -> System | None

Unregister a system by type.

Parameters:

Name Type Description Default
system_type type[System]

The type of system to unregister

required

Returns:

Type Description
System | None

The unregistered system or None if not found

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
def unregister(self, system_type: type[System]) -> System | None:
    """Unregister a system by type.

    Args:
        system_type: The type of system to unregister

    Returns:
        The unregistered system or None if not found
    """
    system = self._systems_by_type.pop(system_type, None)
    if system:
        system_name = system_type.__name__
        self._systems.remove(system)

        # Clean up source pack tracking
        if system_name in self._system_sources:
            pack_name = self._system_sources.pop(system_name)
            if pack_name in self._systems_by_pack:
                self._systems_by_pack[pack_name].discard(system_name)
                if not self._systems_by_pack[pack_name]:
                    del self._systems_by_pack[pack_name]

        # Clean up dependencies
        self._system_dependencies.pop(system_name, None)

        # Rebuild the enabled systems cache
        self._rebuild_enabled_systems()

        _logger.debug(f"Unregistered system {system_name}")

    return system

unregister_systems_for_pack async

unregister_systems_for_pack(
    pack_name: str, check_dependencies: bool = True
) -> list[System]

Unregister all systems registered by a content pack.

This method safely removes all systems from a pack, calling shutdown on each system and respecting dependency ordering.

Parameters:

Name Type Description Default
pack_name str

The name of the content pack

required
check_dependencies bool

Whether to check for dependents in other packs and raise SystemDependencyError if any exist

True

Returns:

Type Description
list[System]

List of systems that were unregistered

Raises:

Type Description
SystemDependencyError

If check_dependencies is True and systems from other packs depend on systems being removed

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def unregister_systems_for_pack(
    self,
    pack_name: str,
    check_dependencies: bool = True,
) -> list[System]:
    """Unregister all systems registered by a content pack.

    This method safely removes all systems from a pack, calling
    shutdown on each system and respecting dependency ordering.

    Args:
        pack_name: The name of the content pack
        check_dependencies: Whether to check for dependents in other packs
            and raise SystemDependencyError if any exist

    Returns:
        List of systems that were unregistered

    Raises:
        SystemDependencyError: If check_dependencies is True and systems
            from other packs depend on systems being removed
    """
    systems_to_remove = self.get_system_types_for_pack(pack_name)
    if not systems_to_remove:
        return []

    # Check dependencies from other packs
    if check_dependencies:
        for system_type in systems_to_remove:
            dependents = self.get_system_dependents(system_type)
            # Filter to only dependents from other packs
            external_dependents = [
                dep
                for dep in dependents
                if self.get_source_pack(dep) != pack_name
            ]
            if external_dependents:
                raise SystemDependencyError(
                    f"Cannot remove systems from pack '{pack_name}': "
                    f"{system_type.__name__} has dependents from other packs",
                    system_type=system_type,
                    dependents=external_dependents,
                )

    removed: list[System] = []

    # Remove in reverse priority order (highest priority first)
    systems_to_remove.sort(
        key=lambda t: self._systems_by_type.get(t, System).priority,
        reverse=True,
    )

    for system_type in systems_to_remove:
        system = self.unregister(system_type)
        if system:
            # Call shutdown on the removed system
            if self._started:
                try:
                    await system.shutdown()
                except Exception as e:
                    _logger.warning(
                        f"Error during shutdown of {system_type.__name__}: {e}"
                    )
            removed.append(system)

    _logger.info(
        f"Unregistered {len(removed)} systems from pack '{pack_name}'"
    )
    return removed
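The cross-pack dependency check can be sketched in isolation. The two dictionaries below are assumed data shapes chosen for this example (system name to owning pack, and system name to the names it depends on); the real manager tracks this information through its own internal structures.

```python
def external_dependents(
    pack_name: str,
    system_sources: dict[str, str],
    dependencies: dict[str, set[str]],
) -> dict[str, list[str]]:
    """Map each system in `pack_name` to dependents owned by other packs.

    `system_sources` maps system name -> owning pack.
    `dependencies` maps system name -> names of systems it depends on.
    Both shapes are assumptions made for this sketch.
    """
    blocked: dict[str, list[str]] = {}
    for system, source in system_sources.items():
        if source != pack_name:
            continue
        outside = sorted(
            dependent
            for dependent, needs in dependencies.items()
            if system in needs and system_sources.get(dependent) != pack_name
        )
        if outside:
            blocked[system] = outside
    return blocked


sources = {"MovementSystem": "core", "CombatSystem": "core", "LootSystem": "loot"}
deps = {"CombatSystem": {"MovementSystem"}, "LootSystem": {"CombatSystem"}}

print(external_dependents("core", sources, deps))
print(external_dependents("loot", sources, deps))
```

Dependencies between systems of the same pack are ignored, which mirrors the filtering step above: only a dependent from a different pack blocks the removal.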

update async

update(delta: float) -> None

Update all enabled systems.

Parameters:

Name Type Description Default
delta float

Time since last update in seconds

required

Source code in packages/maid-engine/src/maid_engine/core/ecs/system.py
async def update(self, delta: float) -> None:
    """Update all enabled systems.

    Args:
        delta: Time since last update in seconds
    """
    # Check if paused for removal
    if self._removal_pause_event is not None:
        _logger.debug("Tick paused for system removal")
        await self._removal_pause_event.wait()

    # Mark tick as in progress
    self._tick_in_progress = True
    self._tick_complete_event.clear()

    try:
        # Copy the enabled systems list to avoid concurrent modification
        # during iteration. This list is pre-filtered to only contain
        # enabled systems, avoiding per-system enabled checks each tick.
        systems = list(self._enabled_systems)
        for system in systems:
            await system.update(delta)
    finally:
        # Mark tick as complete
        self._tick_in_progress = False
        self._tick_complete_event.set()
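Two details of the tick loop above are easy to miss: the systems list is copied before iteration, and the completion event is set in a `finally` block. The sketch below is a hypothetical reduction (`TickLoop` is not a MAID class, and systems are plain coroutine functions here) that exercises both.

```python
import asyncio


class TickLoop:
    """Hypothetical reduction of the tick bookkeeping; not the MAID class."""

    def __init__(self, systems):
        self.systems = systems
        self.tick_complete = asyncio.Event()
        self.tick_complete.set()  # no tick is running initially

    async def update(self, delta: float) -> None:
        self.tick_complete.clear()
        try:
            # Snapshot so a system may alter self.systems mid-tick.
            for system in list(self.systems):
                await system(self, delta)
        finally:
            # Runs on success and on error, so waiters are never stranded.
            self.tick_complete.set()


async def remove_self(loop: TickLoop, delta: float) -> None:
    loop.systems.remove(remove_self)


async def explode(loop: TickLoop, delta: float) -> None:
    raise RuntimeError("system failure")


async def demo() -> tuple[int, bool]:
    loop = TickLoop([remove_self, explode])
    try:
        await loop.update(0.25)
    except RuntimeError:
        pass
    return len(loop.systems), loop.tick_complete.is_set()
```

Even though the first system mutates the list and the second raises, the iteration is not disturbed and the completion event ends up set.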

TagAddedEvent dataclass

TagAddedEvent(entity_id: UUID, tag: str)

Bases: Event

Emitted when a tag is added to an entity.

TagRemovedEvent dataclass

TagRemovedEvent(entity_id: UUID, tag: str)

Bases: Event

Emitted when a tag is removed from an entity.

TickEvent dataclass

TickEvent(tick_number: int, delta: float)

Bases: Event

Emitted at the start of each game tick.

TranslationCatalog

TranslationCatalog(locale: str)

A catalog of translations for a single locale.

The catalog stores translations indexed by their msgid (and optional context). It supports loading from PO and MO files, plural forms, and context-based lookups.

Attributes:

Name Type Description
locale

The locale code (e.g., 'en_US', 'de_DE').

entries dict[str, TranslationEntry]

Dictionary mapping keys to TranslationEntry objects.

Parameters:

Name Type Description Default
locale str

The locale code for this catalog.

required

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
def __init__(self, locale: str) -> None:
    """Initialize a translation catalog.

    Args:
        locale: The locale code for this catalog.
    """
    self.locale = locale
    self.entries: dict[str, TranslationEntry] = {}
    self._nplurals: int = 2
    self._plural_expr: str = "(n != 1)"
    self._metadata: dict[str, str] = {}

metadata property

metadata: dict[str, str]

Return the catalog metadata.

nplurals property

nplurals: int

Return the number of plural forms for this catalog.

plural_expr property

plural_expr: str

Return the plural expression for this catalog.

get_entry

get_entry(
    msgid: str, context: str | None = None
) -> TranslationEntry | None

Get the translation entry for a msgid.

Parameters:

Name Type Description Default
msgid str

The message to look up.

required
context str | None

Optional context for disambiguation.

None

Returns:

Type Description
TranslationEntry | None

The TranslationEntry, or None if not found.

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
def get_entry(self, msgid: str, context: str | None = None) -> TranslationEntry | None:
    """Get the translation entry for a msgid.

    Args:
        msgid: The message to look up.
        context: Optional context for disambiguation.

    Returns:
        The TranslationEntry, or None if not found.
    """
    key = self._make_key(msgid, context)
    return self.entries.get(key)

has_translation

has_translation(
    msgid: str,
    context: str | None = None,
    allow_fuzzy: bool = False,
) -> bool

Check if a translation exists for the given msgid.

Parameters:

Name Type Description Default
msgid str

The message to check.

required
context str | None

Optional context for disambiguation.

None
allow_fuzzy bool

If True, consider fuzzy translations as valid. Defaults to False. See module docstring for details on fuzzy translation behavior.

False

Returns:

Type Description
bool

True if a valid translation exists (non-fuzzy by default, or including fuzzy if allow_fuzzy=True).

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
def has_translation(
    self,
    msgid: str,
    context: str | None = None,
    allow_fuzzy: bool = False,
) -> bool:
    """Check if a translation exists for the given msgid.

    Args:
        msgid: The message to check.
        context: Optional context for disambiguation.
        allow_fuzzy: If True, consider fuzzy translations as valid.
            Defaults to False. See module docstring for details on
            fuzzy translation behavior.

    Returns:
        True if a valid translation exists (non-fuzzy by default,
        or including fuzzy if allow_fuzzy=True).
    """
    key = self._make_key(msgid, context)
    entry = self.entries.get(key)
    if entry is None or not entry.is_translated:
        return False
    return not (entry.fuzzy and not allow_fuzzy)

load classmethod

load(
    path: str | Path, locale: str | None = None
) -> TranslationCatalog

Load a translation catalog from a file.

Supports both PO and MO file formats (detected by extension).

Parameters:

Name Type Description Default
path str | Path

Path to the translation file.

required
locale str | None

Optional locale override (defaults to the filename without its extension).

None

Returns:

Type Description
TranslationCatalog

A TranslationCatalog instance.

Raises:

Type Description
FileNotFoundError

If the file doesn't exist.

ValueError

If the file format is not supported.

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
@classmethod
def load(cls, path: str | Path, locale: str | None = None) -> TranslationCatalog:
    """Load a translation catalog from a file.

    Supports both PO and MO file formats (detected by extension).

    Args:
        path: Path to the translation file.
        locale: Optional locale override (defaults to filename).

    Returns:
        A TranslationCatalog instance.

    Raises:
        FileNotFoundError: If the file doesn't exist.
        ValueError: If the file format is not supported.
    """
    path = Path(path)

    if not path.exists():
        raise FileNotFoundError(f"Translation file not found: {path}")

    # Determine locale from filename if not provided
    if locale is None:
        locale = path.stem

    catalog = cls(locale)

    suffix = path.suffix.lower()
    if suffix == ".po":
        catalog._load_po(path)
    elif suffix == ".mo":
        catalog._load_mo(path)
    else:
        raise ValueError(f"Unsupported translation file format: {suffix}")

    return catalog

translate

translate(
    msgid: str,
    context: str | None = None,
    n: int | None = None,
    default: str | None = None,
    allow_fuzzy: bool = False,
) -> str

Translate a message.

Parameters:

Name Type Description Default
msgid str

The message to translate.

required
context str | None

Optional context for disambiguation.

None
n int | None

The number for plural selection (None for singular).

None
default str | None

Default value if translation not found (defaults to msgid).

None
allow_fuzzy bool

If True, use fuzzy (uncertain/unverified) translations when no verified translation exists. Defaults to False because fuzzy translations may be inaccurate. See module docstring for details on fuzzy translation behavior.

False

Returns:

Type Description
str

The translated string, or default/msgid if not found.

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
def translate(
    self,
    msgid: str,
    context: str | None = None,
    n: int | None = None,
    default: str | None = None,
    allow_fuzzy: bool = False,
) -> str:
    """Translate a message.

    Args:
        msgid: The message to translate.
        context: Optional context for disambiguation.
        n: The number for plural selection (None for singular).
        default: Default value if translation not found (defaults to msgid).
        allow_fuzzy: If True, use fuzzy (uncertain/unverified) translations
            when no verified translation exists. Defaults to False because
            fuzzy translations may be inaccurate. See module docstring for
            details on fuzzy translation behavior.

    Returns:
        The translated string, or default/msgid if not found.
    """
    key = self._make_key(msgid, context)
    entry = self.entries.get(key)

    if entry is None or not entry.is_translated:
        return default if default is not None else msgid

    # Skip fuzzy translations unless explicitly allowed
    if entry.fuzzy and not allow_fuzzy:
        return default if default is not None else msgid

    return entry.get_translation(n, self._plural_expr)

TranslationEntry dataclass

TranslationEntry(
    msgid: str,
    msgstr: str = "",
    msgstr_plural: list[str] = list(),
    msgid_plural: str | None = None,
    context: str | None = None,
    fuzzy: bool = False,
)

A single translation entry.

Attributes:

Name Type Description
msgid str

The original message (source string).

msgstr str

The translated message (single string for singular).

msgstr_plural list[str]

List of plural forms (index 0 is singular).

msgid_plural str | None

The plural form of the source string.

context str | None

Optional context for disambiguation.

fuzzy bool

Whether this translation is marked as fuzzy/uncertain.

is_plural property

is_plural: bool

Return True if this is a plural entry.

is_translated property

is_translated: bool

Return True if this entry has a translation.

get_translation

get_translation(
    n: int | None = None, plural_expr: str = "(n != 1)"
) -> str

Get the translation for this entry.

Parameters:

Name Type Description Default
n int | None

The number for plural selection (None for singular).

None
plural_expr str

Expression to evaluate plural form index.

'(n != 1)'

Returns:

Type Description
str

The translated string, or empty string if not translated.

Source code in packages/maid-engine/src/maid_engine/i18n/catalog.py
def get_translation(self, n: int | None = None, plural_expr: str = "(n != 1)") -> str:
    """Get the translation for this entry.

    Args:
        n: The number for plural selection (None for singular).
        plural_expr: Expression to evaluate plural form index.

    Returns:
        The translated string, or empty string if not translated.
    """
    if n is not None and self.is_plural and self.msgstr_plural:
        # Plural form requested
        index = evaluate_plural(n, plural_expr)
        index = min(index, len(self.msgstr_plural) - 1)
        return self.msgstr_plural[index] if index < len(self.msgstr_plural) else ""
    return self.msgstr
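The index clamping in the plural branch can be shown on its own. `plural_index` below is a hypothetical stand-in for `evaluate_plural` that hard-codes the default `(n != 1)` rule; the real function evaluates the catalog's plural expression.

```python
def plural_index(n: int, nplurals: int) -> int:
    """Hypothetical stand-in for evaluate_plural with the "(n != 1)" rule."""
    index = int(n != 1)
    # Clamp so an entry with fewer forms than expected never raises IndexError.
    return min(index, nplurals - 1)


forms = ["%d Apfel", "%d Äpfel"]
for n in (0, 1, 2):
    print(n, forms[plural_index(n, len(forms))] % n)
```

With only one stored form, the clamp makes every count resolve to index 0 instead of failing.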

Translator

Translator(
    default_locale: str = "en",
    fallback_locale: str | None = None,
    log_missing: bool = True,
    missing_handler: Callable[[str, str | None, str], None]
    | None = None,
    cache_size: int | None = None,
)

Translation service that manages catalogs and provides translation functions.

The Translator loads translation catalogs for multiple locales and provides methods for translating messages with support for:

  • Locale fallback chains
  • Plural forms
  • Context-based disambiguation
  • Missing translation logging
  • LRU caching for translation lookups

Example

    translator = Translator(default_locale="en", fallback_locale="en")
    translator.load_directory("/path/to/locales")
    translator.translate("Hello, world!")  # 'Hallo, Welt!' if current_locale is 'de'

Parameters:

Name Type Description Default
default_locale str

The default locale to use when none is set.

'en'
fallback_locale str | None

Locale to fall back to when translation is missing.

None
log_missing bool

Whether to log missing translations.

True
missing_handler Callable[[str, str | None, str], None] | None

Optional callback for missing translations. Called with (msgid, context, locale).

None
cache_size int | None

Maximum number of translation results to cache. Set to 0 to disable caching. Defaults to _DEFAULT_CACHE_SIZE (1000).

None

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def __init__(
    self,
    default_locale: str = "en",
    fallback_locale: str | None = None,
    log_missing: bool = True,
    missing_handler: Callable[[str, str | None, str], None] | None = None,
    cache_size: int | None = None,
) -> None:
    """Initialize the translator.

    Args:
        default_locale: The default locale to use when none is set.
        fallback_locale: Locale to fall back to when translation is missing.
        log_missing: Whether to log missing translations.
        missing_handler: Optional callback for missing translations.
                        Called with (msgid, context, locale).
        cache_size: Maximum number of translation results to cache.
                   Set to 0 to disable caching. Defaults to _DEFAULT_CACHE_SIZE (1000).
    """
    self.default_locale = default_locale
    self.fallback_locale = fallback_locale or default_locale
    self.log_missing = log_missing
    self.missing_handler = missing_handler

    self._catalogs: dict[str, TranslationCatalog] = {}
    # Bounded set to prevent unbounded memory growth from unique missing keys.
    # When the set exceeds _MISSING_LOGGED_MAX_SIZE, oldest entries are evicted.
    # We use a dict as an ordered set (Python 3.7+ preserves insertion order).
    self._missing_logged: dict[tuple[str, str | None, str], None] = {}

    # LRU cache for translation results with bounded size to prevent memory growth.
    # Uses OrderedDict to maintain insertion order; oldest entries evicted when full.
    # Thread-safe via _cache_lock to prevent race conditions during cache access.
    # Cache key: (msgid, context, n, msgid_plural, locale)
    # Cache value: translated string
    self._cache: OrderedDict[tuple[str, str | None, int | None, str | None, str], str] = (
        OrderedDict()
    )
    self._cache_lock = threading.Lock()
    self._cache_max_size = cache_size if cache_size is not None else self._DEFAULT_CACHE_SIZE
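The comments above describe using a plain dict as a bounded, insertion-ordered set for already-logged missing keys. A minimal sketch of that idea follows; `BoundedSeenSet` is a hypothetical helper written for this example, and the size cap is passed in rather than read from `_MISSING_LOGGED_MAX_SIZE`.

```python
class BoundedSeenSet:
    """Insertion-ordered set with a size cap, backed by a plain dict."""

    def __init__(self, max_size: int) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self._max_size = max_size
        self._items: dict[tuple[str, str | None, str], None] = {}

    def add(self, key) -> bool:
        """Record `key`; return True only the first time it is seen."""
        if key in self._items:
            return False
        if len(self._items) >= self._max_size:
            # dicts iterate in insertion order, so this is the oldest key.
            oldest = next(iter(self._items))
            del self._items[oldest]
        self._items[key] = None
        return True

    def __len__(self) -> int:
        return len(self._items)

    def __contains__(self, key) -> bool:
        return key in self._items


seen = BoundedSeenSet(max_size=2)
first = seen.add(("Hello", None, "de"))
repeat = seen.add(("Hello", None, "de"))
seen.add(("Bye", None, "de"))
seen.add(("Thanks", "menu", "de"))  # evicts ("Hello", None, "de")
```

The return value of `add` is what a caller would use to decide whether a missing translation still needs to be logged.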

available_locales property

available_locales: list[str]

Return list of available locale codes.

cache_max_size property

cache_max_size: int

Return the maximum cache size (0 means caching is disabled).

cache_size property

cache_size: int

Return the current number of cached translations.

clear_cache

clear_cache() -> None

Clear the translation cache.

load_catalog() calls this automatically after loading a catalog. Call it manually when catalogs are otherwise reloaded or modified. Changing the locale does not require clearing, because the locale is part of the cache key. Thread-safe.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def clear_cache(self) -> None:
    """Clear the translation cache.

    load_catalog() calls this automatically after loading a catalog.
    Call it manually when catalogs are otherwise reloaded or modified.
    Changing the locale does not require clearing, because the locale
    is part of the cache key. Thread-safe.
    """
    with self._cache_lock:
        self._cache.clear()

get_catalog

get_catalog(locale: str) -> TranslationCatalog | None

Get the catalog for a specific locale.

Parameters:

Name Type Description Default
locale str

The locale code.

required

Returns:

Type Description
TranslationCatalog | None

The TranslationCatalog, or None if not loaded.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def get_catalog(self, locale: str) -> TranslationCatalog | None:
    """Get the catalog for a specific locale.

    Args:
        locale: The locale code.

    Returns:
        The TranslationCatalog, or None if not loaded.
    """
    return self._catalogs.get(locale)

get_current_locale

get_current_locale() -> str

Get the current locale from context.

Returns:

Type Description
str

The current locale code.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def get_current_locale(self) -> str:
    """Get the current locale from context.

    Returns:
        The current locale code.
    """
    return current_locale.get()

gettext

gettext(msgid: str) -> str

Translate a message (GNU gettext compatible).

Parameters:

Name Type Description Default
msgid str

The message to translate.

required

Returns:

Type Description
str

The translated string.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def gettext(self, msgid: str) -> str:
    """Translate a message (GNU gettext compatible).

    Args:
        msgid: The message to translate.

    Returns:
        The translated string.
    """
    return self.translate(msgid)

load_catalog

load_catalog(
    path: str | Path, locale: str | None = None
) -> None

Load a translation catalog from a file.

Clears the translation cache since translations may have changed.

Parameters:

Name Type Description Default
path str | Path

Path to the PO or MO file.

required
locale str | None

Optional locale override (defaults to the filename without its extension).

None

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def load_catalog(self, path: str | Path, locale: str | None = None) -> None:
    """Load a translation catalog from a file.

    Clears the translation cache since translations may have changed.

    Args:
        path: Path to the PO or MO file.
        locale: Optional locale override (defaults to filename).
    """
    catalog = TranslationCatalog.load(path, locale)
    self._catalogs[catalog.locale] = catalog
    # Clear cache since translations may have changed
    self.clear_cache()
    logger.info(f"Loaded translation catalog: {catalog.locale} ({len(catalog)} entries)")

load_directory

load_directory(
    directory: str | Path,
    pattern: str = "*.po",
    recursive: bool = False,
) -> None

Load all translation catalogs from a directory.

Parameters:

Name Type Description Default
directory str | Path

Path to the directory containing translation files.

required
pattern str

Glob pattern for translation files.

'*.po'
recursive bool

Whether to search recursively.

False

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def load_directory(
    self,
    directory: str | Path,
    pattern: str = "*.po",
    recursive: bool = False,
) -> None:
    """Load all translation catalogs from a directory.

    Args:
        directory: Path to the directory containing translation files.
        pattern: Glob pattern for translation files.
        recursive: Whether to search recursively.
    """
    directory = Path(directory)
    if not directory.exists():
        logger.warning(f"Locale directory does not exist: {directory}")
        return

    glob_method = directory.rglob if recursive else directory.glob

    for path in glob_method(pattern):
        try:
            self.load_catalog(path)
        except Exception as e:
            logger.error(f"Failed to load translation file {path}: {e}")
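The difference between the flat and recursive search can be checked with a throwaway directory. `find_catalogs` is a hypothetical helper for this sketch that only lists matching files; it does not load them.

```python
import tempfile
from pathlib import Path


def find_catalogs(directory: Path, pattern: str = "*.po", recursive: bool = False) -> list[str]:
    """Return matching paths relative to `directory`, sorted for stable output."""
    glob_method = directory.rglob if recursive else directory.glob
    return sorted(p.relative_to(directory).as_posix() for p in glob_method(pattern))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "de.po").touch()
    (root / "fr.po").touch()
    (root / "notes.txt").touch()
    (root / "es" / "LC_MESSAGES").mkdir(parents=True)
    (root / "es" / "LC_MESSAGES" / "messages.po").touch()

    flat = find_catalogs(root)
    nested = find_catalogs(root, recursive=True)

print(flat)
print(nested)
```

Since each file found is loaded without an explicit locale, a nested file such as `es/LC_MESSAGES/messages.po` would be registered under its stem (`messages`), so a flat `<locale>.po` layout is the safer fit for this method.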

ngettext

ngettext(msgid: str, msgid_plural: str, n: int) -> str

Translate a message with plural forms.

Parameters:

Name Type Description Default
msgid str

The singular form of the message.

required
msgid_plural str

The plural form of the message.

required
n int

The number determining which form to use.

required

Returns:

Type Description
str

The translated string.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def ngettext(self, msgid: str, msgid_plural: str, n: int) -> str:
    """Translate a message with plural forms.

    Args:
        msgid: The singular form of the message.
        msgid_plural: The plural form of the message.
        n: The number determining which form to use.

    Returns:
        The translated string.
    """
    return self.translate(msgid, n=n, msgid_plural=msgid_plural)

npgettext

npgettext(
    context: str, msgid: str, msgid_plural: str, n: int
) -> str

Translate a message with context and plural forms.

Parameters:

Name Type Description Default
context str

The context for disambiguation.

required
msgid str

The singular form of the message.

required
msgid_plural str

The plural form of the message.

required
n int

The number determining which form to use.

required

Returns:

Type Description
str

The translated string.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def npgettext(self, context: str, msgid: str, msgid_plural: str, n: int) -> str:
    """Translate a message with context and plural forms.

    Args:
        context: The context for disambiguation.
        msgid: The singular form of the message.
        msgid_plural: The plural form of the message.
        n: The number determining which form to use.

    Returns:
        The translated string.
    """
    return self.translate(msgid, context=context, n=n, msgid_plural=msgid_plural)

pgettext

pgettext(context: str, msgid: str) -> str

Translate a message with context.

Parameters:

Name Type Description Default
context str

The context for disambiguation.

required
msgid str

The message to translate.

required

Returns:

Type Description
str

The translated string.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def pgettext(self, context: str, msgid: str) -> str:
    """Translate a message with context.

    Args:
        context: The context for disambiguation.
        msgid: The message to translate.

    Returns:
        The translated string.
    """
    return self.translate(msgid, context=context)

set_current_locale

set_current_locale(locale: str) -> None

Set the current locale in context.

Parameters:

Name Type Description Default
locale str

The locale code to set.

required

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def set_current_locale(self, locale: str) -> None:
    """Set the current locale in context.

    Args:
        locale: The locale code to set.
    """
    current_locale.set(locale)
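The `.get()` and `.set()` calls suggest that `current_locale` is a `contextvars.ContextVar`; that is an assumption, since its definition is not shown here. Under that assumption, the sketch below shows why a locale set in one asyncio task does not leak into another task or into the caller.

```python
import asyncio
from contextvars import ContextVar

# Assumed shape: a module-level ContextVar with a default locale.
current_locale = ContextVar("current_locale", default="en")


async def handle_session(locale: str) -> str:
    current_locale.set(locale)  # visible only inside this task's context
    await asyncio.sleep(0)  # yield so the other session runs in between
    return current_locale.get()


async def demo() -> tuple[list[str], str]:
    seen = await asyncio.gather(handle_session("de"), handle_session("fr"))
    # The caller's context was never modified by the child tasks.
    return seen, current_locale.get()
```

Each task created by `asyncio.gather` runs in a copy of the caller's context, so every session keeps its own locale while the caller still sees the default.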

translate

translate(
    msgid: str,
    context: str | None = None,
    n: int | None = None,
    msgid_plural: str | None = None,
    locale: str | None = None,
    default: str | None = None,
) -> str

Translate a message.

Uses LRU caching for repeated translations. The cache is bounded to _cache_max_size entries to prevent unbounded memory growth. Thread-safe: cache access is protected by _cache_lock.

Parameters:

Name Type Description Default
msgid str

The message to translate.

required
context str | None

Optional context for disambiguation.

None
n int | None

The number for plural selection.

None
msgid_plural str | None

The plural form of the message (for fallback).

None
locale str | None

Optional locale override (uses current_locale if not provided).

None
default str | None

Default value if translation not found.

None

Returns:

Type Description
str

The translated string; if no translation is found, default when provided, otherwise the original msgid.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def translate(
    self,
    msgid: str,
    context: str | None = None,
    n: int | None = None,
    msgid_plural: str | None = None,
    locale: str | None = None,
    default: str | None = None,
) -> str:
    """Translate a message.

    Uses LRU caching for repeated translations. The cache is bounded to
    _cache_max_size entries to prevent unbounded memory growth.
    Thread-safe: cache access is protected by _cache_lock.

    Args:
        msgid: The message to translate.
        context: Optional context for disambiguation.
        n: The number for plural selection.
        msgid_plural: The plural form of the message (for fallback).
        locale: Optional locale override (uses current_locale if not provided).
        default: Default value if translation not found.

    Returns:
        The translated string; if no translation is found, default when
        provided, otherwise the original msgid.
    """
    locale = locale or self.get_current_locale()

    # Build cache key - includes all parameters that affect the result
    # Note: We don't include 'default' in the cache key because it's a fallback
    # value and caching with different defaults could return wrong results.
    # Instead, we cache the "canonical" result (with default=None) and apply
    # the default afterward if needed.
    cache_key = (msgid, context, n, msgid_plural, locale)

    # Check cache first (with lock for thread safety)
    if self._cache_max_size > 0:
        with self._cache_lock:
            if cache_key in self._cache:
                # Move to end (most recently used) for LRU behavior
                self._cache.move_to_end(cache_key)
                cached_result = self._cache[cache_key]
                # If cached result is the msgid (no translation found) and
                # a default was provided, return the default instead
                if cached_result == msgid and default is not None:
                    return default
                return cached_result

    # Perform the actual translation lookup
    result = self._translate_uncached(msgid, context, n, msgid_plural, locale)

    # Add to cache (with lock for thread safety)
    # Skip caching entirely if cache is disabled (max_size=0)
    if self._cache_max_size > 0:
        with self._cache_lock:
            # Check again in case another thread added it while we were translating
            if cache_key not in self._cache:
                # Evict oldest entry if at capacity
                if len(self._cache) >= self._cache_max_size:
                    self._cache.popitem(last=False)
                self._cache[cache_key] = result
            else:
                # Another thread beat us to it, use their result for consistency
                result = self._cache[cache_key]
                self._cache.move_to_end(cache_key)

    # Apply default if needed (after caching the canonical result)
    if result == msgid and default is not None:
        return default
    return result
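The caching pattern used above (check under a lock, compute outside the lock, then re-check before storing) can be isolated into a small class. `LruCache` is a hypothetical sketch for illustration, not the Translator's internal structure, and it omits the `default` handling.

```python
import threading
from collections import OrderedDict
from typing import Callable, Hashable


class LruCache:
    """Small thread-safe LRU cache in the style described above."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._data: OrderedDict[Hashable, str] = OrderedDict()
        self._lock = threading.Lock()

    def get_or_compute(self, key: Hashable, compute: Callable[[], str]) -> str:
        if self._max_size <= 0:
            return compute()  # caching disabled
        with self._lock:
            if key in self._data:
                self._data.move_to_end(key)  # mark as most recently used
                return self._data[key]
        value = compute()  # done outside the lock to keep the lock short
        with self._lock:
            if key in self._data:
                # Another thread stored it first; reuse that value.
                self._data.move_to_end(key)
                return self._data[key]
            if len(self._data) >= self._max_size:
                self._data.popitem(last=False)  # evict least recently used
            self._data[key] = value
        return value

    def keys(self) -> list[Hashable]:
        with self._lock:
            return list(self._data)


cache = LruCache(max_size=2)
cache.get_or_compute("a", lambda: "A")
cache.get_or_compute("b", lambda: "B")
cache.get_or_compute("a", lambda: "unused")  # hit: "a" becomes most recent
cache.get_or_compute("c", lambda: "C")  # evicts "b"
print(cache.keys())
```

Computing outside the lock means two threads may occasionally do the same lookup, but the second check ensures both end up returning the same stored value.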

World

World(settings: Settings | SettingsProxy)

Central game world state manager.

The World holds all game state including entities, rooms, and provides efficient queries for game logic.

Example

    world = World(settings)
    await world.startup()

    # Create a player entity
    player = world.create_entity()
    player.add(PositionComponent(room_id=starting_room.id))
    player.add(HealthComponent(current=100, maximum=100))

    # Query entities in a room
    for entity in world.entities_in_room(room_id):
        ...

Source code in packages/maid-engine/src/maid_engine/core/world.py
def __init__(self, settings: Settings | SettingsProxy) -> None:
    self._settings = settings
    self._entities = EntityManager()
    self._events = EventBus()
    self._entities.set_event_bus(self._events)
    self._systems = SystemManager(self)
    self._room_index = RoomIndex()
    self._rooms: dict[UUID, Any] = {}  # Room data storage
    self._areas: dict[UUID, Any] = {}  # Area data storage
    self._custom_data: dict[str, dict[str, Any]] = {}  # Namespace-isolated data storage
    self._tick_count = 0
    self._running = False
    self._engine: EngineServices | None = None  # Set via set_engine(); see engine property docs
    self._grid = GridManager(world=self)  # Grid coordinate system

engine property

engine: EngineServices | None

Get the engine services back-reference.

This back-reference is set by GameEngine.__init__ immediately after creating the World. It allows ECS systems and commands to reach engine-level resources (provider_registry, command_registry, document_store, content_packs, relay_to_external, etc.) that live outside the World itself.

Architectural note (known debt): The World is intentionally a content-agnostic state container; it has no knowledge of networking, persistence, or AI providers. However, many game systems need those capabilities. Rather than threading individual references into every system constructor (which would require changing the ContentPack.get_systems() signature and every content pack), the engine stores a single typed back-reference here so systems can access it via self.world.engine.

The reference is typed as EngineServices, a protocol that constrains the surface area to a defined set of properties and methods. This ensures systems cannot accidentally couple to arbitrary engine internals while the back-reference exists. A future refactor may replace this with direct dependency injection into systems.

The value is None only when a World is created outside of a GameEngine (e.g. in unit tests).

Returns:

Type Description
EngineServices | None

The engine services interface, or None if no engine is attached.

See Also

maid_engine.core.protocols.EngineServices: the protocol defining the allowed engine surface area.
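Because the back-reference is a protocol and may be None, code that uses it has to handle the detached case. The sketch below illustrates that pattern with stand-ins only: `EngineServicesSketch`, `FakeEngine`, `WorldSketch`, and `announce` are hypothetical, and the `relay_to_external(message)` signature is an assumption (the text above names the member but not its parameters).

```python
from __future__ import annotations

from typing import Protocol


class EngineServicesSketch(Protocol):
    """Hypothetical, much smaller stand-in for the EngineServices protocol."""

    def relay_to_external(self, message: str) -> None: ...


class FakeEngine:
    """Satisfies the protocol structurally; no inheritance is required."""

    def __init__(self) -> None:
        self.relayed: list[str] = []

    def relay_to_external(self, message: str) -> None:
        self.relayed.append(message)


class WorldSketch:
    """Stand-in for World that only carries the engine back-reference."""

    def __init__(self) -> None:
        self.engine: EngineServicesSketch | None = None


def announce(world: WorldSketch, message: str) -> bool:
    """Relay a message if an engine is attached; report whether it was sent."""
    engine = world.engine
    if engine is None:  # e.g. a World built directly in a unit test
        return False
    engine.relay_to_external(message)
    return True


bare_world = WorldSketch()
sent_without_engine = announce(bare_world, "hello")

fake = FakeEngine()
attached_world = WorldSketch()
attached_world.engine = fake
sent_with_engine = announce(attached_world, "hello")
```

A test double like `FakeEngine` only needs the members a system actually touches, which is the practical benefit of typing the reference as a protocol rather than as the concrete engine class.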

entities property

entities: EntityManager

Get the entity manager.

entity_count property

entity_count: int

Get total entity count in O(1) time.

events property

events: EventBus

Get the event bus.

grid property

grid: GridManager

Get the grid manager for coordinate-based room operations.

The GridManager provides spatial indexing and queries for rooms placed on a 3D coordinate grid. It supports:
  • O(1) coordinate lookups
  • Radius and rectangular region queries
  • A* pathfinding between rooms
  • Automatic exit creation between adjacent rooms

Example

# Register a room at a coordinate
world.grid.register_room(room_id, GridCoord(5, 10))

# Or use the convenience method
world.register_room_at_coord(room_id, GridCoord(5, 10))

# Find a path between rooms
result = world.grid.find_path(from_room_id, to_room_id)
if result.found:
    print(result.directions)  # ["n", "e", "e"]
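To make the idea of A* pathfinding over grid coordinates concrete, here is a minimal, self-contained sketch. It is not the GridManager algorithm: it works on 2D tuples rather than GridCoord, uses a unit step cost, and assumes a direction convention (+y is north, +x is east) that the text above does not state. The name `find_path_sketch` is hypothetical.

```python
from __future__ import annotations

import heapq

# Assumed convention for this sketch only: +y is north, +x is east.
DIRECTIONS = {"n": (0, 1), "s": (0, -1), "e": (1, 0), "w": (-1, 0)}


def find_path_sketch(
    rooms: set[tuple[int, int]],
    start: tuple[int, int],
    goal: tuple[int, int],
) -> list[str] | None:
    """A* with unit step cost and a Manhattan-distance heuristic."""

    def heuristic(cell: tuple[int, int]) -> int:
        return abs(cell[0] - goal[0]) + abs(cell[1] - goal[1])

    # Each frontier entry: (estimated total cost, cost so far, cell, directions taken)
    frontier = [(heuristic(start), 0, start, [])]
    best_cost = {start: 0}
    while frontier:
        _, cost, cell, path = heapq.heappop(frontier)
        if cell == goal:
            return path
        for name, (dx, dy) in DIRECTIONS.items():
            nxt = (cell[0] + dx, cell[1] + dy)
            if nxt not in rooms:  # set membership gives the O(1) coordinate lookup
                continue
            new_cost = cost + 1
            if new_cost < best_cost.get(nxt, float("inf")):
                best_cost[nxt] = new_cost
                heapq.heappush(
                    frontier,
                    (new_cost + heuristic(nxt), new_cost, nxt, path + [name]),
                )
    return None  # the goal is not reachable from the start


rooms = {(0, 0), (0, 1), (1, 1), (2, 1)}
print(find_path_sketch(rooms, (0, 0), (2, 1)))  # ['n', 'e', 'e']
print(find_path_sketch(rooms, (0, 0), (5, 5)))  # None
```

A per-room movement cost, such as the movement_cost parameter of register_room_at_coord, would replace the constant `+ 1` step in a fuller version.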

is_running property

is_running: bool

Check if world is running.

settings property

settings: Settings | SettingsProxy

Get settings.

systems property

systems: SystemManager

Get the system manager.

tick_count property

tick_count: int

Get current tick count.

add_entity

add_entity(entity: Entity) -> None

Add an existing entity to this world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def add_entity(self, entity: Entity) -> None:
    """Add an existing entity to this world."""
    self.adopt_entity(entity)

adopt_entity

adopt_entity(entity: Entity) -> None

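Adopt an entity from an external manager into this world.

If an entity with the same ID already exists, it is destroyed and replaced, and a warning is logged. Raises ValueError if the existing entity cannot be destroyed, or if the adopted entity has a position whose room is not registered in this world.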

Source code in packages/maid-engine/src/maid_engine/core/world.py
def adopt_entity(self, entity: Entity) -> None:
    """Adopt entity from an external manager into this world."""
    existing = self._entities.get(entity.id)
    if existing is not None:
        _logger.warning("Replacing existing entity during adopt: %s", entity.id)
        if not self.destroy_entity(entity.id):
            msg = f"Cannot adopt entity {entity.id}: failed to destroy existing entity"
            raise ValueError(msg)

    pos = entity.find_by_protocol(Positionable)
    if pos is not None and self.get_room(pos.room_id) is None:
        msg = f"Cannot adopt entity {entity.id}: room {pos.room_id} does not exist"
        raise ValueError(msg)

    self._entities.adopt(entity)
    if pos is not None:
        self._room_index.add(entity.id, pos.room_id)

all_areas

all_areas() -> Iterator[tuple[UUID, Any]]

Iterate over all registered areas.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def all_areas(self) -> Iterator[tuple[UUID, Any]]:
    """Iterate over all registered areas."""
    yield from self._areas.items()

all_rooms

all_rooms() -> Iterator[tuple[UUID, Any]]

Iterate over all registered rooms.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def all_rooms(self) -> Iterator[tuple[UUID, Any]]:
    """Iterate over all registered rooms."""
    yield from self._rooms.items()

clear async

clear() -> None

Clear all world state.

Source code in packages/maid-engine/src/maid_engine/core/world.py
async def clear(self) -> None:
    """Clear all world state."""
    if self._running:
        await self._systems.shutdown()
    self._entities.clear()
    self._room_index.clear()
    self._rooms.clear()
    self._areas.clear()
    self._events.clear()
    self._custom_data.clear()
    self._grid.clear()
    self._tick_count = 0
    self._running = False

count_in_room

count_in_room(room_id: UUID) -> int

Get count of entities in a room.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def count_in_room(self, room_id: UUID) -> int:
    """Get count of entities in a room."""
    return self._room_index.count_in_room(room_id)

create_entity

create_entity(entity_id: UUID | None = None) -> Entity

Create a new entity.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def create_entity(self, entity_id: UUID | None = None) -> Entity:
    """Create a new entity."""
    return self._entities.create(entity_id)

delete_data

delete_data(key: str) -> Any | None
delete_data(namespace: str, key: str) -> Any | None
delete_data(*args: Any) -> Any | None

Delete arbitrary data from the world.

Supports two call signatures for backward compatibility:
  • delete_data(key) deletes from the "_global" namespace.
  • delete_data(namespace, key) deletes from the given namespace.

Parameters:

Name Type Description Default
*args Any

Either (key,) for global namespace or (namespace, key) for a specific namespace.

()

Returns:

Type Description
Any | None

The deleted value, or None if not found.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def delete_data(self, *args: Any) -> Any | None:  # type: ignore[misc]
    """Delete arbitrary data from the world.

    Supports two call signatures for backward compatibility:
      - ``delete_data(key)`` deletes from the "_global" namespace.
      - ``delete_data(namespace, key)`` deletes from the given namespace.

    Args:
        *args: Either ``(key,)`` for global namespace or
            ``(namespace, key)`` for a specific namespace.

    Returns:
        The deleted value, or None if not found.
    """
    if len(args) == 1:
        namespace, key = self._GLOBAL_NAMESPACE, args[0]
    elif len(args) == 2:
        namespace, key = args[0], args[1]
    else:
        msg = f"delete_data() takes 1 or 2 positional arguments, got {len(args)}"
        raise TypeError(msg)

    ns_data = self._custom_data.get(namespace)
    if ns_data is None:
        return None
    return ns_data.pop(key, None)

destroy_entity

destroy_entity(entity_id: UUID) -> bool

Destroy an entity.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def destroy_entity(self, entity_id: UUID) -> bool:
    """Destroy an entity."""
    # Clean up room index
    self._room_index.remove(entity_id)
    return self._entities.destroy(entity_id)

entities_in_room

entities_in_room(room_id: UUID) -> Iterator[Entity]

Get all entities in a specific room.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def entities_in_room(self, room_id: UUID) -> Iterator[Entity]:
    """Get all entities in a specific room."""
    for entity_id in self._room_index.get_entities(room_id):
        entity = self._entities.get(entity_id)
        if entity:
            yield entity
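The room lookup above relies on an index that maps rooms to entity IDs, then resolves each ID and skips any that no longer resolve. The sketch below shows one way such a two-way index could work; `RoomIndexSketch` and `entities_in_room_sketch` are hypothetical and make no claim about how the real RoomIndex is implemented.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterator
from uuid import UUID, uuid4


class RoomIndexSketch:
    """Hypothetical two-way index: entity -> room and room -> entities."""

    def __init__(self) -> None:
        self._room_of: dict[UUID, UUID] = {}
        self._members: defaultdict[UUID, set[UUID]] = defaultdict(set)

    def add(self, entity_id: UUID, room_id: UUID) -> None:
        self.remove(entity_id)  # an entity occupies at most one room
        self._room_of[entity_id] = room_id
        self._members[room_id].add(entity_id)

    def remove(self, entity_id: UUID) -> UUID | None:
        room_id = self._room_of.pop(entity_id, None)
        if room_id is not None:
            self._members[room_id].discard(entity_id)
        return room_id

    def get_entities(self, room_id: UUID) -> set[UUID]:
        return set(self._members.get(room_id, ()))


def entities_in_room_sketch(
    index: RoomIndexSketch, entities: dict[UUID, str], room_id: UUID
) -> Iterator[str]:
    for entity_id in index.get_entities(room_id):
        entity = entities.get(entity_id)
        if entity is not None:  # skip IDs that no longer resolve to an entity
            yield entity


room = uuid4()
goblin_id, stale_id = uuid4(), uuid4()
entities = {goblin_id: "goblin"}  # stale_id is indexed but has no entity
index = RoomIndexSketch()
index.add(goblin_id, room)
index.add(stale_id, room)
present = list(entities_in_room_sketch(index, entities, room))
```

Keeping both directions of the mapping makes "which room is this entity in" and "who is in this room" dictionary lookups instead of scans over all entities.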

entity_type_count

entity_type_count(entity_type: str) -> int

Get entity count for a specific tag in O(1) time.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def entity_type_count(self, entity_type: str) -> int:
    """Get entity count for a specific tag in O(1) time."""
    return self._entities.count_with_tag(entity_type)

get_all_entities

get_all_entities() -> Iterator[Entity]

Iterate over all entities in this world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_all_entities(self) -> Iterator[Entity]:
    """Iterate over all entities in this world."""
    return self._entities.get_all_entities()

get_area

get_area(area_id: UUID) -> Any | None

Get area data by ID.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_area(self, area_id: UUID) -> Any | None:
    """Get area data by ID."""
    return self._areas.get(area_id)

get_data

get_data(key: str) -> Any | None
get_data(namespace: str, key: str) -> Any | None
get_data(namespace: str, key: str, default: Any) -> Any
get_data(*args: Any) -> Any | None

Get arbitrary data from the world.

Supports three call signatures for backward compatibility:
  • get_data(key) reads from the "_global" namespace.
  • get_data(namespace, key) reads from the given namespace.
  • get_data(namespace, key, default) reads with a default value.

Parameters:

Name Type Description Default
*args Any

Either (key,) for global namespace, (namespace, key) for a specific namespace, or (namespace, key, default) to provide a fallback value.

()

Returns:

Type Description
Any | None

The stored value, or default if not found.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_data(self, *args: Any) -> Any | None:  # type: ignore[misc]
    """Get arbitrary data from the world.

    Supports three call signatures for backward compatibility:
      - ``get_data(key)`` reads from the "_global" namespace.
      - ``get_data(namespace, key)`` reads from the given namespace.
      - ``get_data(namespace, key, default)`` reads with a default value.

    Args:
        *args: Either ``(key,)`` for global namespace,
            ``(namespace, key)`` for a specific namespace, or
            ``(namespace, key, default)`` to provide a fallback value.

    Returns:
        The stored value, or default if not found.
    """
    if len(args) == 1:
        namespace, key, default = self._GLOBAL_NAMESPACE, args[0], None
    elif len(args) == 2:
        namespace, key, default = args[0], args[1], None
    elif len(args) == 3:
        namespace, key, default = args[0], args[1], args[2]
    else:
        msg = f"get_data() takes 1 to 3 positional arguments, got {len(args)}"
        raise TypeError(msg)

    ns_data = self._custom_data.get(namespace)
    if ns_data is None:
        return default
    return ns_data.get(key, default)
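The three documented call forms map onto a single variadic implementation. The following sketch reproduces that dispatch on a stand-in class with `typing.overload` declarations for type checkers; `DataStoreSketch` is hypothetical, and its `set_data` is simplified to the three-argument form only.

```python
from __future__ import annotations

from typing import Any, overload

_GLOBAL = "_global"


class DataStoreSketch:
    """Hypothetical stand-in for the World's namespaced data store."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    def set_data(self, namespace: str, key: str, value: Any) -> None:
        self._data.setdefault(namespace, {})[key] = value

    @overload
    def get_data(self, key: str, /) -> Any | None: ...
    @overload
    def get_data(self, namespace: str, key: str, /) -> Any | None: ...
    @overload
    def get_data(self, namespace: str, key: str, default: Any, /) -> Any: ...

    def get_data(self, *args: Any) -> Any:
        # The argument count alone decides which call form was used.
        if len(args) == 1:
            namespace, key, default = _GLOBAL, args[0], None
        elif len(args) in (2, 3):
            namespace, key = args[0], args[1]
            default = args[2] if len(args) == 3 else None
        else:
            raise TypeError(
                f"get_data() takes 1 to 3 positional arguments, got {len(args)}"
            )
        return self._data.get(namespace, {}).get(key, default)


store = DataStoreSketch()
store.set_data("_global", "motd", "Welcome")
store.set_data("combat", "round", 3)

print(store.get_data("motd"))                  # Welcome
print(store.get_data("combat", "round"))       # 3
print(store.get_data("combat", "missing", 0))  # 0
print(store.get_data("combat", "motd"))        # None
```

Note that a two-argument call is always read as (namespace, key), never as (key, default); supplying a default requires naming the namespace explicitly.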

get_entity

get_entity(entity_id: UUID) -> Entity | None

Get an entity by ID.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_entity(self, entity_id: UUID) -> Entity | None:
    """Get an entity by ID."""
    return self._entities.get(entity_id)

get_entity_room

get_entity_room(entity_id: UUID) -> UUID | None

Get the room ID an entity is in.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_entity_room(self, entity_id: UUID) -> UUID | None:
    """Get the room ID an entity is in."""
    return self._room_index.get_room(entity_id)

get_namespace_data

get_namespace_data(namespace: str) -> dict[str, Any]

Get all data stored under a namespace.

Returns a copy of the namespace dict so callers cannot accidentally mutate the internal store.

Parameters:

Name Type Description Default
namespace str

The namespace to retrieve.

required

Returns:

Type Description
dict[str, Any]

A dict of all key-value pairs in the namespace, or an empty dict.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_namespace_data(self, namespace: str) -> dict[str, Any]:
    """Get all data stored under a namespace.

    Returns a copy of the namespace dict so callers cannot
    accidentally mutate the internal store.

    Args:
        namespace: The namespace to retrieve.

    Returns:
        A dict of all key-value pairs in the namespace, or an empty dict.
    """
    return dict(self._custom_data.get(namespace, {}))
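The copy returned here is made with `dict(...)`, which is a shallow copy: the top-level mapping is new, but mutable values inside it are still shared with the internal store. The short sketch below demonstrates the difference with a hypothetical module-level `namespace_store` standing in for the World's internal data.

```python
# Hypothetical stand-in for the World's internal namespaced storage.
namespace_store = {"quests": {"active": ["find the key"], "completed_count": 0}}


def get_namespace_data_sketch(namespace: str) -> dict:
    # Same copying strategy as the listing above: a shallow dict copy.
    return dict(namespace_store.get(namespace, {}))


snapshot = get_namespace_data_sketch("quests")
snapshot["completed_count"] = 99            # rebinding a key: internal store unaffected
snapshot["active"].append("slay the rat")   # mutating a nested list: internal store changes

print(namespace_store["quests"]["completed_count"])  # 0
print(namespace_store["quests"]["active"])           # ['find the key', 'slay the rat']
```

Callers that need a fully independent snapshot can wrap the result in `copy.deepcopy` from the standard library.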

get_room

get_room(room_id: UUID) -> Any | None

Get room data by ID.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def get_room(self, room_id: UUID) -> Any | None:
    """Get room data by ID."""
    return self._rooms.get(room_id)

increment_tick

increment_tick() -> int

Increment and return tick count.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def increment_tick(self) -> int:
    """Increment and return tick count."""
    self._tick_count += 1
    return self._tick_count

move_entity

move_entity(entity_id: UUID, to_room_id: UUID) -> bool

Move an entity to a new room.

Updates both the room index and any PositionComponent on the entity. Emits RoomLeaveEvent and RoomEnterEvent.

Returns:

Type Description
bool

True if the move succeeded; False if no entity with the given ID exists.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def move_entity(self, entity_id: UUID, to_room_id: UUID) -> bool:
    """Move an entity to a new room.

    Updates both the room index and any PositionComponent on the entity.
    Emits RoomLeaveEvent and RoomEnterEvent.

    Returns:
        True if move was successful
    """
    from maid_engine.core.events import RoomEnterEvent, RoomLeaveEvent

    entity = self._entities.get(entity_id)
    if not entity:
        return False

    # Get current room before moving
    from_room_id = self._room_index.get_room(entity_id)

    # Emit leave event if entity was in a room
    if from_room_id:
        self._events.emit_sync(
            RoomLeaveEvent(
                entity_id=entity_id,
                room_id=from_room_id,
                to_room_id=to_room_id,
            )
        )

    # Update room index
    self._room_index.move(entity_id, to_room_id)

    # Update PositionComponent if entity has one (O(1) cached lookup)
    pos = entity.find_by_protocol(Positionable)
    if pos is not None:
        pos.room_id = to_room_id

    # Emit enter event
    self._events.emit_sync(
        RoomEnterEvent(
            entity_id=entity_id,
            room_id=to_room_id,
            from_room_id=from_room_id,
        )
    )

    return True
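The order of operations in the listing matters to event handlers: the leave event is queued before the index changes, and the enter event after. The sketch below replays that ordering with plain data structures. `LeaveSketch`, `EnterSketch`, and `move_sketch` are hypothetical, and a list stands in for the event bus queue.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class LeaveSketch:
    entity_id: str
    room_id: str
    to_room_id: str


@dataclass(frozen=True)
class EnterSketch:
    entity_id: str
    room_id: str
    from_room_id: str | None


def move_sketch(
    room_of: dict[str, str], queue: list, entity_id: str, to_room_id: str
) -> None:
    from_room_id = room_of.get(entity_id)
    if from_room_id:
        # Only entities that were already in a room produce a leave event.
        queue.append(LeaveSketch(entity_id, from_room_id, to_room_id))
    room_of[entity_id] = to_room_id
    queue.append(EnterSketch(entity_id, to_room_id, from_room_id))


room_of: dict[str, str] = {}
queue: list = []
move_sketch(room_of, queue, "player", "hall")    # first placement: enter only
move_sketch(room_of, queue, "player", "cellar")  # later move: leave, then enter
for event in queue:
    print(event)
```

An entity with no current room therefore produces a single enter event whose from_room_id is None, which handlers should be prepared to receive.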

place_entity_in_room

place_entity_in_room(
    entity_id: UUID, room_id: UUID
) -> None

Place an entity in a room (for initial placement).

Source code in packages/maid-engine/src/maid_engine/core/world.py
def place_entity_in_room(self, entity_id: UUID, room_id: UUID) -> None:
    """Place an entity in a room (for initial placement)."""
    self._room_index.add(entity_id, room_id)

    # Update PositionComponent if entity has one (O(1) cached lookup)
    entity = self._entities.get(entity_id)
    if entity:
        pos = entity.find_by_protocol(Positionable)
        if pos is not None:
            pos.room_id = room_id

register_area

register_area(area_id: UUID, area_data: Any) -> None

Register an area in the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def register_area(self, area_id: UUID, area_data: Any) -> None:
    """Register an area in the world."""
    self._areas[area_id] = area_data

register_room

register_room(room_id: UUID, room_data: Any) -> None

Register a room in the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def register_room(self, room_id: UUID, room_data: Any) -> None:
    """Register a room in the world."""
    self._rooms[room_id] = room_data

register_room_at_coord

register_room_at_coord(
    room_id: UUID,
    coord: GridCoord,
    movement_cost: float = 1.0,
    terrain_type: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> GridRoom

Register a room at a specific grid coordinate.

This is a convenience method that delegates to grid.register_room(). If the GridManager has auto_create_exits enabled, this will automatically create bidirectional exits to adjacent registered rooms.

Parameters:

Name Type Description Default
room_id UUID

The UUID of the room entity.

required
coord GridCoord

The grid coordinate for the room.

required
movement_cost float

Cost to enter this room for pathfinding (default 1.0). Higher values make the room less desirable for pathfinding.

1.0
terrain_type str | None

Optional terrain type string (e.g., "forest", "water").

None
metadata dict[str, Any] | None

Optional dictionary of additional room metadata.

None

Returns:

Type Description
GridRoom

The created GridRoom object.

Raises:

Type Description
CoordinateOccupiedError

If a room already exists at the coordinate.

CoordinateBlockedError

If the coordinate is blocked.

Example

from maid_engine.core.grid import GridCoord

# Register the starting room
starting_room = world.create_entity()
world.register_room(starting_room.id, {"name": "Town Square"})
world.register_room_at_coord(
    starting_room.id,
    GridCoord(0, 0),
    terrain_type="cobblestone",
)

# Register adjacent rooms - exits are created automatically
inn = world.create_entity()
world.register_room(inn.id, {"name": "The Rusty Lantern Inn"})
world.register_room_at_coord(inn.id, GridCoord(0, 1))

Source code in packages/maid-engine/src/maid_engine/core/world.py
def register_room_at_coord(
    self,
    room_id: UUID,
    coord: GridCoord,
    movement_cost: float = 1.0,
    terrain_type: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> GridRoom:
    """Register a room at a specific grid coordinate.

    This is a convenience method that delegates to grid.register_room().
    If the GridManager has auto_create_exits enabled, this will automatically
    create bidirectional exits to adjacent registered rooms.

    Args:
        room_id: The UUID of the room entity.
        coord: The grid coordinate for the room.
        movement_cost: Cost to enter this room for pathfinding (default 1.0).
                      Higher values make the room less desirable for pathfinding.
        terrain_type: Optional terrain type string (e.g., "forest", "water").
        metadata: Optional dictionary of additional room metadata.

    Returns:
        The created GridRoom object.

    Raises:
        CoordinateOccupiedError: If a room already exists at the coordinate.
        CoordinateBlockedError: If the coordinate is blocked.

    Example:
        from maid_engine.core.grid import GridCoord

        # Register the starting room
        starting_room = world.create_entity()
        world.register_room(starting_room.id, {"name": "Town Square"})
        world.register_room_at_coord(
            starting_room.id,
            GridCoord(0, 0),
            terrain_type="cobblestone",
        )

        # Register adjacent rooms - exits are created automatically
        inn = world.create_entity()
        world.register_room(inn.id, {"name": "The Rusty Lantern Inn"})
        world.register_room_at_coord(inn.id, GridCoord(0, 1))
    """
    return self._grid.register_room(
        room_id,
        coord,
        movement_cost=movement_cost,
        terrain_type=terrain_type,
        metadata=metadata,
    )
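Automatic exit creation between adjacent rooms can be pictured as a neighbour scan at registration time. The sketch below is illustrative only: it uses 2D tuples and room names instead of GridCoord and UUIDs, raises a plain ValueError where the real method raises CoordinateOccupiedError, and assumes +y is north and +x is east. `register_room_sketch` is a hypothetical name.

```python
from __future__ import annotations

# Assumed convention for this sketch only: +y is north, +x is east.
OFFSETS = {"n": (0, 1), "s": (0, -1), "e": (1, 0), "w": (-1, 0)}
OPPOSITE = {"n": "s", "s": "n", "e": "w", "w": "e"}


def register_room_sketch(
    grid: dict[tuple[int, int], str],
    exits: dict[str, dict[str, str]],
    name: str,
    coord: tuple[int, int],
) -> None:
    if coord in grid:
        raise ValueError(f"coordinate {coord} is already occupied")
    grid[coord] = name
    exits.setdefault(name, {})
    for direction, (dx, dy) in OFFSETS.items():
        neighbour = grid.get((coord[0] + dx, coord[1] + dy))
        if neighbour is not None:
            # Link both ways so the exit is bidirectional.
            exits[name][direction] = neighbour
            exits[neighbour][OPPOSITE[direction]] = name


grid: dict[tuple[int, int], str] = {}
exits: dict[str, dict[str, str]] = {}
register_room_sketch(grid, exits, "Town Square", (0, 0))
register_room_sketch(grid, exits, "The Rusty Lantern Inn", (0, 1))
print(exits["Town Square"])            # {'n': 'The Rusty Lantern Inn'}
print(exits["The Rusty Lantern Inn"])  # {'s': 'Town Square'}
```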

remove_entity

remove_entity(entity_id: UUID) -> bool

Remove an entity from world tracking without destroy semantics.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def remove_entity(self, entity_id: UUID) -> bool:
    """Remove an entity from world tracking without destroy semantics."""
    self._room_index.remove(entity_id)
    return self._entities.remove_entity(entity_id)

remove_entity_from_room

remove_entity_from_room(entity_id: UUID) -> UUID | None

Remove an entity from its current room without destroying it.

This is used when an item is picked up and moves to inventory.

Returns:

Type Description
UUID | None

The room ID the entity was in, or None if not in a room

Source code in packages/maid-engine/src/maid_engine/core/world.py
def remove_entity_from_room(self, entity_id: UUID) -> UUID | None:
    """Remove an entity from its current room without destroying it.

    This is used when an item is picked up and moves to inventory.

    Returns:
        The room ID the entity was in, or None if not in a room
    """
    return self._room_index.remove(entity_id)

room_count

room_count() -> int

Get count of registered rooms.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def room_count(self) -> int:
    """Get count of registered rooms."""
    return len(self._rooms)

set_data

set_data(key: str, value: Any) -> None
set_data(namespace: str, key: str, value: Any) -> None
set_data(*args: Any) -> None

Store arbitrary data in the world.

Supports two call signatures for backward compatibility:
  • set_data(key, value) stores under the "_global" namespace.
  • set_data(namespace, key, value) stores under the given namespace.

Using namespaces prevents key collisions between content packs.

Parameters:

Name Type Description Default
*args Any

Either (key, value) for global namespace or (namespace, key, value) for a specific namespace.

()
Source code in packages/maid-engine/src/maid_engine/core/world.py
def set_data(self, *args: Any) -> None:  # type: ignore[misc]
    """Store arbitrary data in the world.

    Supports two call signatures for backward compatibility:
      - ``set_data(key, value)`` stores under the "_global" namespace.
      - ``set_data(namespace, key, value)`` stores under the given namespace.

    Using namespaces prevents key collisions between content packs.

    Args:
        *args: Either ``(key, value)`` for global namespace or
            ``(namespace, key, value)`` for a specific namespace.
    """
    if len(args) == 2:
        namespace, key, value = self._GLOBAL_NAMESPACE, args[0], args[1]
    elif len(args) == 3:
        namespace, key, value = args[0], args[1], args[2]
    else:
        msg = f"set_data() takes 2 or 3 positional arguments, got {len(args)}"
        raise TypeError(msg)

    if namespace not in self._custom_data:
        self._custom_data[namespace] = {}
    self._custom_data[namespace][key] = value

set_engine

set_engine(engine: EngineServices) -> None

Attach engine services to this World.

This is called once by GameEngine.__init__ and should not be called by other code. It establishes the typed back-reference that systems use via the engine property.

The engine parameter is typed as EngineServices (a protocol) rather than the concrete GameEngine class. This avoids a circular import between world.py and engine.py and ensures the World only depends on the narrow protocol interface.

Parameters:

Name Type Description Default
engine EngineServices

An object satisfying the EngineServices protocol (in practice, the GameEngine that owns this World).

required

Raises:

Type Description
RuntimeError

If an engine is already attached.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def set_engine(self, engine: EngineServices) -> None:
    """Attach engine services to this World.

    This is called once by :meth:`GameEngine.__init__` and should not be
    called by other code.  It establishes the typed back-reference that
    systems use via :attr:`engine`.

    The ``engine`` parameter is typed as :class:`EngineServices` (a
    protocol) rather than the concrete ``GameEngine`` class.  This avoids
    a circular import between ``world.py`` and ``engine.py`` and ensures
    the World only depends on the narrow protocol interface.

    Args:
        engine: An object satisfying the :class:`EngineServices` protocol
            (in practice, the ``GameEngine`` that owns this World).

    Raises:
        RuntimeError: If an engine is already attached.
    """
    if self._engine is not None:
        raise RuntimeError(
            "World already has an engine attached. "
            "A World instance should only be owned by one GameEngine."
        )
    self._engine = engine

shutdown async

shutdown() -> None

Shut down the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
async def shutdown(self) -> None:
    """Shut down the world."""
    if not self._running:
        return

    await self._systems.shutdown()
    self._running = False

startup async

startup() -> None

Start up the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
async def startup(self) -> None:
    """Start up the world."""
    if self._running:
        return

    await self._systems.startup()
    self._running = True

tick async

tick(delta: float) -> None

Process a single game tick.

Parameters:

Name Type Description Default
delta float

Time since last tick in seconds

required
Source code in packages/maid-engine/src/maid_engine/core/world.py
async def tick(self, delta: float) -> None:
    """Process a single game tick.

    Args:
        delta: Time since last tick in seconds
    """
    self.increment_tick()

    # Process any pending events from sync emitters
    await self._events.process_pending()

    # Update all systems
    await self._systems.update(delta)

    # Process any events generated during system updates
    await self._events.process_pending()
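The tick drains the event queue twice: once before systems run and once after. The sketch below shows why both passes are useful. It assumes, based on the comments in the listing, that emit_sync queues an event for a later process_pending call; `QueueBus`, `tick_sketch`, and `regen_system` are hypothetical stand-ins, not the EventBus or SystemManager.

```python
import asyncio


class QueueBus:
    """Hypothetical event bus: emit_sync queues, process_pending delivers."""

    def __init__(self) -> None:
        self.pending = []
        self.handled = []

    def emit_sync(self, event: str) -> None:
        self.pending.append(event)

    async def process_pending(self) -> None:
        while self.pending:
            self.handled.append(self.pending.pop(0))


async def tick_sketch(bus: QueueBus, systems: list, delta: float) -> None:
    await bus.process_pending()  # events queued since the previous tick
    for system in systems:
        await system(bus, delta)
    await bus.process_pending()  # events the systems emitted during this tick


seen_by_system = []


async def regen_system(bus: QueueBus, delta: float) -> None:
    # Record what had already been delivered when the system ran.
    seen_by_system.extend(bus.handled)
    bus.emit_sync("regen_tick")


bus = QueueBus()
bus.emit_sync("room_enter")  # e.g. queued by a move between ticks
asyncio.run(tick_sketch(bus, [regen_system], 0.25))
print(seen_by_system)  # ['room_enter']
print(bus.handled)     # ['room_enter', 'regen_tick']
```

With this ordering, systems see the effects of earlier events before updating, and events they emit are handled within the same tick rather than one tick later.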

unregister_area

unregister_area(area_id: UUID) -> Any | None

Unregister an area from the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def unregister_area(self, area_id: UUID) -> Any | None:
    """Unregister an area from the world."""
    return self._areas.pop(area_id, None)

unregister_room

unregister_room(room_id: UUID) -> Any | None

Unregister a room from the world.

Source code in packages/maid-engine/src/maid_engine/core/world.py
def unregister_room(self, room_id: UUID) -> Any | None:
    """Unregister a room from the world."""
    return self._rooms.pop(room_id, None)

discover_content_packs

discover_content_packs(
    search_paths: list[Path] | None = None,
    use_entry_points: bool = True,
) -> list[ContentPack]

Discover available content packs.

Content packs are discovered from:
  1. Python entry points (maid.content_packs) - installed packages
  2. Local directories containing manifest.toml files

Parameters:

Name Type Description Default
search_paths list[Path] | None

Additional directories to search for packs

None
use_entry_points bool

Whether to search entry points

True

Returns:

Type Description
list[ContentPack]

List of discovered ContentPack instances

Source code in packages/maid-engine/src/maid_engine/plugins/loader.py
def discover_content_packs(
    search_paths: list[Path] | None = None,
    use_entry_points: bool = True,
) -> list[ContentPack]:
    """Discover available content packs.

    Content packs are discovered from:
    1. Python entry points (maid.content_packs) - installed packages
    2. Local directories containing manifest.toml files

    Args:
        search_paths: Additional directories to search for packs
        use_entry_points: Whether to search entry points

    Returns:
        List of discovered ContentPack instances
    """
    packs: list[ContentPack] = []

    # Discover from entry points
    if use_entry_points:
        packs.extend(_discover_from_entry_points())

    # Discover from search paths
    if search_paths:
        for path in search_paths:
            if path.exists() and path.is_dir():
                packs.extend(_discover_from_directory(path))

    return packs

get_translator

get_translator() -> Translator | None

Get the global translator instance.

Returns:

Type Description
Translator | None

The global Translator for the current context, or None if not set.

Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def get_translator() -> Translator | None:
    """Get the global translator instance.

    Returns:
        The global Translator for the current context, or None if not set.
    """
    return _translator.get()

set_translator

set_translator(translator: Translator | None) -> None

Set the global translator instance.

Parameters:

Name Type Description Default
translator Translator | None

The Translator to use globally, or None to clear.

required
Source code in packages/maid-engine/src/maid_engine/i18n/translator.py
def set_translator(translator: Translator | None) -> None:
    """Set the global translator instance.

    Args:
        translator: The Translator to use globally, or None to clear.
    """
    _translator.set(translator)
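The `.get()` and `.set()` calls, together with the phrase "for the current context", suggest that `_translator` is a `contextvars.ContextVar`. That is an inference, not something this section confirms. Under that assumption, the sketch below shows how a value set inside a copied context stays isolated from the caller; all names ending in `_sketch` are hypothetical, and strings stand in for Translator objects.

```python
import contextvars

# Assumed shape of the module-level variable: a ContextVar defaulting to None.
_translator_sketch = contextvars.ContextVar("translator_sketch", default=None)


def set_translator_sketch(translator) -> None:
    _translator_sketch.set(translator)


def get_translator_sketch():
    return _translator_sketch.get()


before = get_translator_sketch()  # nothing set yet, so the default applies
set_translator_sketch("en")


def in_other_context():
    # Runs inside a copy of the context; the change does not leak out.
    set_translator_sketch("fr")
    return get_translator_sketch()


inner = contextvars.copy_context().run(in_other_context)
outer = get_translator_sketch()
print(before, inner, outer)  # None fr en
```

If the assumption holds, each asyncio task would see its own value after setting it, because tasks run in a copy of the context in which they were created.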