126 changes: 126 additions & 0 deletions examples/agentic_demo/atropos_gsm8k_grpo_qwen25_0.5b.yaml
@@ -0,0 +1,126 @@
# Atropos + GSM8K + GRPO
# Optimized for Qwen2.5-0.5B-Instruct on 2x 3090.
# Lightweight, high-concurrency reasoning training.

defaults:
- ../config/deepspeed_zero2@_here_

exp_name: "atropos_gsm8k_grpo_qwen25_0_5b_v21"
seed: 42
max_steps: 100
eval_steps: 0
world_size: 1
num_gpus_per_node: 2

track_with: wandb
tracker_kwargs:
api_key: ${oc.env:WANDB_API_KEY}
project: roll-atropos-integration
name: ${exp_name}

# --- Model Configs ---
pretrain: Qwen/Qwen2.5-0.5B-Instruct
sequence_length: 2048
rollout_batch_size: 64
val_batch_size: 1

actor_train:
training_args:
learning_rate: 1.0e-6
per_device_train_batch_size: 1
gradient_accumulation_steps: 32
device_mapping: "[1]"
strategy_args:
strategy_name: deepspeed_train
strategy_config:
zero_optimization:
stage: 2
offload_optimizer:
device: cpu
pin_memory: false
bf16:
enabled: true
data_args:
template: qwen2_5

actor_infer:
strategy_args:
strategy_name: vllm
strategy_config:
gpu_memory_utilization: 0.4
max_model_len: 2048
VLLM_USE_V1: 0
enforce_eager: true
enable_prefix_caching: false
device_mapping: "[0]"
generating_args:
do_sample: true
temperature: 1.0
top_p: 0.95
max_new_tokens: 512
max_tokens_per_step: 512

reference:
device_mapping: "[1]"
strategy_args:
strategy_name: deepspeed_train
strategy_config:
zero_optimization:
stage: 2
bf16:
enabled: true

# --- RL Configs (GRPO) ---
adv_estimator: "grpo"
batch_adjust_mode: "random_sample"
step_reward_gamma: 1.0
use_kl_loss: true
kl_loss_coef: 0.001
whiten_advantages: true
reward_normalization:
grouping: "traj_group_id"
norm_mean_type: "group"
norm_std_type: "group"
entropy_loss_coef: 0
max_grad_norm: 1.0

# --- Atropos Execution Bridge Config ---
max_actions_per_traj: 1
env_manager_cls: roll.pipeline.agentic.env_manager.agent_native_env_manager.AgentNativeStepEnvManager

custom_envs:
AtroposGSM8KTrain:
env_type: "atropos_env"
max_steps: ${max_actions_per_traj}
max_tokens_per_step: ${max_tokens_per_step}
env_manager_cls: ${env_manager_cls}
agent_system_template: "You are a deep thinking AI, you may use extremely long chains of thought to deeply consider the problem and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. You should enclose your thoughts and internal monologue inside <think> </think> tags, and then provide your solution or response to the problem.\n\nYou are allocated a maximum of 2048 tokens, please strive to use less.\n\nYou will then provide your answer like this: \\boxed{your answer here}\nIt is important that you provide your answer in the correct format.\nIf you do not, you will not receive credit for your answer.\nSo please end your answer with \\boxed{your answer here}"
agent_template: "{observation}"
env_config:
atropos_env_path: "environments.gsm8k_server:GSM8kEnv"
max_steps: ${max_actions_per_traj}
debug: true
env_config:
group_size: 1
max_token_length: 1024
reward_config:
format_markers:
- marker: "\\boxed{"
reward: 0.3
- marker: "<think>"
reward: 0.2
length_bounty_max: 0.2

train_env_manager:
max_env_num_per_worker: 4
num_env_groups: 4
group_size: 16
tags: [AtroposGSM8KTrain]
num_groups_partition: [4]

val_env_manager:
max_env_num_per_worker: 1
num_env_groups: 1
group_size: 1
tags: [AtroposGSM8KTrain]
num_groups_partition: [1]
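The reward_normalization block above is what makes this GRPO: rewards are centered and scaled within each trajectory group (here, the rollouts sharing a traj_group_id), and num_env_groups: 4 x group_size: 16 yields the 64 trajectories per step that match rollout_batch_size: 64. A minimal sketch of that group-wise normalization, assuming one scalar reward per trajectory plus a group id — the helper below is illustrative, not ROLL's actual implementation:

import numpy as np

def group_normalize_rewards(rewards, group_ids, eps=1e-6):
    """Hypothetical sketch of GRPO-style normalization: each reward is
    centered and scaled by its own group's statistics, matching
    norm_mean_type: group and norm_std_type: group in the YAML."""
    rewards = np.asarray(rewards, dtype=np.float64)
    group_ids = np.asarray(group_ids)
    advantages = np.empty_like(rewards)
    for gid in np.unique(group_ids):
        mask = group_ids == gid
        group = rewards[mask]
        advantages[mask] = (group - group.mean()) / (group.std() + eps)
    return advantages

# Two groups of four rollouts each, mirroring traj_group_id grouping.
rewards = [1.0, 0.0, 0.5, 0.3, 0.9, 0.9, 0.1, 0.2]
group_ids = [0, 0, 0, 0, 1, 1, 1, 1]
print(group_normalize_rewards(rewards, group_ids))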
13 changes: 13 additions & 0 deletions examples/agentic_demo/run_atropos_gsm8k.sh
@@ -0,0 +1,13 @@
#!/bin/bash
# Atropos-ROLL Integration: GSM8K + Qwen2.5-0.5B
# Usage: bash examples/agentic_demo/run_atropos_gsm8k.sh

export VLLM_USE_V1=0
export WANDB_MODE=online
export TQDM_DISABLE=1
export HF_DATASETS_OFFLINE=1
export HF_HUB_OFFLINE=1

python examples/start_agentic_pipeline.py \
--config_path agentic_demo \
--config_name atropos_gsm8k_grpo_qwen25_0.5b
65 changes: 45 additions & 20 deletions roll/distributed/scheduler/resource_manager.py
@@ -25,7 +25,7 @@ def __init__(self, num_gpus_per_node, num_nodes):
node_gpu_num = int(resource.get(current_platform.ray_device_key, 0))
if node_gpu_num >= num_gpus_per_node:
nodes_maybe_used.append(node)
nodes_maybe_used = sorted(nodes_maybe_used, key=lambda n: n["Resources"]["CPU"])
nodes_maybe_used = sorted(nodes_maybe_used, key=lambda n: n["Resources"].get("CPU") or n["Resources"].get("num_cpus", 0))

ray_num_nodes = len(nodes_maybe_used)
if num_nodes is None:
@@ -39,25 +39,50 @@ def __init__(self, num_gpus_per_node, num_nodes):

if self.gpu_per_node > 0:
assert self.num_gpus <= available_gpu, f"num_gpus {self.num_gpus} > available_gpu {available_gpu}"
bundles = []
for i in range(self.num_nodes):
node = nodes_maybe_used[i]
node_cpu = int(node["Resources"]["CPU"])
bundles.append({current_platform.ray_device_key: self.gpu_per_node, "CPU": max(node_cpu / 2, 1)})

self.placement_groups = [ray.util.placement_group([bundle]) for bundle in bundles]
ray.get([pg.ready() for pg in self.placement_groups])
gpu_ranks = ray.get([
get_visible_gpus.options(
placement_group=pg,
**(
{"num_gpus": self.gpu_per_node}
if current_platform.ray_device_key == "GPU"
else {"resources": {current_platform.ray_device_key: self.gpu_per_node}}
)
).remote(current_platform.device_control_env_var)
for pg in self.placement_groups
])

# --- Defensive Resource Allocation (Support for Ray 2.0+ and Legacy) ---
try:
# 1. Attempt Modern Ray Style (num_cpus / num_gpus)
bundles = []
for i in range(self.num_nodes):
node = nodes_maybe_used[i]
node_cpu = int(node["Resources"].get("num_cpus") or node["Resources"].get("CPU", 0))
bundles.append({"num_gpus": self.gpu_per_node, "num_cpus": max(node_cpu / 2, 1)})

self.placement_groups = [ray.util.placement_group([bundle]) for bundle in bundles]
ray.get([pg.ready() for pg in self.placement_groups])

gpu_ranks = ray.get([
get_visible_gpus.options(
placement_group=pg,
num_gpus=self.gpu_per_node
).remote(current_platform.device_control_env_var)
for pg in self.placement_groups
])
except Exception as e:  # Exception already subsumes ValueError
logger.warning(f"Modern Ray resource allocation failed, falling back to legacy style. Error: {e}")

# 2. Fallback to Legacy Style (CPU / GPU)
bundles = []
for i in range(self.num_nodes):
node = nodes_maybe_used[i]
node_cpu = int(node["Resources"].get("CPU") or node["Resources"].get("num_cpus", 0))
bundles.append({current_platform.ray_device_key: self.gpu_per_node, "CPU": max(node_cpu / 2, 1)})

self.placement_groups = [ray.util.placement_group([bundle]) for bundle in bundles]
ray.get([pg.ready() for pg in self.placement_groups])

gpu_ranks = ray.get([
get_visible_gpus.options(
placement_group=pg,
**(
{"num_gpus": self.gpu_per_node}
if current_platform.ray_device_key == "GPU"
else {"resources": {current_platform.ray_device_key: self.gpu_per_node}}
)
).remote(current_platform.device_control_env_var)
for pg in self.placement_groups
])
print(f"gpu ranks: {gpu_ranks}")
self.node_ranks = ray.get(
[get_node_rank.options(placement_group=pg).remote() for pg in self.placement_groups])
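The pattern this hunk introduces — try one set of resource key names, fall back to the legacy CPU/GPU names if scheduling fails — can be exercised in isolation. A self-contained sketch of the same defensive allocation idea; the bundle keys follow the diff, while the helper name and timeout are illustrative assumptions:

import ray
from ray.util import placement_group, remove_placement_group

def allocate_group(gpus_per_node: int, cpus_per_node: int, timeout_s: float = 30.0):
    """Hypothetical sketch: try each bundle spec in turn, falling back to the
    legacy CPU/GPU key names if the first cannot be scheduled in time."""
    candidates = [
        {"num_gpus": gpus_per_node, "num_cpus": max(cpus_per_node // 2, 1)},  # style tried first in the diff
        {"GPU": gpus_per_node, "CPU": max(cpus_per_node // 2, 1)},            # legacy key names
    ]
    for bundle in candidates:
        pg = placement_group([bundle])
        try:
            ray.get(pg.ready(), timeout=timeout_s)  # barrier until the bundle is scheduled
            return pg
        except Exception:
            remove_placement_group(pg)  # free the pending group before retrying
    raise RuntimeError("no bundle specification could be scheduled")

# Usage (illustrative): pg = allocate_group(gpus_per_node=2, cpus_per_node=16)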
5 changes: 5 additions & 0 deletions roll/pipeline/agentic/env/__init__.py
@@ -22,6 +22,11 @@
except Exception as e:
logger.info(f"Failed to register openreward_env: {e}")

try:
gem.register("atropos_env", entry_point="roll.pipeline.agentic.env.atropos:AtroposEnv")
except Exception as e:
logger.info(f"Failed to register atropos_env: {e}")

try:
# add webshop-minimal to PYTHONPATH
import os
3 changes: 3 additions & 0 deletions roll/pipeline/agentic/env/atropos/__init__.py
@@ -0,0 +1,3 @@
from roll.pipeline.agentic.env.atropos.atropos_env import AtroposEnv

__all__ = ["AtroposEnv"]
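With the registration in env/__init__.py above, the environment should be constructable through the gem registry like any other entry point. A hedged usage sketch — both the Gym-style gem.make() call and the kwargs (taken from the YAML's env_config block) are assumptions, not confirmed by this diff:

import gem  # the registry used in roll/pipeline/agentic/env/__init__.py

# Assumed Gym-style construction; kwargs mirror the YAML's env_config block.
env = gem.make(
    "atropos_env",
    atropos_env_path="environments.gsm8k_server:GSM8kEnv",
    max_steps=1,
    debug=True,
)
obs = env.reset()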