Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import asyncio
import inspect
import logging
import sys
from collections.abc import Callable, Sequence
from typing import Any

if sys.version_info >= (3, 13):
from warnings import deprecated # type: ignore # pragma: no cover
else:
from typing_extensions import deprecated # type: ignore # pragma: no cover

from agent_framework import Message, SupportsAgentRun
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
from agent_framework._workflows._agent_utils import resolve_agent_id
Expand Down Expand Up @@ -206,8 +212,8 @@ def summarize(results: list[AgentExecutorResponse]) -> str:
# Enable checkpoint persistence so runs can resume
workflow = ConcurrentBuilder(participants=[agent1, agent2, agent3], checkpoint_storage=storage).build()
# Enable request info before aggregation
workflow = ConcurrentBuilder(participants=[agent1, agent2]).with_request_info().build()
# Enable human-in-the-loop before aggregation
workflow = ConcurrentBuilder(participants=[agent1, agent2]).with_human_in_the_loop().build()
"""

def __init__(
Expand Down Expand Up @@ -317,28 +323,27 @@ async def summarize(results: list[AgentExecutorResponse], ctx: WorkflowContext[N

return self

def with_request_info(
def with_human_in_the_loop(
self,
*,
agents: Sequence[str | SupportsAgentRun] | None = None,
) -> "ConcurrentBuilder":
"""Enable request info after agent participant responses.
"""Enable human-in-the-loop (HITL) pausing after agent participant responses.
This enables human-in-the-loop (HIL) scenarios for the concurrent orchestration.
When enabled, the workflow pauses after each agent participant runs, emitting
a request_info event (type='request_info') that allows the caller to review the conversation and optionally
inject guidance for the agent participant to iterate. The caller provides input via
a request_info event (type='request_info') so the caller can review the conversation
and optionally inject guidance for the agent to iterate. The caller provides input via
the standard response_handler/request_info pattern.
Simulated flow with HIL:
Input -> [Agent Participant <-> Request Info] -> [Agent Participant <-> Request Info] -> ...
Flow with HITL enabled:
Input -> [Agent <-> Human Review] -> [Agent <-> Human Review] -> ...
Note: This is only available for agent participants. Executor participants can incorporate
request info handling in their own implementation if desired.
Args:
agents: Optional list of agents names or agent factories to enable request info for.
If None, enables HIL for all agent participants.
agents: Optional list of agent names or agent instances to enable HITL for.
If None, enables HITL for all agent participants.
Returns:
Self for fluent chaining
Expand All @@ -350,6 +355,15 @@ def with_request_info(

return self

@deprecated("with_request_info() is deprecated; use with_human_in_the_loop() instead.")
def with_request_info(
self,
*,
agents: Sequence[str | SupportsAgentRun] | None = None,
) -> "ConcurrentBuilder":
"""Deprecated: use with_human_in_the_loop() instead."""
return self.with_human_in_the_loop(agents=agents)

def _resolve_participants(self) -> list[Executor]:
"""Resolve participant instances into Executor objects."""
if not self._participants:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
from dataclasses import dataclass
from typing import Any, ClassVar, cast

if sys.version_info >= (3, 13):
from warnings import deprecated # type: ignore # pragma: no cover
else:
from typing_extensions import deprecated # type: ignore # pragma: no cover

from agent_framework import Agent, AgentSession, Message, SupportsAgentRun
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
from agent_framework._workflows._agent_utils import resolve_agent_id
Expand Down Expand Up @@ -852,24 +857,23 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> GroupChat
self._checkpoint_storage = checkpoint_storage
return self

def with_request_info(self, *, agents: Sequence[str | SupportsAgentRun] | None = None) -> GroupChatBuilder:
"""Enable request info after agent participant responses.
def with_human_in_the_loop(self, *, agents: Sequence[str | SupportsAgentRun] | None = None) -> GroupChatBuilder:
"""Enable human-in-the-loop (HITL) pausing after agent participant responses.

This enables human-in-the-loop (HIL) scenarios for the group chat orchestration.
When enabled, the workflow pauses after each agent participant runs, emitting
a request_info event (type='request_info') that allows the caller to review the conversation and optionally
inject guidance for the agent participant to iterate. The caller provides input via
a request_info event (type='request_info') so the caller can review the conversation
and optionally inject guidance for the agent to iterate. The caller provides input via
the standard response_handler/request_info pattern.

Simulated flow with HIL:
Input -> Orchestrator -> [Participant <-> Request Info] -> Orchestrator -> [Participant <-> Request Info] -> ...
Flow with HITL enabled:
Input -> Orchestrator -> [Participant <-> Human Review] -> Orchestrator -> [Participant <-> Human Review] -> ...

Note: This is only available for agent participants. Executor participants can incorporate
request info handling in their own implementation if desired.

Args:
agents: Optional list of agents names to enable request info for.
If None, enables HIL for all agent participants.
agents: Optional list of agent names or agent instances to enable HITL for.
If None, enables HITL for all agent participants.

Returns:
Self for fluent chaining
Expand All @@ -881,6 +885,11 @@ def with_request_info(self, *, agents: Sequence[str | SupportsAgentRun] | None =

return self

@deprecated("with_request_info() is deprecated; use with_human_in_the_loop() instead.")
def with_request_info(self, *, agents: Sequence[str | SupportsAgentRun] | None = None) -> GroupChatBuilder:
"""Deprecated: use with_human_in_the_loop() instead."""
return self.with_human_in_the_loop(agents=agents)

def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor:
"""Determine the orchestrator to use for the workflow.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ def __init__(
Args:
agent: The agent protocol to use for generating responses.
context_mode: The mode for providing context to the agent.

Note:
``propagate_request=True`` is set on the inner WorkflowExecutor so that
human-in-the-loop pause events bubble up through any parent workflow rather
than being intercepted internally. This means when this orchestration is itself
nested (e.g. used as a sub-workflow via WorkflowExecutor), HITL pauses surface
to the outermost caller automatically.
"""
self._context_mode: Literal["full", "last_agent", "custom"] | None = context_mode
self._description = agent.description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@
"""

import logging
import sys
from collections.abc import Sequence
from typing import Any, Literal

if sys.version_info >= (3, 13):
from warnings import deprecated # type: ignore # pragma: no cover
else:
from typing_extensions import deprecated # type: ignore # pragma: no cover

from agent_framework import Message, SupportsAgentRun
from agent_framework._workflows._agent_executor import (
AgentExecutor,
Expand Down Expand Up @@ -128,13 +134,13 @@ class SequentialBuilder:
# Enable checkpoint persistence
workflow = SequentialBuilder(participants=[agent1, agent2], checkpoint_storage=storage).build()

# Enable request info for mid-workflow feedback (pauses before each agent)
workflow = SequentialBuilder(participants=[agent1, agent2]).with_request_info().build()
# Enable human-in-the-loop for mid-workflow feedback (pauses before each agent)
workflow = SequentialBuilder(participants=[agent1, agent2]).with_human_in_the_loop().build()

# Enable request info only for specific agents
# Enable human-in-the-loop only for specific agents
workflow = (
SequentialBuilder(participants=[agent1, agent2, agent3])
.with_request_info(agents=[agent2]) # Only pause before agent2
.with_human_in_the_loop(agents=[agent2]) # Only pause before agent2
.build()
)
"""
Expand Down Expand Up @@ -191,28 +197,27 @@ def _set_participants(self, participants: Sequence[SupportsAgentRun | Executor])

self._participants = list(participants)

def with_request_info(
def with_human_in_the_loop(
self,
*,
agents: Sequence[str | SupportsAgentRun] | None = None,
) -> "SequentialBuilder":
"""Enable request info after agent participant responses.
"""Enable human-in-the-loop (HITL) pausing after agent participant responses.

This enables human-in-the-loop (HIL) scenarios for the sequential orchestration.
When enabled, the workflow pauses after each agent participant runs, emitting
a request_info event (type='request_info') that allows the caller to review the conversation and optionally
inject guidance for the agent participant to iterate. The caller provides input via
a request_info event (type='request_info') so the caller can review the conversation
and optionally inject guidance for the agent to iterate. The caller provides input via
the standard response_handler/request_info pattern.

Simulated flow with HIL:
Input -> [Agent Participant <-> Request Info] -> [Agent Participant <-> Request Info] -> ...
Flow with HITL enabled:
Input -> [Agent <-> Human Review] -> [Agent <-> Human Review] -> ...

Note: This is only available for agent participants. Executor participants can incorporate
request info handling in their own implementation if desired.

Args:
agents: Optional list of agents names or agent factories to enable request info for.
If None, enables HIL for all agent participants.
agents: Optional list of agent names or agent instances to enable HITL for.
If None, enables HITL for all agent participants.

Returns:
Self for fluent chaining
Expand All @@ -224,6 +229,15 @@ def with_request_info(

return self

@deprecated("with_request_info() is deprecated; use with_human_in_the_loop() instead.")
def with_request_info(
self,
*,
agents: Sequence[str | SupportsAgentRun] | None = None,
) -> "SequentialBuilder":
"""Deprecated: use with_human_in_the_loop() instead."""
return self.with_human_in_the_loop(agents=agents)

def _resolve_participants(self) -> list[Executor]:
"""Resolve participant instances into Executor objects."""
if not self._participants:
Expand Down
35 changes: 25 additions & 10 deletions python/packages/orchestrations/tests/test_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,8 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None:
assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden"


async def test_group_chat_with_request_info_filtering():
"""Test that with_request_info(agents=[...]) only pauses before specified agents run."""
async def test_group_chat_with_human_in_the_loop_filtering():
"""Test that with_human_in_the_loop(agents=[...]) only pauses before specified agents run."""
# Create agents - we want to verify only beta triggers pause
alpha = StubAgent("alpha", "response from alpha")
beta = StubAgent("beta", "response from beta")
Expand All @@ -757,7 +757,7 @@ async def selector(state: GroupChatState) -> str:
selection_func=selector,
orchestrator_name="manager",
)
.with_request_info(agents=["beta"]) # Only pause before beta runs
.with_human_in_the_loop(agents=["beta"]) # Only pause before beta runs
.build()
)

Expand Down Expand Up @@ -788,8 +788,8 @@ async def selector(state: GroupChatState) -> str:
assert len(outputs) == 1


async def test_group_chat_with_request_info_no_filter_pauses_all():
"""Test that with_request_info() without agents pauses before all participants."""
async def test_group_chat_with_human_in_the_loop_no_filter_pauses_all():
"""Test that with_human_in_the_loop() without agents pauses before all participants."""
# Create agents
alpha = StubAgent("alpha", "response from alpha")

Expand All @@ -811,7 +811,7 @@ async def selector(state: GroupChatState) -> str:
selection_func=selector,
orchestrator_name="manager",
)
.with_request_info() # No filter - pause for all
.with_human_in_the_loop() # No filter - pause for all
.build()
)

Expand All @@ -827,19 +827,34 @@ async def selector(state: GroupChatState) -> str:
assert request_events[0].source_executor_id == "alpha"


def test_group_chat_builder_with_request_info_returns_self():
"""Test that with_request_info() returns self for method chaining."""
def test_group_chat_builder_with_human_in_the_loop_returns_self():
"""Test that with_human_in_the_loop() returns self for method chaining."""
agent = StubAgent("test", "response")
builder = GroupChatBuilder(participants=[agent])
result = builder.with_request_info()
result = builder.with_human_in_the_loop()
assert result is builder

# Also test with agents parameter
builder2 = GroupChatBuilder(participants=[agent])
result2 = builder2.with_request_info(agents=["test"])
result2 = builder2.with_human_in_the_loop(agents=["test"])
assert result2 is builder2


def test_group_chat_builder_with_request_info_deprecated():
"""Test that with_request_info() emits DeprecationWarning and still works."""
import warnings

agent = StubAgent("test", "response")
builder = GroupChatBuilder(participants=[agent])
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = builder.with_request_info()
assert result is builder
assert len(w) == 1
assert issubclass(w[0].category, DeprecationWarning)
assert "with_human_in_the_loop" in str(w[0].message)


# region Orchestrator Factory Tests


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
Sample: Request Info with ConcurrentBuilder

This sample demonstrates using the `.with_request_info()` method to pause a
This sample demonstrates using the `.with_human_in_the_loop()` method to pause a
ConcurrentBuilder workflow for specific agents, allowing human review and
modification of individual agent outputs before aggregation.

Expand All @@ -12,7 +12,7 @@
allowing review and steering of their results.

Demonstrate:
- Configuring request info with `.with_request_info()` for specific agents
- Configuring request info with `.with_human_in_the_loop()` for specific agents
- Reviewing output from individual agents during concurrent execution
- Injecting human guidance for specific agents before aggregation

Expand Down Expand Up @@ -191,7 +191,7 @@ async def main() -> None:
ConcurrentBuilder(participants=[technical_analyst, business_analyst, user_experience_analyst])
.with_aggregator(aggregate_with_synthesis)
# Only enable request info for the technical analyst agent
.with_request_info(agents=["technical_analyst"])
.with_human_in_the_loop(agents=["technical_analyst"])
.build()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
Sample: Request Info with GroupChatBuilder

This sample demonstrates using the `.with_request_info()` method to pause a
This sample demonstrates using the `.with_human_in_the_loop()` method to pause a
GroupChatBuilder workflow BEFORE specific participants speak. By using the
`agents=` filter parameter, you can target only certain participants rather
than pausing before every turn.
Expand All @@ -13,7 +13,7 @@
specific participants speak, allowing human input to steer their response.

Demonstrate:
- Configuring request info with `.with_request_info(agents=[...])`
- Configuring request info with `.with_human_in_the_loop(agents=[...])`
- Using agent filtering to reduce interruptions
- Steering agent behavior with pre-agent human input

Expand Down Expand Up @@ -158,7 +158,7 @@ async def main() -> None:
max_rounds=6,
orchestrator_agent=orchestrator,
)
.with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks
.with_human_in_the_loop(agents=[pragmatist]) # Only pause before pragmatist speaks
.build()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
Sample: Request Info with SequentialBuilder

This sample demonstrates using the `.with_request_info()` method to pause a
This sample demonstrates using the `.with_human_in_the_loop()` method to pause a
SequentialBuilder workflow AFTER each agent runs, allowing external input
(e.g., human feedback) for review and optional iteration.

Expand All @@ -12,7 +12,7 @@
using the standard request_info pattern for consistency.

Demonstrate:
- Configuring request info with `.with_request_info()`
- Configuring request info with `.with_human_in_the_loop()`
- Handling request_info events with AgentInputRequest data
- Injecting responses back into the workflow via run(responses=..., stream=True)

Expand Down Expand Up @@ -129,7 +129,7 @@ async def main() -> None:
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
# Only enable request info for the editor agent
.with_request_info(agents=["editor"])
.with_human_in_the_loop(agents=["editor"])
.build()
)

Expand Down
Loading