Performance Optimization¶
This guide covers techniques for optimizing MAID server performance, including profiling, memory management, and best practices.
Profiling Tools¶
MAID includes built-in profiling capabilities:
CLI Profiling¶
# Profile memory and tick performance
maid dev profile --types=memory,tick --duration=60
# Save report to file
maid dev profile -t memory,tick -d 60 -o report.html
# JSON output for CI/CD
maid dev profile -t all --json
Memory Snapshots¶
# Take a memory snapshot
maid dev memory-snapshot
# Save to file with more detail
maid dev memory-snapshot -o snapshot.json --depth 15 --top 50
Programmatic Profiling¶
from maid_engine.profiling import ProfileManager, ProfileType
manager = ProfileManager(enabled=True)
# Start profiling session
session = await manager.start_session(
ProfileType.TICK | ProfileType.MEMORY,
duration=60.0,
)
# ... run game ...
# Stop and get results
results = await manager.stop_session(session.session_id)
print(results)
Tick Performance¶
Understanding Tick Budget¶
At 4 ticks per second, each tick has a budget of roughly 250 ms:
Tick Budget: 250ms
├── System Updates: ~150ms (target)
├── Event Processing: ~50ms (target)
├── Network I/O: ~30ms (target)
└── Buffer: ~20ms (safety margin)
Monitoring Tick Time¶
import logging
import time

from maid_engine.core.world import World

logger = logging.getLogger(__name__)
class TickMonitor:
def __init__(self, world: World):
self._world = world
self._tick_times: list[float] = []
async def tick_wrapper(self, delta: float) -> None:
start = time.perf_counter()
await self._world.tick(delta)
elapsed = time.perf_counter() - start
self._tick_times.append(elapsed)
if elapsed > 0.25: # Over budget
logger.warning(f"Tick took {elapsed*1000:.1f}ms (budget: 250ms)")
def get_stats(self) -> dict:
if not self._tick_times:
return {}
return {
"avg_tick_ms": sum(self._tick_times) / len(self._tick_times) * 1000,
"max_tick_ms": max(self._tick_times) * 1000,
"min_tick_ms": min(self._tick_times) * 1000,
"over_budget_count": sum(1 for t in self._tick_times if t > 0.25),
}
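A minimal sketch of how the wrapper might be driven, assuming a plain asyncio loop; `run_monitored` and the fixed sleep are illustrative, not engine APIs:
import asyncio

async def run_monitored(world: World) -> None:
    """Drive the world through TickMonitor at roughly 4 TPS (illustrative only)."""
    monitor = TickMonitor(world)
    while True:
        await monitor.tick_wrapper(0.25)
        # A production loop would subtract the elapsed tick time from the sleep.
        await asyncio.sleep(0.25)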
Optimizing System Updates¶
class OptimizedSystem(System):
priority = 50
def __init__(self, world: World):
super().__init__(world)
# Cache frequently used queries
self._cached_entities: list[Entity] = []
self._cache_dirty = True
async def startup(self) -> None:
# Subscribe to invalidate cache on entity changes
self.events.subscribe(EntityCreatedEvent, self._invalidate_cache)
self.events.subscribe(EntityDestroyedEvent, self._invalidate_cache)
self.events.subscribe(ComponentAddedEvent, self._invalidate_cache)
self.events.subscribe(ComponentRemovedEvent, self._invalidate_cache)
async def _invalidate_cache(self, event) -> None:
self._cache_dirty = True
def _refresh_cache(self) -> None:
if self._cache_dirty:
self._cached_entities = list(
self.entities.with_components(HealthComponent, CombatComponent)
)
self._cache_dirty = False
async def update(self, delta: float) -> None:
self._refresh_cache()
# Use cached list instead of querying every tick
for entity in self._cached_entities:
await self._process_entity(entity, delta)
Entity Queries¶
Efficient Queries¶
# Good - single query, iterate once
async def update(self, delta: float) -> None:
for entity in self.entities.with_components(HealthComponent):
health = entity.get(HealthComponent)
if health.current < health.maximum:
health.current += delta
# Bad - multiple queries
async def update(self, delta: float) -> None:
for entity in self.entities.all():
if entity.has(HealthComponent): # Second query
health = entity.get(HealthComponent)
if health.current < health.maximum:
health.current += delta
Batch Processing¶
import asyncio

# Process entities in batches to avoid long-running loops
BATCH_SIZE = 100
async def update(self, delta: float) -> None:
entities = list(self.entities.with_components(AIComponent))
for i in range(0, len(entities), BATCH_SIZE):
batch = entities[i:i + BATCH_SIZE]
for entity in batch:
await self._process_ai(entity, delta)
# Yield to event loop between batches
await asyncio.sleep(0)
Index Optimization¶
# Use tags for fast filtering
class OptimizedWorld:
def get_players_in_room(self, room_id: UUID) -> list[Entity]:
"""Get players in a room efficiently."""
# Fast: tag query + room check
return [
e for e in self.entities.with_tag("player")
if e.get(PositionComponent).room_id == room_id
]
# Slow: check all entities
# return [
# e for e in self.entities.all()
# if e.has_tag("player") and e.get(PositionComponent).room_id == room_id
# ]
Memory Management¶
Component Memory¶
# Good - minimal component data
class HealthComponent(Component):
current: int
maximum: int
# Bad - unnecessary data
class HealthComponent(Component):
current: int
maximum: int
history: list[int] = [] # Grows unbounded!
last_damage_source: str = ""
created_at: datetime = None
Entity Cleanup¶
class CleanupSystem(System):
"""Periodically clean up dead entities."""
priority = 200 # Run late
async def update(self, delta: float) -> None:
to_destroy = []
for entity in self.entities.with_tag("dead"):
# Check if death animation/effects are done
if entity.try_get(DeathTimerComponent) is None:
to_destroy.append(entity.id)
for entity_id in to_destroy:
self.entities.destroy(entity_id)
Event Cleanup¶
# Avoid holding references in event handlers
async def bad_handler(event: DamageEvent) -> None:
# Bad - holds reference to entity
self._damaged_entities.append(
world.get_entity(event.target_id)
)
async def good_handler(event: DamageEvent) -> None:
# Good - stores only ID
self._damaged_entity_ids.append(event.target_id)
Memory Profiling¶
from maid_engine.profiling import MemoryCollector
collector = MemoryCollector(
trace_depth=10,
top_allocations_count=20,
)
collector.start()
# Run some game ticks
for _ in range(100):
await world.tick(0.25)
snapshot = collector.take_snapshot()
collector.stop()
print(f"Total allocated: {snapshot.total_allocated / 1024 / 1024:.1f} MB")
print(f"By module:")
for module, size in sorted(snapshot.by_module.items(), key=lambda x: -x[1])[:10]:
print(f" {module}: {size / 1024:.1f} KB")
Network Optimization¶
Message Batching¶
import time

class BatchedMessageSender:
"""Batch messages to reduce network calls."""
def __init__(self, session: Session, flush_interval: float = 0.1):
self._session = session
self._buffer: list[str] = []
self._flush_interval = flush_interval
self._last_flush = time.time()
def queue(self, message: str) -> None:
self._buffer.append(message)
async def flush_if_needed(self) -> None:
now = time.time()
if self._buffer and (now - self._last_flush) >= self._flush_interval:
await self._session.send("\n".join(self._buffer))
self._buffer.clear()
self._last_flush = now
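The sender needs something to call `flush_if_needed()` periodically; one illustrative option is a background task:
import asyncio

async def flush_loop(sender: BatchedMessageSender, interval: float = 0.05) -> None:
    """Illustrative background task: poll the sender and flush any pending output."""
    while True:
        await sender.flush_if_needed()
        await asyncio.sleep(interval)

# e.g. asyncio.create_task(flush_loop(sender)) when the session is created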
Compression¶
import zlib
class CompressedSession:
"""Session with compression for large messages."""
COMPRESSION_THRESHOLD = 1024 # Bytes
async def send(self, message: str) -> None:
data = message.encode()
if len(data) > self.COMPRESSION_THRESHOLD:
# Compress large messages
compressed = zlib.compress(data)
await self._raw_send(compressed, compressed=True)
else:
await self._raw_send(data, compressed=False)
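`_raw_send` and its `compressed` flag are assumed to be provided by the underlying transport; the receiving side then needs a matching branch, roughly:
import zlib

def decode_payload(data: bytes, compressed: bool) -> str:
    """Illustrative client-side counterpart: inflate the payload if it was compressed."""
    if compressed:
        data = zlib.decompress(data)
    return data.decode()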
AI Performance¶
Rate Limiting¶
import asyncio
import time
class RateLimitedAI:
"""Rate-limited AI calls."""
    def __init__(self, provider, calls_per_second: float = 2.0):
        self._provider = provider
        self._min_interval = 1.0 / calls_per_second
        self._last_call = 0.0
        self._lock = asyncio.Lock()
async def complete(self, messages: list) -> str:
async with self._lock:
# Enforce rate limit
elapsed = time.time() - self._last_call
if elapsed < self._min_interval:
await asyncio.sleep(self._min_interval - elapsed)
self._last_call = time.time()
return await self._provider.complete(messages)
Response Caching¶
import hashlib
class CachedAI:
"""Cache AI responses for identical prompts."""
def __init__(self, provider, cache_size: int = 100):
self._provider = provider
self._cache: dict[str, str] = {}
self._cache_size = cache_size
def _cache_key(self, messages: list) -> str:
content = str([(m.role, m.content) for m in messages])
return hashlib.md5(content.encode()).hexdigest()
async def complete(self, messages: list) -> str:
key = self._cache_key(messages)
if key in self._cache:
return self._cache[key]
result = await self._provider.complete(messages)
        # Evict the oldest-inserted entry (simple FIFO eviction)
if len(self._cache) >= self._cache_size:
oldest = next(iter(self._cache))
del self._cache[oldest]
self._cache[key] = result.content
return result.content
Multi-World Optimization¶
World Tick Scheduling¶
async def optimized_tick_all(manager: WorldManager, delta: float) -> None:
    """Tick worlds with priority scheduling."""
    worlds = manager.list_worlds(include_private=True)
    # Sort by player count (busy worlds first)
    worlds.sort(
        key=lambda w: count_players(w.world),
        reverse=True,
    )
    # Tick high-priority (populated) worlds every pass
    high_priority = [w for w in worlds if count_players(w.world) > 0]
    for entry in high_priority:
        await entry.world.tick(delta)
    # Tick empty worlds less frequently
    low_priority = [w for w in worlds if count_players(w.world) == 0]
    if should_tick_empty_worlds():
        for entry in low_priority:
            await entry.world.tick(delta)
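The `count_players` and `should_tick_empty_worlds` helpers above are placeholders rather than engine APIs; a minimal sketch of what they might look like:
import itertools

def count_players(world: World) -> int:
    """Illustrative helper: number of player-tagged entities in a world."""
    return len(list(world.entities.with_tag("player")))

_empty_world_pass = itertools.count()

def should_tick_empty_worlds(every_n: int = 4) -> bool:
    """Illustrative helper: tick empty worlds only on every Nth scheduler pass."""
    return next(_empty_world_pass) % every_n == 0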
Instance Pooling¶
from collections import deque
from uuid import uuid4

class DungeonPool:
"""Pool dungeon instances for reuse."""
def __init__(self, manager: WorldManager, pool_size: int = 5):
self._manager = manager
self._pool_size = pool_size
self._available: deque[str] = deque()
self._in_use: set[str] = set()
async def initialize(self) -> None:
"""Pre-create dungeon instances."""
for i in range(self._pool_size):
world_id = f"dungeon-pool-{i}"
await self._create_instance(world_id)
self._available.append(world_id)
async def acquire(self) -> str:
"""Get a dungeon instance from the pool."""
if self._available:
world_id = self._available.popleft()
else:
# Pool exhausted, create new instance
world_id = f"dungeon-dynamic-{uuid4()}"
await self._create_instance(world_id)
self._in_use.add(world_id)
return world_id
async def release(self, world_id: str) -> None:
"""Return a dungeon instance to the pool."""
self._in_use.discard(world_id)
# Reset and return to pool
await self._reset_instance(world_id)
self._available.append(world_id)
Best Practices Summary¶
Do¶
- Cache entity queries when possible
- Use tags for fast entity categorization
- Batch network messages
- Rate limit AI calls
- Clean up dead entities promptly
- Profile before optimizing
- Use async operations for I/O
Don't¶
- Query entities multiple times per tick
- Store growing lists in components
- Make synchronous I/O calls in update()
- Create new objects every tick (see the reuse sketch after this list)
- Hold references to destroyed entities
- Ignore memory growth
- Optimize prematurely
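As one concrete take on the "new objects every tick" point, a hot system can reuse a preallocated buffer instead of rebuilding a list on every update. A minimal sketch; `MovementSystem` and `_try_move` are illustrative names, not engine APIs:
class MovementSystem(System):
    def __init__(self, world: World):
        super().__init__(world)
        # Allocated once and reused every tick instead of creating a new list per update
        self._moved_this_tick: list[UUID] = []

    async def update(self, delta: float) -> None:
        self._moved_this_tick.clear()
        for entity in self.entities.with_components(PositionComponent):
            if self._try_move(entity, delta):
                self._moved_this_tick.append(entity.id)

    def _try_move(self, entity: Entity, delta: float) -> bool:
        # Placeholder for real movement logic
        return True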
Monitoring¶
Health Checks¶
import time

class HealthMonitor:
    """Monitor server health metrics."""

    def __init__(self, world: World):
        self._world = world
        self._tick_count = 0
        self._start_time = time.time()

    def record_tick(self) -> None:
        """Call once per completed world tick so the rate stats stay accurate."""
        self._tick_count += 1
def get_health(self) -> dict:
uptime = time.time() - self._start_time
return {
"uptime_seconds": uptime,
"tick_count": self._tick_count,
"ticks_per_second": self._tick_count / uptime if uptime > 0 else 0,
"entity_count": self._world.entities.count(),
"player_count": len(list(self._world.entities.with_tag("player"))),
"memory_mb": get_process_memory_mb(),
}
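`get_process_memory_mb()` is assumed to be a small utility rather than an engine API. A stdlib-only sketch for POSIX systems (psutil would be a more portable choice); note that `ru_maxrss` is reported in kilobytes on Linux and bytes on macOS:
import resource
import sys

def get_process_memory_mb() -> float:
    """Peak resident set size of this process in megabytes (illustrative sketch)."""
    usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        return usage / (1024 * 1024)  # macOS reports bytes
    return usage / 1024  # Linux reports kilobytes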
Alerts¶
async def check_performance_alerts(monitor: HealthMonitor) -> list[str]:
"""Check for performance issues."""
alerts = []
health = monitor.get_health()
if health["ticks_per_second"] < 3.5: # Below 4 TPS target
alerts.append(f"Low tick rate: {health['ticks_per_second']:.1f} TPS")
if health["memory_mb"] > 1024: # Over 1GB
alerts.append(f"High memory usage: {health['memory_mb']:.0f} MB")
if health["entity_count"] > 10000: # Many entities
alerts.append(f"High entity count: {health['entity_count']}")
return alerts
Performance Benchmarks¶
MAID includes performance benchmark tests in packages/maid-engine/tests/benchmarks/ that validate documented performance claims:
| Operation | Target |
|---|---|
| Session authentication lookup | < 0.1ms |
| Command execution latency | < 2ms |
| Event dispatch latency | < 1ms |
| Room lookup latency | < 0.5ms |
| Entity creation | < 1ms |
| Entity query (1000 entities) | < 5ms |
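A rough sketch of the shape of such a benchmark test; the fixture name is illustrative, while the 5 ms threshold comes from the table above:
import time
import pytest

@pytest.mark.benchmark
def test_entity_query_latency(world_with_1000_entities):
    """Querying 1000 entities by component should stay under the documented 5ms target."""
    world = world_with_1000_entities  # hypothetical fixture providing a populated World
    start = time.perf_counter()
    list(world.entities.with_components(HealthComponent))
    elapsed_ms = (time.perf_counter() - start) * 1000
    assert elapsed_ms < 5.0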
Running Benchmarks¶
# Run all benchmarks
uv run pytest packages/maid-engine/tests/benchmarks/ -v -m benchmark
# Skip benchmarks in regular test runs
uv run pytest packages/maid-engine/tests/ -m "not benchmark"
Future Enhancement: CI Benchmark Enforcement
Currently, performance benchmarks are available for manual execution but are not automatically enforced in CI pipelines. This is tracked as technical debt. Future enhancements should include:
- Automated benchmark runs on PRs that modify performance-critical code paths
- Regression detection with configurable thresholds
- Historical performance tracking and trend analysis
- Alerting when benchmarks exceed target thresholds
See GitHub Issue Tracking for updates on CI benchmark integration.
Next Steps¶
- Multi-World Support - Optimize multi-world setups
- AI Integration - Efficient AI usage