Performance Optimization

This guide covers techniques for optimizing MAID server performance, including profiling, memory management, and best practices.

Profiling Tools

MAID includes built-in profiling capabilities:

CLI Profiling

# Profile memory and tick performance
maid dev profile --types=memory,tick --duration=60

# Save report to file
maid dev profile -t memory,tick -d 60 -o report.html

# JSON output for CI/CD
maid dev profile -t all --json

Memory Snapshots

# Take a memory snapshot
maid dev memory-snapshot

# Save to file with more detail
maid dev memory-snapshot -o snapshot.json --depth 15 --top 50

Programmatic Profiling

from maid_engine.profiling import ProfileManager, ProfileType

manager = ProfileManager(enabled=True)

# Start profiling session
session = await manager.start_session(
    ProfileType.TICK | ProfileType.MEMORY,
    duration=60.0,
)

# ... run game ...

# Stop and get results
results = await manager.stop_session(session.session_id)
print(results)

Tick Performance

Understanding Tick Budget

At 4 ticks per second, each tick has a ~250ms budget:

Tick Budget: 250ms
├── System Updates: ~150ms (target)
├── Event Processing: ~50ms (target)
├── Network I/O: ~30ms (target)
└── Buffer: ~20ms (safety margin)
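
To see where a slow tick spends its time, it helps to time each phase against these targets. The sketch below is illustrative only: phase_timer and PHASE_BUDGETS are hypothetical helpers built on the standard library, not part of the engine API.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

# Illustrative per-phase targets from the breakdown above (seconds)
PHASE_BUDGETS = {
    "system_updates": 0.150,
    "event_processing": 0.050,
    "network_io": 0.030,
}

@contextmanager
def phase_timer(name: str):
    """Time one tick phase and warn when it exceeds its target."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        budget = PHASE_BUDGETS.get(name)
        if budget is not None and elapsed > budget:
            logger.warning(
                f"Phase {name} took {elapsed*1000:.1f}ms "
                f"(target: {budget*1000:.0f}ms)"
            )

Inside an async tick, each phase can then be wrapped in a with phase_timer("system_updates"): block to see which part of the budget is being exceeded.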

Monitoring Tick Time

import logging
import time

from maid_engine.core.world import World

logger = logging.getLogger(__name__)

class TickMonitor:
    def __init__(self, world: World):
        self._world = world
        self._tick_times: list[float] = []

    async def tick_wrapper(self, delta: float) -> None:
        start = time.perf_counter()
        await self._world.tick(delta)
        elapsed = time.perf_counter() - start

        self._tick_times.append(elapsed)

        if elapsed > 0.25:  # Over budget
            logger.warning(f"Tick took {elapsed*1000:.1f}ms (budget: 250ms)")

    def get_stats(self) -> dict:
        if not self._tick_times:
            return {}

        return {
            "avg_tick_ms": sum(self._tick_times) / len(self._tick_times) * 1000,
            "max_tick_ms": max(self._tick_times) * 1000,
            "min_tick_ms": min(self._tick_times) * 1000,
            "over_budget_count": sum(1 for t in self._tick_times if t > 0.25),
        }

Optimizing System Updates

class OptimizedSystem(System):
    priority = 50

    def __init__(self, world: World):
        super().__init__(world)
        # Cache frequently used queries
        self._cached_entities: list[Entity] = []
        self._cache_dirty = True

    async def startup(self) -> None:
        # Subscribe to invalidate cache on entity changes
        self.events.subscribe(EntityCreatedEvent, self._invalidate_cache)
        self.events.subscribe(EntityDestroyedEvent, self._invalidate_cache)
        self.events.subscribe(ComponentAddedEvent, self._invalidate_cache)
        self.events.subscribe(ComponentRemovedEvent, self._invalidate_cache)

    async def _invalidate_cache(self, event) -> None:
        self._cache_dirty = True

    def _refresh_cache(self) -> None:
        if self._cache_dirty:
            self._cached_entities = list(
                self.entities.with_components(HealthComponent, CombatComponent)
            )
            self._cache_dirty = False

    async def update(self, delta: float) -> None:
        self._refresh_cache()

        # Use cached list instead of querying every tick
        for entity in self._cached_entities:
            await self._process_entity(entity, delta)

Entity Queries

Efficient Queries

# Good - single query, iterate once
async def update(self, delta: float) -> None:
    for entity in self.entities.with_components(HealthComponent):
        health = entity.get(HealthComponent)
        if health.current < health.maximum:
            health.current += delta

# Bad - multiple queries
async def update(self, delta: float) -> None:
    for entity in self.entities.all():
        if entity.has(HealthComponent):  # Second query
            health = entity.get(HealthComponent)
            if health.current < health.maximum:
                health.current += delta

Batch Processing

import asyncio

# Process entities in batches to avoid long-running loops
BATCH_SIZE = 100

async def update(self, delta: float) -> None:
    entities = list(self.entities.with_components(AIComponent))

    for i in range(0, len(entities), BATCH_SIZE):
        batch = entities[i:i + BATCH_SIZE]
        for entity in batch:
            await self._process_ai(entity, delta)

        # Yield to event loop between batches
        await asyncio.sleep(0)

Index Optimization

from uuid import UUID

# Use tags for fast filtering
class OptimizedWorld:
    def get_players_in_room(self, room_id: UUID) -> list[Entity]:
        """Get players in a room efficiently."""
        # Fast: tag query + room check
        return [
            e for e in self.entities.with_tag("player")
            if e.get(PositionComponent).room_id == room_id
        ]

        # Slow: check all entities
        # return [
        #     e for e in self.entities.all()
        #     if e.has_tag("player") and e.get(PositionComponent).room_id == room_id
        # ]

Memory Management

Component Memory

# Good - minimal component data
class HealthComponent(Component):
    current: int
    maximum: int

# Bad - unnecessary data
class HealthComponent(Component):
    current: int
    maximum: int
    history: list[int] = []  # Grows unbounded!
    last_damage_source: str = ""
    created_at: datetime = None
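
If per-entity history is genuinely needed, one option is to keep it outside the component and bounded, for example in the owning system. The sketch below is illustrative (the system name, field names, and cap of 20 entries are arbitrary choices); it reuses the System/World pattern from the examples above and a capped collections.deque so old entries are dropped automatically.

from collections import deque
from uuid import UUID

class DamageHistorySystem(System):
    """Illustrative: bounded per-entity history kept outside the component."""

    def __init__(self, world: World):
        super().__init__(world)
        # Per-entity damage history, capped so it cannot grow unbounded
        self._history: dict[UUID, deque[int]] = {}

    def record_damage(self, entity_id: UUID, amount: int) -> None:
        self._history.setdefault(entity_id, deque(maxlen=20)).append(amount)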

Entity Cleanup

class CleanupSystem(System):
    """Periodically clean up dead entities."""
    priority = 200  # Run late

    async def update(self, delta: float) -> None:
        to_destroy = []

        for entity in self.entities.with_tag("dead"):
            # Check if death animation/effects are done
            if entity.try_get(DeathTimerComponent) is None:
                to_destroy.append(entity.id)

        for entity_id in to_destroy:
            self.entities.destroy(entity_id)

Event Cleanup

# Avoid holding references in event handlers
async def bad_handler(self, event: DamageEvent) -> None:
    # Bad - holds a reference to the entity object, keeping it alive
    self._damaged_entities.append(
        self._world.get_entity(event.target_id)
    )

async def good_handler(self, event: DamageEvent) -> None:
    # Good - stores only the ID; look the entity up again when needed
    self._damaged_entity_ids.append(event.target_id)

Memory Profiling

from maid_engine.profiling import MemoryCollector

collector = MemoryCollector(
    trace_depth=10,
    top_allocations_count=20,
)

collector.start()

# Run some game ticks
for _ in range(100):
    await world.tick(0.25)

snapshot = collector.take_snapshot()
collector.stop()

print(f"Total allocated: {snapshot.total_allocated / 1024 / 1024:.1f} MB")
print(f"By module:")
for module, size in sorted(snapshot.by_module.items(), key=lambda x: -x[1])[:10]:
    print(f"  {module}: {size / 1024:.1f} KB")

Network Optimization

Message Batching

import time

class BatchedMessageSender:
    """Batch messages to reduce network calls."""

    def __init__(self, session: Session, flush_interval: float = 0.1):
        self._session = session
        self._buffer: list[str] = []
        self._flush_interval = flush_interval
        self._last_flush = time.time()

    def queue(self, message: str) -> None:
        self._buffer.append(message)

    async def flush_if_needed(self) -> None:
        now = time.time()
        if self._buffer and (now - self._last_flush) >= self._flush_interval:
            await self._session.send("\n".join(self._buffer))
            self._buffer.clear()
            self._last_flush = now
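
A possible usage pattern, assuming the sender is flushed once per tick or from a periodic task (the message text is illustrative):

sender = BatchedMessageSender(session, flush_interval=0.1)

# Queue messages as they are generated during the tick
sender.queue("You hit the goblin for 5 damage.")
sender.queue("The goblin hits you for 3 damage.")

# Flush at most once per flush interval, e.g. at the end of each tick
await sender.flush_if_needed()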

Compression

import zlib

class CompressedSession:
    """Session with compression for large messages."""

    COMPRESSION_THRESHOLD = 1024  # Bytes

    async def send(self, message: str) -> None:
        data = message.encode()

        if len(data) > self.COMPRESSION_THRESHOLD:
            # Compress large messages
            compressed = zlib.compress(data)
            await self._raw_send(compressed, compressed=True)
        else:
            await self._raw_send(data, compressed=False)
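
The receiving side has to reverse this. A minimal sketch of the matching decompression path, where _raw_receive is a hypothetical mirror of _raw_send that returns the payload together with its compression flag:

    async def receive(self) -> str:
        """Receive-side counterpart (also part of CompressedSession)."""
        data, compressed = await self._raw_receive()

        if compressed:
            # Reverse the zlib compression applied in send()
            data = zlib.decompress(data)

        return data.decode()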

AI Performance

Rate Limiting

import asyncio
import time

class RateLimitedAI:
    """Rate-limited AI calls."""

    def __init__(self, provider, calls_per_second: float = 2.0):
        self._provider = provider
        self._min_interval = 1.0 / calls_per_second
        self._last_call = 0.0
        self._lock = asyncio.Lock()

    async def complete(self, messages: list) -> str:
        async with self._lock:
            # Enforce rate limit
            elapsed = time.time() - self._last_call
            if elapsed < self._min_interval:
                await asyncio.sleep(self._min_interval - elapsed)

            self._last_call = time.time()
            return await self._provider.complete(messages)

Response Caching

import hashlib

class CachedAI:
    """Cache AI responses for identical prompts."""

    def __init__(self, provider, cache_size: int = 100):
        self._provider = provider
        self._cache: dict[str, str] = {}
        self._cache_size = cache_size

    def _cache_key(self, messages: list) -> str:
        content = str([(m.role, m.content) for m in messages])
        return hashlib.md5(content.encode()).hexdigest()

    async def complete(self, messages: list) -> str:
        key = self._cache_key(messages)

        if key in self._cache:
            return self._cache[key]

        result = await self._provider.complete(messages)

        # Evict the oldest entry when full (simple FIFO eviction)
        if len(self._cache) >= self._cache_size:
            oldest = next(iter(self._cache))
            del self._cache[oldest]

        self._cache[key] = result.content
        return result.content

Multi-World Optimization

World Tick Scheduling

async def optimized_tick_all(manager: WorldManager, delta: float) -> None:
    """Tick worlds with priority scheduling."""
    worlds = manager.list_worlds(include_private=True)

    # Sort by player count (busy worlds first)
    worlds.sort(
        key=lambda w: count_players(w.world),
        reverse=True,
    )

    # Tick high-priority worlds
    high_priority = [w for w in worlds if count_players(w.world) > 0]
    for world in high_priority:
        await world.world.tick(delta)

    # Tick empty worlds less frequently
    low_priority = [w for w in worlds if count_players(w.world) == 0]
    if should_tick_empty_worlds():
        for world in low_priority:
            await world.world.tick(delta)

Instance Pooling

from collections import deque
from uuid import uuid4

class DungeonPool:
    """Pool dungeon instances for reuse."""

    def __init__(self, manager: WorldManager, pool_size: int = 5):
        self._manager = manager
        self._pool_size = pool_size
        self._available: deque[str] = deque()
        self._in_use: set[str] = set()

    async def initialize(self) -> None:
        """Pre-create dungeon instances."""
        for i in range(self._pool_size):
            world_id = f"dungeon-pool-{i}"
            await self._create_instance(world_id)
            self._available.append(world_id)

    async def acquire(self) -> str:
        """Get a dungeon instance from the pool."""
        if self._available:
            world_id = self._available.popleft()
        else:
            # Pool exhausted, create new instance
            world_id = f"dungeon-dynamic-{uuid4()}"
            await self._create_instance(world_id)

        self._in_use.add(world_id)
        return world_id

    async def release(self, world_id: str) -> None:
        """Return a dungeon instance to the pool."""
        self._in_use.discard(world_id)

        # Reset and return to pool
        await self._reset_instance(world_id)
        self._available.append(world_id)
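
A usage sketch: wrapping the instance lifetime in try/finally guarantees that instances are returned to the pool even if the dungeon run fails (the party-handling code is a placeholder):

pool = DungeonPool(manager, pool_size=5)
await pool.initialize()

world_id = await pool.acquire()
try:
    # ... run the dungeon instance for a party ...
    pass
finally:
    # Always return the instance so it can be reset and reused
    await pool.release(world_id)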

Best Practices Summary

Do

  • Cache entity queries when possible
  • Use tags for fast entity categorization
  • Batch network messages
  • Rate limit AI calls
  • Clean up dead entities promptly
  • Profile before optimizing
  • Use async operations for I/O

Don't

  • Query entities multiple times per tick
  • Store growing lists in components
  • Make synchronous I/O calls in update() (see the sketch after this list)
  • Create new objects every tick
  • Hold references to destroyed entities
  • Ignore memory growth
  • Optimize prematurely
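
To illustrate the synchronous I/O point, here is a minimal contrast sketch; _serialize_stats and _write_stats_to_disk are hypothetical helpers, and asyncio.to_thread is the standard-library way to push blocking work off the event loop:

import asyncio

# Bad - blocks the event loop (and the whole tick) while the file is written
async def update(self, delta: float) -> None:
    with open("stats.json", "w") as f:
        f.write(self._serialize_stats())  # hypothetical helper

# Better - run the blocking write in a worker thread
async def update(self, delta: float) -> None:
    await asyncio.to_thread(self._write_stats_to_disk)  # hypothetical helper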

Monitoring

Health Checks

import time

class HealthMonitor:
    """Monitor server health metrics."""

    def __init__(self, world: World):
        self._world = world
        self._tick_count = 0
        self._start_time = time.time()

    def get_health(self) -> dict:
        uptime = time.time() - self._start_time
        return {
            "uptime_seconds": uptime,
            "tick_count": self._tick_count,
            "ticks_per_second": self._tick_count / uptime if uptime > 0 else 0,
            "entity_count": self._world.entities.count(),
            "player_count": len(list(self._world.entities.with_tag("player"))),
            "memory_mb": get_process_memory_mb(),
        }
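
get_process_memory_mb() is not shown above; one possible implementation, assuming the third-party psutil package is available:

import os

import psutil  # third-party dependency; assumed available here

def get_process_memory_mb() -> float:
    """Return the resident set size of the current process in MB."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / (1024 * 1024)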

Alerts

async def check_performance_alerts(monitor: HealthMonitor) -> list[str]:
    """Check for performance issues."""
    alerts = []
    health = monitor.get_health()

    if health["ticks_per_second"] < 3.5:  # Below 4 TPS target
        alerts.append(f"Low tick rate: {health['ticks_per_second']:.1f} TPS")

    if health["memory_mb"] > 1024:  # Over 1GB
        alerts.append(f"High memory usage: {health['memory_mb']:.0f} MB")

    if health["entity_count"] > 10000:  # Many entities
        alerts.append(f"High entity count: {health['entity_count']}")

    return alerts
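
One way to consume these alerts is a background task that checks them periodically and logs warnings. A minimal sketch (the 60-second interval is an arbitrary choice):

import asyncio
import logging

logger = logging.getLogger(__name__)

async def alert_loop(monitor: HealthMonitor, interval: float = 60.0) -> None:
    """Periodically check performance alerts and log them."""
    while True:
        for alert in await check_performance_alerts(monitor):
            logger.warning(alert)
        await asyncio.sleep(interval)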

Performance Benchmarks

MAID includes performance benchmark tests in packages/maid-engine/tests/benchmarks/ that validate documented performance claims:

Operation                        Target
-------------------------------  -------
Session authentication lookup    < 0.1ms
Command execution latency        < 2ms
Event dispatch latency           < 1ms
Room lookup latency              < 0.5ms
Entity creation                  < 1ms
Entity query (1000 entities)     < 5ms
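
A benchmark in that directory might look roughly like the following sketch. Only the benchmark marker and the targets above come from this page; the fixture name and the use of HealthComponent are assumptions for illustration:

import time

import pytest

@pytest.mark.benchmark
def test_entity_query_latency(world_with_1000_entities):  # hypothetical fixture
    """Entity query over 1000 entities should stay under 5ms."""
    world = world_with_1000_entities

    start = time.perf_counter()
    list(world.entities.with_components(HealthComponent))
    elapsed_ms = (time.perf_counter() - start) * 1000

    assert elapsed_ms < 5.0, f"Query took {elapsed_ms:.2f}ms (target: 5ms)"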

Running Benchmarks

# Run all benchmarks
uv run pytest packages/maid-engine/tests/benchmarks/ -v -m benchmark

# Skip benchmarks in regular test runs
uv run pytest packages/maid-engine/tests/ -m "not benchmark"

Future Enhancement: CI Benchmark Enforcement

Currently, performance benchmarks are available for manual execution but are not automatically enforced in CI pipelines. This is tracked as technical debt. Future enhancements should include:

  • Automated benchmark runs on PRs that modify performance-critical code paths
  • Regression detection with configurable thresholds
  • Historical performance tracking and trend analysis
  • Alerting when benchmarks exceed target thresholds

See GitHub Issue Tracking for updates on CI benchmark integration.

Next Steps