Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions config/simulation.json5
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
{
type: "SimplePaths",
},
{
type: "EpisodicMemoryInput",
},
],
cortex_llm: {
type: "OpenAILLM",
Expand Down Expand Up @@ -76,6 +79,9 @@
{
type: "UnitreeGo2State",
},
{
type: "EpisodicMemoryWriter",
},
{
type: "UnitreeGo2LidarLocalization",
},
Expand Down Expand Up @@ -142,6 +148,31 @@
},
priority: 0,
},
{
hook_type: "on_entry",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "start_episodic_hook",
context: {
announce: false,
},
},
priority: 0,
},
{
hook_type: "on_exit",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "stop_episodic_hook",
context: {
announce: false,
},
},
timeout_seconds: 10,
priority: 0,
},
],
},

Expand Down Expand Up @@ -196,6 +227,9 @@
{
type: "SimplePaths",
},
{
type: "EpisodicMemoryInput",
},
],
cortex_llm: {
type: "OpenAILLM",
Expand Down Expand Up @@ -235,6 +269,9 @@
{
type: "UnitreeGo2State",
},
{
type: "EpisodicMemoryWriter",
},
{
type: "UnitreeGo2LidarLocalization",
},
Expand Down Expand Up @@ -297,6 +334,31 @@
},
priority: 0,
},
{
hook_type: "on_entry",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "start_episodic_hook",
context: {
announce: false,
},
},
priority: 0,
},
{
hook_type: "on_exit",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "stop_episodic_hook",
context: {
announce: false,
},
},
timeout_seconds: 10,
priority: 0,
},
],
},

Expand Down Expand Up @@ -330,6 +392,9 @@
{
type: "SimplePaths",
},
{
type: "EpisodicMemoryInput",
},
],
agent_actions: [
{
Expand All @@ -351,6 +416,9 @@
{
type: "UnitreeGo2State",
},
{
type: "EpisodicMemoryWriter",
},
{
type: "UnitreeGo2LidarLocalization",
},
Expand All @@ -376,6 +444,31 @@
message: "I'm ready to chat! What would you like to talk about?",
},
},
{
hook_type: "on_entry",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "start_episodic_hook",
context: {
announce: false,
},
},
priority: 0,
},
{
hook_type: "on_exit",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "stop_episodic_hook",
context: {
announce: false,
},
},
timeout_seconds: 10,
priority: 0,
},
],
},

Expand Down Expand Up @@ -421,6 +514,9 @@
{
type: "SimplePaths",
},
{
type: "EpisodicMemoryInput",
},
],
action_execution_mode: "concurrent",
agent_actions: [
Expand Down Expand Up @@ -457,6 +553,9 @@
{
type: "UnitreeGo2State",
},
{
type: "EpisodicMemoryWriter",
},
{
type: "UnitreeGo2LidarLocalization",
},
Expand Down Expand Up @@ -489,6 +588,31 @@
message: "Security patrol complete. All areas checked.",
},
},
{
hook_type: "on_entry",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "start_episodic_hook",
context: {
announce: false,
},
},
priority: 0,
},
{
hook_type: "on_exit",
handler_type: "function",
handler_config: {
module_name: "episodic_hook",
function: "stop_episodic_hook",
context: {
announce: false,
},
},
timeout_seconds: 10,
priority: 0,
},
],
},
},
Expand Down
45 changes: 45 additions & 0 deletions src/backgrounds/plugins/episodic_memory_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import logging

from backgrounds.base import Background, BackgroundConfig
from providers.episodic_memory_provider import EpisodicMemoryProvider


class EpisodicMemoryWriterConfig(BackgroundConfig):
"""Configuration for EpisodicMemoryWriter. No extra fields needed."""

pass


class EpisodicMemoryWriter(Background[EpisodicMemoryWriterConfig]):
"""
Background task that periodically flushes episodic memories to disk.

BackgroundOrchestrator wraps run() in its own while-not-stop loop,
so run() must NOT contain its own while loop.
Pattern: flush once -> sleep(interval) -> return.
"""

FLUSH_INTERVAL_SECONDS: float = 30.0

def __init__(self, config: EpisodicMemoryWriterConfig):
super().__init__(config)
self.provider = EpisodicMemoryProvider()
logging.info("EpisodicMemoryWriter initialized")

def run(self) -> None:
"""Flush pending episodes to disk, then sleep until next interval."""
try:
self.provider.save_to_disk()
logging.debug("EpisodicMemoryWriter: flush completed")
except Exception as e:
logging.error(f"EpisodicMemoryWriter: flush failed: {e}")

self.sleep(self.FLUSH_INTERVAL_SECONDS)

def stop(self) -> None:
"""Final flush when the background task is stopped."""
logging.info("EpisodicMemoryWriter: stopping, performing final flush")
try:
self.provider.save_to_disk()
except Exception as e:
logging.error(f"EpisodicMemoryWriter: final flush failed: {e}")
64 changes: 64 additions & 0 deletions src/hooks/episodic_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging
from typing import Any, Dict

from pydantic import BaseModel, ConfigDict, Field

from providers.elevenlabs_tts_provider import ElevenLabsTTSProvider
from providers.episodic_memory_provider import EpisodicMemoryProvider


class StartEpisodicHookContext(BaseModel):
"""Context for starting episodic memory hook."""

mode: str = Field(default="default", description="The mode being entered")
announce: bool = Field(default=False, description="Whether to announce via TTS")
model_config = ConfigDict(extra="allow")


class StopEpisodicHookContext(BaseModel):
"""Context for stopping episodic memory hook."""

mode: str = Field(default="default", description="The mode being exited")
announce: bool = Field(default=False, description="Whether to announce via TTS")
model_config = ConfigDict(extra="allow")


async def start_episodic_hook(context: Dict[str, Any]):
"""Hook called when a mode with episodic memory is entered."""
ctx = StartEpisodicHookContext(**context)
provider = EpisodicMemoryProvider()
provider._current_mode = ctx.mode
provider.load_mode(ctx.mode)
logging.info(f"episodic_hook: start — loaded episodes for mode '{ctx.mode}'")
if ctx.announce:
try:
tts = ElevenLabsTTSProvider()
tts.add_pending_message("I remember our past interactions.")
except Exception as e:
logging.warning(f"episodic_hook: TTS announce failed: {e}")
return {
"status": "success",
"message": f"Episodic memory loaded for mode '{ctx.mode}'",
}


async def stop_episodic_hook(context: Dict[str, Any]):
"""Hook called when a mode with episodic memory is exited."""
ctx = StopEpisodicHookContext(**context)
provider = EpisodicMemoryProvider()
try:
provider.save_to_disk()
logging.info(f"episodic_hook: stop — flushed episodes for mode '{ctx.mode}'")
except Exception as e:
logging.error(f"episodic_hook: flush failed: {e}")
raise
if ctx.announce:
try:
tts = ElevenLabsTTSProvider()
tts.add_pending_message("I've saved our memories.")
except Exception as e:
logging.warning(f"episodic_hook: TTS announce failed: {e}")
return {
"status": "success",
"message": f"Episodic memory flushed for mode '{ctx.mode}'",
}
Loading
Loading