-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathtest_testing_remote_agents_streaming.py
More file actions
163 lines (132 loc) · 4.85 KB
/
test_testing_remote_agents_streaming.py
File metadata and controls
163 lines (132 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Example: Testing an agent that returns streaming responses
This test demonstrates handling agents that stream their responses in chunks
rather than returning a complete message at once. The server uses real LLM streaming.
"""
import asyncio
import json
from aiohttp import web
import aiohttp
import pytest
import pytest_asyncio
import scenario
from openai import AsyncOpenAI
# Base URL of the in-process test server.
# Empty at import time; the `test_server` fixture assigns the real
# "http://localhost:<port>" value via `global base_url` once a port is bound.
base_url = ""
class StreamingAgentAdapter(scenario.AgentAdapter):
    """
    Adapter for agents that deliver their reply as a chunked HTTP stream.

    On each turn it POSTs the conversation to the streaming endpoint,
    drains every chunk off the wire as it arrives, and hands scenario the
    fully assembled text once the stream ends.
    """

    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        # Send the conversation so far to the agent's streaming endpoint.
        async with aiohttp.ClientSession() as http:
            async with http.post(
                f"{base_url}/chat/stream",
                json={"messages": input.messages},
            ) as resp:
                # Accumulate decoded chunks in a list and join once at the
                # end rather than concatenating strings inside the loop.
                pieces = []
                async for raw in resp.content.iter_any():
                    pieces.append(raw.decode("utf-8"))
                # Complete response, assembled after the stream closed.
                return "".join(pieces)
# Async OpenAI client used by stream_handler to produce real LLM completions.
client = AsyncOpenAI()
async def stream_handler(request: web.Request) -> web.StreamResponse:
    """
    HTTP endpoint that streams an LLM reply chunk by chunk.

    Parses the posted conversation, forwards the last user turn to the
    OpenAI streaming API, and relays each text delta to the client over a
    chunked-transfer response.

    Args:
        request: POST request whose JSON body contains a "messages" list.

    Returns:
        The prepared StreamResponse, after all chunks have been written.
    """
    data = await request.json()
    messages = data["messages"]
    # Forward only the latest user turn; non-string (e.g. multi-part list)
    # content is not supported by this test server, so fall back to "".
    last_msg = messages[-1]
    content = last_msg["content"]
    if not isinstance(content, str):
        content = ""
    # Set up a progressively-written response.
    # NOTE(review): aiohttp's documented way to request chunked encoding is
    # response.enable_chunked_encoding(); the manual header is kept here to
    # preserve the original behavior — confirm before changing.
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/plain"
    response.headers["Transfer-Encoding"] = "chunked"
    await response.prepare(request)
    # Ask the LLM for a streamed completion.
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful weather assistant. Provide brief, friendly responses, immediately. Pretend like you have access to a weather API and make up the weather.",
            },
            {"role": "user", "content": content},
        ],
        temperature=0.7,
        stream=True,
    )
    # Relay each text delta as its own chunk. Some stream events carry an
    # empty `choices` list (e.g. trailing usage chunks), so guard before
    # indexing to avoid an IndexError mid-stream.
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            await response.write(chunk.choices[0].delta.content.encode("utf-8"))
    await response.write_eof()
    return response
@pytest_asyncio.fixture
async def test_server():
    """
    Spin up the streaming agent HTTP server for the duration of a test.

    Binds to an OS-assigned free port, publishes the resulting base URL
    through the module-level `base_url` global, and tears the server down
    after the test finishes.

    Yields:
        None. The server address is exposed via `base_url`.
    """
    global base_url
    # Route the streaming endpoint.
    app = web.Application()
    app.router.add_post("/chat/stream", stream_handler)
    # Port 0 lets the OS pick any free port.
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", 0)
    await site.start()
    # Discover the bound port through the public AppRunner.addresses API
    # instead of reaching into the private site._server attribute.
    addresses = runner.addresses
    assert addresses, "server did not bind to any address"
    port = addresses[0][1]
    base_url = f"http://localhost:{port}"
    yield
    # Cleanup: stop accepting connections and free the port.
    await runner.cleanup()
@pytest.mark.flaky(reruns=2)
@pytest.mark.asyncio
async def test_streaming_response(test_server):
    """
    Exercise the full scenario loop against the streaming HTTP endpoint.

    Verifies that:
    - the adapter reassembles chunked output into one coherent reply,
    - the agent answers with weather information,
    - the end-to-end scenario flow works over a streamed response.
    """
    # Judge configured up front so the agent list below stays readable.
    judge = scenario.JudgeAgent(
        model="openai/gpt-4o-mini",
        criteria=[
            "Agent should provide weather information",
            "Response should be complete and coherent",
        ],
    )
    result = await scenario.run(
        name="Streaming weather response",
        description="User asks about weather and receives streamed response",
        agents=[
            scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
            StreamingAgentAdapter(),
            judge,
        ],
        script=[
            scenario.user("What's the weather forecast in Amsterdam?"),
            scenario.agent(),
            scenario.judge(),
        ],
        set_id="python-examples",
    )
    assert result.success