# Source code for yoker.events.replay

"""Event replay agent for Yoker sessions.

Provides EventReplayAgent for replaying recorded sessions from JSONL files
without requiring LLM calls.

This module is part of the Event System (domain layer), enabling
session replay for demos, testing, and debugging.
"""

import inspect
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING

from yoker.events.recorder import deserialize_event
from yoker.events.types import (
  CommandEvent,
  Event,
  SessionStartEvent,
  TurnEndEvent,
  TurnStartEvent,
)

if TYPE_CHECKING:
  from yoker.base import EventCallback


class EventReplayAgent:
  """Agent that replays events from a JSONL file.

  This class provides the same async interface as Agent but replays
  previously recorded events instead of calling the LLM. Useful for:

  - Generating screenshots without LLM costs
  - Testing event handlers
  - Debugging session flows

  Example:
    agent = EventReplayAgent(Path("session.jsonl"))
    agent.add_event_handler(ConsoleEventHandler(console))
    await agent.process("Hello")  # Replays events for "Hello" turn
  """

  def __init__(self, events_path: Path) -> None:
    """Initialize the replay agent and load every recorded event.

    Args:
      events_path: Path to the events.jsonl file (one JSON object per line).

    Raises:
      FileNotFoundError: If events file doesn't exist.
      ValueError: If a non-blank line is not valid JSON
        (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    self.events_path = events_path
    self.events: list[Event] = []
    # Cursor into ``self.events``; shared by process() and replay_command().
    self.index = 0
    self.thinking_enabled = True
    self._model = "replay"
    self._handlers: list[EventCallback] = []

    # Load events from JSONL. The encoding is pinned so the result does not
    # depend on the platform default, and whitespace-only lines (e.g. a
    # trailing newline at end of file) are skipped instead of failing to parse.
    with open(events_path, encoding="utf-8") as f:
      for line in f:
        if not line.strip():
          continue
        self.events.append(deserialize_event(json.loads(line)))

    # Take model and thinking flag from the first SESSION_START event, if any;
    # otherwise the defaults assigned above stay in effect.
    for event in self.events:
      if isinstance(event, SessionStartEvent):
        self._model = event.model
        self.thinking_enabled = event.thinking_enabled
        break

  @property
  def model(self) -> str:
    """Return the model name from the recorded session."""
    return self._model

  def add_event_handler(self, handler: "EventCallback") -> None:
    """Register an event handler for replay.

    Args:
      handler: Callable that receives Event objects. May be synchronous or
        asynchronous.
    """
    self._handlers.append(handler)

  async def begin_session(self) -> None:
    """No-op for replay agent - session already in event log."""

  async def end_session(self, reason: str = "quit") -> None:
    """No-op for replay agent.

    Args:
      reason: Accepted for interface compatibility; unused.
    """

  async def process(self, message: str) -> str:
    """Replay events for one turn.

    Advances the cursor to the TURN_START event whose message equals
    ``message``, then emits every following event to the registered handlers
    up to and including the next TURN_END. The matched TURN_START itself is
    consumed by the search and is not emitted.

    Args:
      message: The user message (used to find matching turn).

    Returns:
      The response text from the replayed TURN_END, or an empty string if the
      log is exhausted before a TURN_END is reached.
    """
    # Find TURN_START with matching message.
    while self.index < len(self.events):
      evt = self.events[self.index]
      self.index += 1
      if isinstance(evt, TurnStartEvent):
        if evt.message == message:
          break
      elif self.index == 1:
        # The very first recorded event is not a TURN_START: stop searching
        # and replay from the event that follows it.
        break

    # Replay events until TURN_END.
    response = ""
    while self.index < len(self.events):
      evt = self.events[self.index]
      await self._emit(evt)
      if isinstance(evt, TurnEndEvent):
        response = evt.response
        self.index += 1  # Move past TURN_END
        break
      self.index += 1

    return response

  async def replay_command(self, command: str) -> str:
    """Replay a command event.

    Scans forward from the current cursor for a COMMAND event whose command
    equals ``command`` and emits it to handlers. Events skipped during the
    scan are consumed and not emitted.

    Args:
      command: The command string (used to find matching event).

    Returns:
      The command result, or empty string if not found.
    """
    while self.index < len(self.events):
      evt = self.events[self.index]
      self.index += 1
      if isinstance(evt, CommandEvent) and evt.command == command:
        await self._emit(evt)
        return evt.result

    return ""  # Command not found

  async def _emit(self, event: "Event") -> None:
    """Emit an event to all registered handlers, in registration order.

    Supports both sync and async handlers: each handler is called, and if the
    call returns an awaitable it is awaited. Checking the result rather than
    the callable covers plain ``async def`` functions, bound async methods and
    instances with an async ``__call__`` alike.

    Args:
      event: The event to emit.
    """
    for handler in self._handlers:
      try:
        result = handler(event)
        if inspect.isawaitable(result):
          await result
      except Exception:
        # Replay is best-effort: one failing handler must not stop the
        # remaining handlers or the replay. The failure is logged at DEBUG
        # so it can be inspected without cluttering replay output.
        logging.getLogger(__name__).debug(
          "Event handler %r raised during replay", handler, exc_info=True
        )
# Public API of this module.
__all__ = ["EventReplayAgent"]