A ROS2 package providing an intelligent conversational agent framework for robots, using large language models (LLMs) and tools via the Model Context Protocol (MCP).
This package offers a flexible framework for implementing conversational agents in ROS2 using LangGraph workflows and Ollama as the backend for running LLM models locally. The architecture is designed with extensibility in mind, allowing developers to create custom use cases by inheriting from base classes.
- **Base Classes**: Abstract base classes for creating custom agents
  - `LangGraphBase`: Base class for LangGraph workflow management
  - `LangGraphRosBase`: Base class for ROS2 integration
- **Utilities**:
  - **Ollama**: Client for the Ollama LLM server with MCP integration
  - **MCP Client**: Tool retrieval and execution via the Model Context Protocol
  - **Prompt System**: Jinja2 templates for customizable prompts (see the rendering sketch below)
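Since the prompt system is template-based, prompts can be customized by editing their Jinja2 templates. A minimal sketch of how such a template renders; the template text here is illustrative, not one of the package's actual prompts, though `user_query` and `user_name` mirror the service's request fields:

```python
from jinja2 import Template

# Illustrative system-prompt template; the real templates live in the package
template = Template(
    'You are a helpful robot assistant. '
    '{% if user_name %}Address the user as {{ user_name }}. {% endif %}'
    'Question: {{ user_query }}'
)
print(template.render(user_query='Where is the charging dock?', user_name='Oscar'))
```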
The agent operates as a ROS2 service:
- **Service** (default: `agent_service`): Receives user queries and returns agent responses
- **Service Type**: `llm_interactions_msgs/UserQueryResponse`
- **Request Fields**:
  - `user_query` (string): The user's question or command
  - `user_name` (string): Optional username for personalization
- **Response Fields**:
  - `response_text` (string): The agent's generated response
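As an illustration of this interface, a minimal rclpy client might call the service like so (the node name and query strings are examples, not part of the package):

```python
import rclpy
from rclpy.node import Node
from llm_interactions_msgs.srv import UserQueryResponse

def main():
    rclpy.init()
    node = Node('agent_client_demo')  # illustrative node name
    client = node.create_client(UserQueryResponse, 'agent_service')
    if not client.wait_for_service(timeout_sec=5.0):
        raise RuntimeError('agent_service is not available')
    request = UserQueryResponse.Request()
    request.user_query = 'What tools can you use?'
    request.user_name = 'oscar'  # optional personalization field
    future = client.call_async(request)
    rclpy.spin_until_future_complete(node, future)
    print(future.result().response_text)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
```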
```
langgraph_base_ros/
├── test/
│   └── test_langgraph.py          # Unit tests for the package
├── resource/
│   └── langgraph_base_ros         # Resource files for ROS2
├── langgraph_base_ros/
│   ├── langgraph_base.py          # Base class for LangGraph workflows
│   ├── langgraph_ros_base.py      # Base class for ROS2 integration
│   └── ollama_utils.py            # Ollama client utilities
├── package.xml
├── requirements.txt
├── README.md
├── setup.cfg
└── setup.py
```
- **Ollama** installed and running:

  ```bash
  # Install Ollama (if not installed)
  curl -fsSL https://ollama.com/install.sh | sh
  # Download the model
  ollama pull qwen3:0.6b
  # Verify Ollama is running
  ollama list
  ```
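  Optionally, confirm the model answers from Python before wiring it into a workflow. This is a generic `langchain-ollama` check, not the package's own `ollama_utils` API, and assumes `langchain-ollama` is installed:

  ```python
  from langchain_ollama import ChatOllama

  # Talks to the locally running Ollama server using the model pulled above
  llm = ChatOllama(model='qwen3:0.6b')
  print(llm.invoke('Reply with one short sentence.').content)
  ```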
- **Configured MCP server**:
  - Ensure the MCP server specified in `langgraph_mcp.json` is accessible
  - The server must implement the MCP protocol correctly
  - A fake MCP server (`fake_mcp_server.py`) is provided for testing
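  The exact schema of `langgraph_mcp.json` is defined by this package and is not reproduced here; purely as an illustration, MCP client configurations conventionally describe servers by command and arguments, e.g.:

  ```json
  {
    "mcpServers": {
      "fake_server": {
        "command": "python",
        "args": ["fake_mcp_server.py"]
      }
    }
  }
  ```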
- **Python virtual environment** with dependencies:

  ```bash
  python -m venv agent-venv
  source agent-venv/bin/activate
  pip install -r requirements.txt
  ```

  Alternatively, using uv:

  ```bash
  cd uv
  uv sync
  source .venv/bin/activate
  ```
- **Environment variables** (optional, for LangSmith tracing): Edit the `.env` file in the package:

  ```bash
  LANGSMITH_API_KEY=your_api_key
  LANGSMITH_TRACING=true
  LANGSMITH_PROJECT=project_name
  ```
- **Building**:

  ```bash
  cd ~/colcon_ws
  source /opt/ros/humble/setup.bash
  colcon build --packages-select pisito_agent --symlink-install
  source install/setup.bash
  ```
`LangGraphBase` is an abstract base class for implementing LangGraph workflows. It provides common functionality and attributes for conversation management.
Key Features:
- Workflow state management
- Step and message counting
- Logging abstraction
- Abstract `make_graph()` method for custom workflow definition
Usage:
```python
from langgraph_base_ros.langgraph_base import LangGraphBase
from langgraph.graph import StateGraph

class CustomWorkflow(LangGraphBase):
    async def make_graph(self):
        # Define your custom workflow here
        # (Messages is the state schema used by this package's workflows)
        workflow = StateGraph(Messages)
        # Add nodes, edges, etc.
        self.graph = workflow.compile()
```

`LangGraphRosBase` is an abstract base class for ROS2 integration. It handles ROS2 parameter management, Ollama agent initialization, and MCP client setup.
Key Features:
- ROS2 parameter declaration and retrieval
- Async Ollama agent initialization
- MCP client configuration and tool retrieval
- Abstract `agent_callback()` method for custom service handling
- Abstract `build_graph()` method for workflow compilation based on the defined LangGraph workflow
- Persistent asyncio event loop management
Usage:
```python
from langgraph_base_ros.langgraph_ros_base import LangGraphRosBase

class CustomRosAgent(LangGraphRosBase):
    def __init__(self):
        super().__init__()
        # Initialize your custom workflow manager
        self.graph_manager = CustomWorkflow(...)
        self.build_graph(self.graph_manager)
        # Create your service/subscriber/etc.
        self.create_service(...)

    def agent_callback(self, request, response):
        # Handle incoming requests
        return response
```

To create a custom use case, first define a workflow class inheriting from `LangGraphBase`:
```python
from langgraph_base_ros.langgraph_base import LangGraphBase
from langgraph.graph import START, StateGraph, END

class MyCustomWorkflow(LangGraphBase):
    async def my_custom_node(self, state):
        # Your custom logic
        return state

    def my_decision_function(self, state):
        # Decision logic for conditional edges:
        # return 'finish' to route the graph to END
        return 'next_node'

    async def make_graph(self):
        # Messages: the package's workflow state schema
        workflow = StateGraph(Messages)
        # Add nodes
        workflow.add_node('my_node', self.my_custom_node)
        # Add edges
        workflow.add_edge(START, 'my_node')
        workflow.add_conditional_edges(
            'my_node',
            self.my_decision_function,
            {'next_node': 'my_node', 'finish': END}
        )
        self.graph = workflow.compile()
```

Then create a node class inheriting from `LangGraphRosBase`:
```python
from langgraph_base_ros.langgraph_ros_base import LangGraphRosBase
from your_workflow_module import MyCustomWorkflow
from your_msgs.srv import YourServiceType

class MyCustomAgent(LangGraphRosBase):
    def __init__(self):
        super().__init__()
        # Initialize your workflow
        self.graph_manager = MyCustomWorkflow(
            logger=self.get_logger(),
            ollama_agent=self.ollama_agent,
            max_steps=self.max_steps
        )
        self.build_graph()
        # Create your service
        self.srv = self.create_service(
            YourServiceType,
            'your_service_name',
            self.agent_callback
        )

    def agent_callback(self, request, response):
        # Process the request using your workflow;
        # build initial_state (e.g., from request fields) before invoking
        result = self.loop.run_until_complete(
            self.graph_manager.graph.ainvoke(initial_state)
        )
        response.field = result['messages'][-1]['content']
        return response

    def build_graph(self) -> None:
        # Initialize and compile the LangGraph workflow
        try:
            self.loop.run_until_complete(self.graph_manager.make_graph())
        except Exception as e:
            self.get_logger().error(f'Failed to create LangGraph workflow: {e}')
            raise
        self.get_logger().info('MyCustomWorkflow graph created successfully...')
```
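For completeness, a typical rclpy entry point for running the node; this is standard boilerplate, the function name is illustrative, and the actual console entry point would be declared in `setup.py`:

```python
import rclpy

def main(args=None):
    rclpy.init(args=args)
    node = MyCustomAgent()
    try:
        rclpy.spin(node)  # serve requests until shutdown
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
```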
License: Apache License 2.0

Maintainer: Oscar Pons Fernandez (opfernandez@uma.es)