Production-ready LangGraph implementation for intelligent travel planning with multi-API integration, async execution, and human-in-the-loop workflows.
User Query: "I'm planning a 4-day trip from Paris to New York..."
System Response:
- ✅ Triggers customer information form
- 🔄 Parallel API calls to Amadeus (flights, hotels, activities)
- 📦 Generates 3 budget-tiered packages
- 💬 Natural language presentation
- Multi-Agent Architecture: Specialized agents for flights, hotels, and activities
- Async Parallel Execution: Simultaneous API calls for optimal performance
- Intelligent Analysis: LLM-powered natural language understanding
- Budget-Aware Planning: Automatic package generation (Budget/Balanced/Premium)
- Human-in-the-Loop: Mid-conversation form collection for customer details
- ✈️ Amadeus: Flights, hotels, activities search
- 🏨 Hotelbeds: Enhanced hotel inventory (optional)
- 💬 Twilio: SMS notifications (optional)
- 📊 CRM: HubSpot by default (easily customizable)
- Type-safe with Pydantic models
- Comprehensive error handling
- Async/await throughout
- State persistence via checkpointing
- Location auto-conversion (city names → IATA codes)
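The location auto-conversion feature can be sketched roughly as follows. This is an illustrative stand-in, not the repository's actual implementation: the `CITY_TO_IATA` table and `resolve_iata` helper are hypothetical names, and a real version would query Amadeus' location API instead of a hard-coded dict.

```python
# Hypothetical sketch of the city-name -> IATA conversion step.
CITY_TO_IATA = {
    "new york": "NYC",
    "paris": "PAR",
    "tokyo": "TYO",
}

def resolve_iata(location: str) -> str:
    """Return the IATA city code for a location, passing through
    anything that already looks like a 3-letter code."""
    cleaned = location.strip()
    if len(cleaned) == 3 and cleaned.isalpha() and cleaned.isupper():
        return cleaned  # already an IATA code
    # Fall back to the original string when no mapping is known
    return CITY_TO_IATA.get(cleaned.lower(), cleaned)

print(resolve_iata("Paris"))  # PAR
print(resolve_iata("NYC"))    # NYC
```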
- Python 3.9 or higher
- API keys for required services (see API Setup)
# Clone the repository
git clone https://github.com/HarimxChoi/langgraph-travel-agent.git
cd langgraph-travel-agent
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Configure environment
cp .env.example .env
# Edit .env with your API keys
Option 1: Web Interface (Recommended)
- Start both backend and frontend (see above)
- Navigate to http://localhost:3000
- Type: "Find me a flight from NYC to Paris next Monday"
Option 2: Python API (for developers)
import asyncio
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from backend.travel_agent import build_enhanced_graph
async def main():
    async with AsyncSqliteSaver.from_conn_string(".langgraph_checkpoints.sqlite") as saver:
        graph = build_enhanced_graph(checkpointer=saver)
        config = {"configurable": {"thread_id": "python-demo"}}
        response = await graph.ainvoke({
            "messages": [HumanMessage(content="Find flights to Tokyo")]
        }, config=config)
        print(response["messages"][-1].content)

asyncio.run(main())

To compare this repository's form-polling HITL with LangGraph's native pause/resume mechanism, run the example:

python examples/langgraph_hitl_poc.py --demo

See examples/README.md for the interactive mode and further details.
┌─────────────────────────────────────────────────┐
│ User Request (Natural Language) │
└─────────────────┬───────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ call_model_and_tools Node │
│ • Extract TravelPlan with LLM │
│ • Convert locations (city → codes) │
└─────────────────┬───────────────────────────────┘
↓
[Need Customer Info?]
├─ YES ─────────────────────┐
│ ↓
│ ┌─────────────────────────┐
│ │ Display Customer Form │
│ │ (HITL - Human in Loop) │
│ └─────────┬───────────────┘
│ ↓
│ User Fills Form & Submits
│ ↓
└─ NO ──────────> [Continuation=True]
↓
┌──────────────────────────┴─────────────────┐
│ Parallel Tool Execution │
├─ Search Flights (Amadeus) │
├─ Search Hotels (Amadeus + Hotelbeds) │
└─ Search Activities (Amadeus) │
└──────────────────┬─────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ synthesize_results Node │
│ • Parse all tool results │
│ • Generate packages (if full_plan + budget) │
│ • Create final LLM response │
│ • Send to CRM (HubSpot) │
└─────────────────┬───────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ Final Response to User │
└─────────────────────────────────────────────┘
Key Components:
- Human-in-the-Loop (HITL): Triggers customer info form mid-conversation
- Parallel Execution: All API calls run simultaneously for speed
- Multi-Provider Search: Hotels queried from both Amadeus + Hotelbeds
- Intelligent Packaging: LLM generates Budget/Balanced/Premium packages when budget provided
- CRM Integration: Auto-sends finalized plans to HubSpot
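The parallel-execution stage can be sketched with `asyncio.gather`. The three search coroutines below are illustrative stand-ins for the real Amadeus/Hotelbeds calls, not the repository's functions:

```python
import asyncio

# Stand-ins for the real API-backed search tools.
async def search_flights(plan: dict) -> dict:
    await asyncio.sleep(0.01)  # simulate network latency
    return {"flights": ["AF006"]}

async def search_hotels(plan: dict) -> dict:
    await asyncio.sleep(0.01)
    return {"hotels": ["Hotel Lutetia"]}

async def search_activities(plan: dict) -> dict:
    await asyncio.sleep(0.01)
    return {"activities": ["Louvre tour"]}

async def run_parallel_search(plan: dict) -> dict:
    # asyncio.gather runs all three coroutines concurrently, so total
    # latency is roughly the slowest call, not the sum of all three.
    results = await asyncio.gather(
        search_flights(plan), search_hotels(plan), search_activities(plan)
    )
    merged: dict = {}
    for r in results:
        merged.update(r)
    return merged

print(asyncio.run(run_parallel_search({"destination": "PAR"})))
```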
Travel Request
↓
LLM Analysis (Extract TravelPlan)
↓
Intent Detection
├─ full_plan ──────┬─────────────────┬───────────────────┐
├─ flights_only ───┤ │ │
├─ hotels_only ────┤ │ │
└─ activities_only─┤ │ │
↓ ↓ ↓
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ search_flights │ │ search_hotels │ │ search_activities│
│ │ │ ├─ Amadeus API │ │ │
│ Amadeus API │ │ └─ Hotelbeds API │ │ Amadeus API │
└────────┬────────┘ └────────┬─────────┘ └─────────┬────────┘
│ │ │
└───────────────────┴─────────────────────┘
↓
[IF full_plan + budget exists]
↓
┌─────────────────────────┐
│ generate_travel_packages│
│ • Budget tier │
│ • Balanced tier │
│ • Premium tier │
└────────┬────────────────┘
↓
┌─────────────────────┐
│ Final LLM Response │
└─────────────────────┘
Tool Triggers by Intent:
- full_plan → Flights + Hotels + Activities
- flights_only → Flights
- hotels_only → Hotels
- activities_only → Activities
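The intent-to-tool mapping can be sketched as a small lookup. The tool names are placeholders for the repository's actual search coroutines:

```python
# Sketch of the intent -> tools routing described above.
INTENT_TOOLS = {
    "full_plan": ["search_flights", "search_hotels", "search_activities"],
    "flights_only": ["search_flights"],
    "hotels_only": ["search_hotels"],
    "activities_only": ["search_activities"],
}

def tools_for_intent(intent: str) -> list:
    # Fall back to a full search when the intent is unrecognized.
    return INTENT_TOOLS.get(intent, INTENT_TOOLS["full_plan"])

print(tools_for_intent("hotels_only"))  # ['search_hotels']
```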
Core State Fields:
| Field | Type | Purpose |
|---|---|---|
| `messages` | `List[AnyMessage]` | Full conversation history (auto-accumulated) |
| `travel_plan` | `TravelPlan` | Structured trip extracted by LLM (origin, destination, dates, budget, intent) |
| `customer_info` | `Dict` | User details from HITL form (name, email, phone, budget) |
| `current_step` | `str` | Workflow stage: "initial" → "collecting_info" → "synthesizing" → "complete" |
| `form_to_display` | `str` | UI trigger: "customer_info" signals frontend to show form |
| `is_continuation` | `bool` | Session flag: True after form submission to bypass re-collection |
| `original_request` | `str` | First user message preserved for CRM context |

Unused (Reserved for Extensions):
- `user_preferences`: For future personalization
- `errors`: For error accumulation patterns
- `trip_details`: For additional metadata
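The state fields above correspond roughly to a TypedDict like the following. This is a sketch only: the real state class in backend/travel_agent.py uses LangGraph's `Annotated[list, add_messages]` reducer for `messages`, which is omitted here to keep the example dependency-free.

```python
from typing import Any, Optional, TypedDict

# Approximate shape of the agent state described in the table above.
class TravelAgentState(TypedDict, total=False):
    messages: list          # full conversation history (auto-accumulated)
    travel_plan: Optional[Any]   # structured TravelPlan extracted by the LLM
    customer_info: dict     # HITL form data (name, email, phone, budget)
    current_step: str       # "initial" -> "collecting_info" -> "synthesizing" -> "complete"
    form_to_display: str    # "customer_info" tells the frontend to show the form
    is_continuation: bool   # True after form submission; bypasses re-collection
    original_request: str   # first user message, preserved for CRM context

state: TravelAgentState = {"current_step": "initial", "is_continuation": False}
print(state["current_step"])  # initial
```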
cd frontend/travel-widget
# Install dependencies
npm install
# Configure API endpoint (if needed)
echo "REACT_APP_API_URL=http://localhost:8000" > .env
# Start development server
npm start
# React app runs on http://localhost:3000

Terminal 1 - Backend:
cd backend
python main.py
# Server runs on http://localhost:8000

Terminal 2 - Frontend:
cd frontend/travel-widget
npm start
# React app runs on http://localhost:3000

DEEPSEEK_API_KEY=your_key_here
# Sign up: https://developers.amadeus.com/register
AMADEUS_API_KEY=your_key_here
AMADEUS_API_SECRET=your_secret_here
# Sign up: https://developer.hotelbeds.com/
HOTELBEDS_API_KEY=your_key_here
HOTELBEDS_API_SECRET=your_secret_here
# Get API key: https://app.hubspot.com/integrations-settings/api-key
HUBSPOT_API_KEY=your_key_here
import asyncio
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from backend.travel_agent import build_enhanced_graph
async def run_full_trip_example():
    async with AsyncSqliteSaver.from_conn_string(".langgraph_checkpoints.sqlite") as saver:
        graph = build_enhanced_graph(checkpointer=saver)
        config = {"configurable": {"thread_id": "full-trip-demo"}}
        response = await graph.ainvoke({
            "messages": [HumanMessage(
                content="Plan a 7-day honeymoon to Bali from NYC, departing May 15, budget $5000"
            )],
            "customer_info": {
                "name": "John Doe",
                "email": "john@example.com",
                "phone": "+1234567890",
                "budget": "5000"
            }
        }, config=config)
        # System returns 3 packages: Budget, Balanced, Premium
        packages = response["messages"][-1].content
        print(packages)

asyncio.run(run_full_trip_example())

# Reuse the same `graph` instance and `config` from Example 1.
response = await graph.ainvoke({
    'messages': [HumanMessage(
        content="Find business class flights from Seoul to Paris on June 10, returning June 20"
    )]
}, config=config)
# Returns top 3 flight options sorted by relevance

response = await graph.ainvoke({
    'messages': [HumanMessage(
        content="4-star hotels in Tokyo for 3 nights, checking in July 1"
    )]
}, config=config)
# Returns combined results from Amadeus + Hotelbeds

response = await graph.ainvoke({
    'messages': [HumanMessage(
        content="What are the top activities in Rome?"
    )]
}, config=config)
# Returns activities with pricing near city center

import asyncio
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from backend.travel_agent import build_enhanced_graph
from langchain_core.messages import HumanMessage
async def persistence_demo():
    async with AsyncSqliteSaver.from_conn_string(".langgraph_checkpoints.sqlite") as saver:
        graph = build_enhanced_graph(checkpointer=saver)
        # Same thread_id across invocations lets the checkpointer restore state
        config = {"configurable": {"thread_id": "user_123"}}
        response1 = await graph.ainvoke({
            'messages': [HumanMessage(content="I want to visit Japan")]
        }, config)
        response2 = await graph.ainvoke({
            'messages': [HumanMessage(content="For 10 days with $3000 budget")]
        }, config)
        print(response2["messages"][-1].content)

asyncio.run(persistence_demo())

from langchain_core.tools import tool
from typing import List

@tool
async def search_restaurants(city: str, cuisine: str) -> List[dict]:
    """Search for restaurants in a city"""
    # Your implementation
    return results

# Add to tools list
tools = [
    search_flights,
    search_and_compare_hotels,
    search_activities_by_city,
    search_restaurants,  # New tool
    send_sms_notification,
    send_to_hubspot
]

# simple_server.py
import asyncio
from fastapi import FastAPI
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langchain_core.messages import HumanMessage
from backend.travel_agent import build_enhanced_graph
app = FastAPI()
@app.on_event("startup")
async def startup_event():
app.state.checkpointer = await AsyncSqliteSaver.from_conn_string(".langgraph_checkpoints.sqlite").__aenter__()
app.state.graph = build_enhanced_graph(checkpointer=app.state.checkpointer)
@app.on_event("shutdown")
async def shutdown_event():
saver = getattr(app.state, "checkpointer", None)
if saver:
await saver.__aexit__(None, None, None)
@app.post("/chat")
async def chat(message: str):
response = await app.state.graph.ainvoke({
'messages': [HumanMessage(content=message)]
}, config={"configurable": {"thread_id": "fastapi-demo"}})
return {"response": response['messages'][-1].content}
# Run: uvicorn simple_server:app --reload# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# Build and run
docker build -t travel-agent .
docker run -p 8000:8000 --env-file .env travel-agent
- Use Redis for Checkpointing
from langgraph.checkpoint.redis import RedisSaver  # requires the langgraph-checkpoint-redis package

with RedisSaver.from_conn_string("redis://localhost:6379/0") as checkpointer:
    checkpointer.setup()
    graph = build_enhanced_graph(checkpointer=checkpointer)

- Add Rate Limiting
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/chat")
@limiter.limit("10/minute")
async def chat(request: Request, message: str):
    # Your code
    ...

- Implement Authentication
from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

@app.post("/chat")
async def chat(
    message: str,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    # Verify credentials
    # Your code
    ...

- Add Monitoring
# Add Sentry for error tracking
import sentry_sdk
sentry_sdk.init(dsn="your_sentry_dsn")

# Add Prometheus metrics
from prometheus_client import Counter, Histogram
request_count = Counter('requests_total', 'Total requests')
request_duration = Histogram('request_duration_seconds', 'Request duration')

- Parallel Tool Execution: Already implemented via asyncio.gather
- Connection Pooling: Use httpx.AsyncClient with connection limits
- Caching: Add Redis cache for repeated searches
- Timeout Configuration: Set appropriate timeouts for external APIs
async with httpx.AsyncClient(
    timeout=15.0,
    limits=httpx.Limits(max_connections=100)
) as client:
    # Your code
    ...

- Analysis Phase: 1-2 seconds (LLM inference)
- Tool Execution: 3-8 seconds (parallel API calls)
- Package Generation: 1-2 seconds (LLM synthesis)
- Total Response Time: 5-12 seconds typical
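The caching item above can be sketched with a small TTL-cache decorator. This in-memory version is a stand-in for a Redis cache: the `cached_search` decorator and `search_hotels` stub are hypothetical, and a production version would replace the module-level dict with `redis.Redis` calls.

```python
import hashlib
import json
import time
from functools import wraps

# In-memory stand-in for the Redis caching idea: identical search
# parameters within the TTL return the cached result instead of
# hitting the external API again.
_cache = {}

def cached_search(ttl_seconds=300.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(**params):
            # Derive a stable key from the function name + sorted params
            key = hashlib.sha256(
                (fn.__name__ + json.dumps(params, sort_keys=True)).encode()
            ).hexdigest()
            hit = _cache.get(key)
            if hit and time.monotonic() - hit[0] < ttl_seconds:
                return hit[1]  # cache hit: skip the API call
            result = fn(**params)
            _cache[key] = (time.monotonic(), result)
            return result
        return wrapper
    return decorator

calls = {"n": 0}

@cached_search(ttl_seconds=60)
def search_hotels(city):
    calls["n"] += 1  # counts real "API" invocations
    return [f"hotel-in-{city}"]

search_hotels(city="Tokyo")
search_hotels(city="Tokyo")  # second call is served from cache
print(calls["n"])  # 1
```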
---
## 📝 License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
---
## 🙏 Acknowledgments
- Built with [LangGraph](https://github.com/langchain-ai/langgraph)
- Powered by [Amadeus Travel APIs](https://developers.amadeus.com/)
