- Overview
- Architecture
- Module Structure
- Configuration
- Client APIs
- Worker Framework
- Models
- LLM/AI Tasks
- HTTP Poll Task
- Wait for Webhook Task
- State Change Events
- Fork/Join with Script
- Events System
- Metrics
- Error Handling
- Examples
The Conductor Rust SDK is a comprehensive, async-first client library for the Netflix Conductor workflow orchestration platform. It mirrors the architecture of the Python SDK while leveraging Rust's strengths in type safety and async programming.
- Async/Await Support: Built on Tokio runtime for high-performance async I/O
- Batch Polling: Efficiently poll multiple tasks in a single request
- Automatic Retry: Configurable retry logic with exponential backoff for task updates
- Event-Driven Architecture: Publish/subscribe events for monitoring and metrics
- Prometheus Metrics: Built-in metrics collection with HTTP endpoint
- Type-Safe Models: Strongly typed workflow and task definitions
- Hierarchical Configuration: Environment variable support with worker-specific overrides
┌─────────────────────────────────────────────────────────────────────┐
│ Application Layer │
├─────────────────────────────────────────────────────────────────────┤
│ WorkerHost / TaskHandler │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TaskRunner │ │ TaskRunner │ │ TaskRunner │ ... │
│ │ (Worker 1) │ │ (Worker 2) │ │ (Worker N) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ EventDispatcher │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ MetricsCollector│ │ Custom Listener │ ... │ │
│ │ └─────────────────┘ └─────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────────┤
│ Client Layer │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ ConductorClient │ │
│ │ ┌─────────────┐ ┌────────────────┐ ┌──────────────────┐ │ │
│ │ │ TaskClient │ │ WorkflowClient │ │ MetadataClient │ │ │
│ │ └──────┬──────┘ └───────┬────────┘ └────────┬─────────┘ │ │
│ │ └─────────────────┼───────────────────┘ │ │
│ └───────────────────────────┼───────────────────────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ ApiClient │ │
│ │ • HTTP request/response handling │ │
│ │ • Authentication token management │ │
│ │ • Automatic token refresh │ │
│ │ • Error handling & retries │ │
│ └───────────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────────┤
│ Conductor Server │
└─────────────────────────────────────────────────────────────────────┘
- Separation of Concerns: Clear boundaries between HTTP, client, worker, and application layers
- Composition over Inheritance: Use traits and composition for extensibility
- Fail-Fast with Recovery: Immediate error reporting with configurable retry mechanisms
- Observable by Default: Built-in events and metrics for monitoring
conductor-rust/
├── src/
│ ├── lib.rs # Public API exports
│ ├── configuration/
│ │ ├── mod.rs
│ │ ├── settings.rs # Configuration struct
│ │ └── worker_config.rs # Worker-specific config
│ ├── client/
│ │ ├── mod.rs
│ │ ├── conductor_client.rs # Main client entry point
│ │ ├── task_client.rs # Task operations
│ │ ├── workflow_client.rs # Workflow operations
│ │ └── metadata_client.rs # Definition management
│ ├── http/
│ │ ├── mod.rs
│ │ └── api_client.rs # Low-level HTTP client
│ ├── worker/
│ │ ├── mod.rs
│ │ ├── worker_trait.rs # Worker trait & FnWorker
│ │ ├── task_runner.rs # Polling loop implementation
│ │ ├── task_handler.rs # Multi-worker management
│ │ └── worker_host.rs # High-level hosting API
│ ├── models/
│ │ ├── mod.rs
│ │ ├── task.rs # Task, TaskStatus, TaskExecLog
│ │ ├── task_def.rs # TaskDef, RetryLogic, TimeoutPolicy
│ │ ├── task_result.rs # TaskResult, TaskResultStatus
│ │ ├── workflow.rs # Workflow, WorkflowStatus
│ │ └── workflow_def.rs # WorkflowDef, WorkflowTask, TaskType
│ ├── events/
│ │ ├── mod.rs
│ │ ├── dispatcher.rs # EventDispatcher
│ │ └── task_runner_events.rs # Event types & listener trait
│ ├── metrics/
│ │ ├── mod.rs
│ │ ├── settings.rs # MetricsSettings
│ │ └── collector.rs # MetricsCollector (Prometheus)
│ └── error.rs # ConductorError, Result<T>
├── examples/
│ ├── hello_world.rs # Basic usage
│ ├── async_workers.rs # Multiple workers with metrics
│ └── dynamic_workflow.rs # Dynamic workflow execution
└── tests/
└── integration_tests.rs # Integration tests
use conductor::Configuration;
use std::time::Duration;
// From environment variables
let config = Configuration::from_env();
// Programmatic configuration
let config = Configuration::new("http://localhost:8080/api")
.with_auth("key", "secret")
.with_timeout(Duration::from_secs(30))
    .with_debug(true);

| Variable | Description | Default |
|---|---|---|
| `CONDUCTOR_SERVER_URL` | Conductor server API URL | `http://localhost:8080/api` |
| `CONDUCTOR_AUTH_KEY` | Authentication key | None |
| `CONDUCTOR_AUTH_SECRET` | Authentication secret | None |
| `CONDUCTOR_DEBUG` | Enable debug logging | `false` |
| `CONDUCTOR_TIMEOUT_SECS` | Request timeout (seconds) | `30` |
Worker-specific settings can be configured via environment variables with hierarchical precedence (highest to lowest):

- Worker-specific: `CONDUCTOR_WORKER_{WORKER_NAME}_{PROPERTY}`
- Global: `CONDUCTOR_WORKER_ALL_{PROPERTY}`
- Code defaults
| Variable Pattern | Description |
|---|---|
| `CONDUCTOR_WORKER_ALL_POLL_INTERVAL` | Default poll interval (ms) |
| `CONDUCTOR_WORKER_ALL_THREAD_COUNT` | Default concurrent executions |
| `CONDUCTOR_WORKER_ALL_DOMAIN` | Default task domain |
| `CONDUCTOR_WORKER_{NAME}_POLL_INTERVAL` | Worker-specific poll interval |
| `CONDUCTOR_WORKER_{NAME}_THREAD_COUNT` | Worker-specific thread count |
| `CONDUCTOR_WORKER_{NAME}_DOMAIN` | Worker-specific domain |
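The precedence rules above can be sketched as a small lookup. This is an illustrative, stdlib-only sketch of the resolution order, not the SDK's actual implementation; the function name and numeric values are hypothetical:

```rust
use std::env;

// Illustrative resolution of a worker property: the worker-specific env
// var wins, then the CONDUCTOR_WORKER_ALL_ global, then the code default.
// Worker names are upper-cased to match CONDUCTOR_WORKER_{NAME}_{PROPERTY}.
fn resolve_worker_prop(worker: &str, prop: &str, default: u64) -> u64 {
    let name = worker.to_uppercase();
    let specific = format!("CONDUCTOR_WORKER_{name}_{prop}");
    let global = format!("CONDUCTOR_WORKER_ALL_{prop}");
    env::var(&specific)
        .or_else(|_| env::var(&global))
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

fn main() {
    env::set_var("CONDUCTOR_WORKER_ALL_POLL_INTERVAL", "500");
    env::set_var("CONDUCTOR_WORKER_GREET_POLL_INTERVAL", "100");
    // greet has a specific override; process falls back to the global.
    assert_eq!(resolve_worker_prop("greet", "POLL_INTERVAL", 250), 100);
    assert_eq!(resolve_worker_prop("process", "POLL_INTERVAL", 250), 500);
}
```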
The main entry point providing access to all API clients.
use conductor::{ConductorClient, Configuration};
let client = ConductorClient::new(Configuration::from_env())?;
// Access specialized clients
let task_client = client.task_client();
let workflow_client = client.workflow_client();
let metadata_client = client.metadata_client();

Operations for task polling and updates.

| Method | Description |
|---|---|
| `poll_task()` | Poll for a single task |
| `batch_poll()` | Poll for multiple tasks |
| `update_task()` | Update task result |
| `update_task_with_retry()` | Update with automatic retry |
| `get_task()` | Get task by ID |
| `get_tasks_in_progress()` | Get running tasks for a type |
| `add_task_log()` | Add execution log |
| `get_task_logs()` | Get task logs |
| `get_queue_sizes()` | Get queue depths |
| `remove_task_from_queue()` | Remove task from queue |
// Batch poll for tasks
let tasks = task_client
.batch_poll("my_task", Some("worker-1"), None, 10, Duration::from_secs(1))
.await?;
// Update task with retry
let result = TaskResult::completed(&task.task_id, &task.workflow_instance_id)
.with_output_value("result", "done");
task_client.update_task_with_retry(&result, 3).await?;

Operations for workflow execution and lifecycle management.

| Method | Description |
|---|---|
| `start_workflow()` | Start workflow asynchronously |
| `execute_workflow()` | Start and wait for completion |
| `get_workflow()` | Get workflow by ID |
| `get_workflow_status()` | Get status with optional output |
| `terminate_workflow()` | Terminate running workflow |
| `pause_workflow()` | Pause workflow |
| `resume_workflow()` | Resume paused workflow |
| `retry_workflow()` | Retry failed workflow |
| `restart_workflow()` | Restart from beginning |
| `rerun_workflow()` | Rerun from specific task |
| `update_variables()` | Update workflow variables |
| `skip_task()` | Skip a task in workflow |
| `search_workflows()` | Search workflows |
| `get_running_workflows()` | Get running workflow IDs |
| `delete_workflow()` | Delete workflow execution |
| `test_workflow()` | Dry run with mocked outputs |
use conductor::models::StartWorkflowRequest;
// Start workflow
let request = StartWorkflowRequest::new("my_workflow")
.with_version(1)
.with_input_value("name", "John");
let workflow_id = workflow_client.start_workflow(&request).await?;
// Execute and wait
let workflow = workflow_client
.execute_workflow(&request, Duration::from_secs(30))
    .await?;

Operations for managing workflow and task definitions.

| Method | Description |
|---|---|
| `register_workflow_def()` | Register workflow definition |
| `update_workflow_def()` | Update workflow definition |
| `register_or_update_workflow_def()` | Upsert workflow definition |
| `get_workflow_def()` | Get workflow definition |
| `get_all_workflow_def_versions()` | Get all versions |
| `get_all_workflow_defs()` | List all definitions |
| `delete_workflow_def()` | Delete workflow definition |
| `register_task_def()` | Register task definition |
| `register_task_defs()` | Register multiple tasks |
| `update_task_def()` | Update task definition |
| `get_task_def()` | Get task definition |
| `get_all_task_defs()` | List all task definitions |
| `delete_task_def()` | Delete task definition |
| `task_def_exists()` | Check if task exists |
| `workflow_def_exists()` | Check if workflow exists |
use conductor::models::{TaskDef, WorkflowDef, WorkflowTask, RetryLogic, TimeoutPolicy};
// Register task definition
let task_def = TaskDef::new("my_task")
.with_description("My task")
.with_retry(3, RetryLogic::Fixed, 10)
.with_timeout(60, TimeoutPolicy::TimeOutWf);
metadata_client.register_task_def(&task_def).await?;
// Register workflow definition
let workflow_def = WorkflowDef::new("my_workflow")
.with_version(1)
.with_task(WorkflowTask::simple("my_task", "task_ref_1"));
metadata_client.register_workflow_def(&workflow_def).await?;

The core trait for implementing task workers.
use async_trait::async_trait;
use conductor::{Worker, Task, error::Result, worker::WorkerOutput};
#[async_trait]
pub trait Worker: Send + Sync {
/// Task type this worker handles
fn task_definition_name(&self) -> &str;
/// Execute the task
async fn execute(&self, task: &Task) -> Result<WorkerOutput>;
// Optional overrides with defaults
fn identity(&self) -> String { /* hostname-pid */ }
fn domain(&self) -> Option<&str> { None }
fn poll_interval_millis(&self) -> u64 { 100 }
fn thread_count(&self) -> usize { 1 }
}

pub enum WorkerOutput {
/// Task completed with output data
Completed(HashMap<String, Value>),
/// Task failed with error message
Failed(String),
/// Task still in progress (for long-running tasks)
InProgress(TaskInProgress),
}
// Convenience constructors
WorkerOutput::completed_with_result("result_value")
WorkerOutput::completed(hashmap!{"key" => "value"})
WorkerOutput::failed("Error message")
WorkerOutput::in_progress(60) // callback after 60 seconds

Function-based worker for simple use cases.
use conductor::worker::FnWorker;
let worker = FnWorker::new("my_task", |task: Task| async move {
let input = task.get_input_string("name").unwrap_or_default();
Ok(WorkerOutput::completed_with_result(format!("Hello, {}!", input)))
})
.with_thread_count(4)
.with_poll_interval_millis(500)
.with_domain("my-domain");

For a more declarative style similar to Python's @worker_task decorator, enable the macros feature:
[dependencies]
conductor-rust = { version = "0.1", features = ["macros"] }

Then use the #[worker] attribute macro:
use conductor_macros::worker;
// Simple worker with automatic parameter extraction
#[worker(name = "greet", thread_count = 5)]
async fn greet(name: String) -> String {
format!("Hello, {}!", name)
}
// Worker with multiple parameters
#[worker(name = "calculate", poll_interval = 200)]
async fn calculate(a: i32, b: i32) -> serde_json::Value {
serde_json::json!({ "sum": a + b })
}
// Worker with full Task access
#[worker(name = "process", domain = "premium")]
async fn process(task: Task) -> WorkerOutput {
let data = task.get_input_string("data").unwrap();
WorkerOutput::completed_with_result(data)
}
// Worker with Result return type
#[worker(name = "validate")]
async fn validate(value: String) -> Result<String, String> {
if value.is_empty() {
Err("Value cannot be empty".to_string())
} else {
Ok(format!("Valid: {}", value))
}
}
// Usage - macro generates {fn_name}_worker() functions
let mut handler = TaskHandler::new(config)?;
handler.add_worker(greet_worker());
handler.add_worker(calculate_worker());
handler.add_worker(process_worker());
handler.add_worker(validate_worker());

Generate JSON Schemas from Rust types for input/output validation:
use conductor::schema::generate_schema;
use schemars::JsonSchema;
#[derive(JsonSchema)]
struct OrderInput {
order_id: String,
amount: f64,
items: Vec<String>,
}
#[derive(JsonSchema)]
struct OrderOutput {
status: String,
processed_at: String,
}
// Generate schema with strict mode (additionalProperties: false)
let input_schema = generate_schema::<OrderInput>(true);
let output_schema = generate_schema::<OrderOutput>(true);
// Attach schemas to worker - auto-registered when register_task_def=true
let worker = FnWorker::new("process_order", handler)
.with_input_schema(input_schema)
.with_output_schema(output_schema);
// Or use the convenience methods
let worker = FnWorker::new("process_order", handler)
.with_input_schema_from::<OrderInput>(true)
    .with_output_schema_from::<OrderOutput>(true);

Access task metadata including poll_count for long-running tasks:
use conductor::worker::TaskContext;
let worker = FnWorker::new("long_task", |task: Task| async move {
let ctx = TaskContext::from_task(&task);
// Access metadata
println!("Poll count: {}", ctx.poll_count());
println!("Retry count: {}", ctx.retry_count());
println!("Task ID: {}", ctx.task_id());
println!("Workflow ID: {}", ctx.workflow_instance_id());
// Convenience methods
if ctx.is_first_poll() {
// Initialize task
}
if ctx.is_retry() {
// Handle retry differently
}
if ctx.poll_count() < 5 {
Ok(WorkerOutput::in_progress(30))
} else {
Ok(WorkerOutput::completed_with_result("done"))
}
});

Manages multiple workers with lifecycle control.
use conductor::{Configuration, worker::TaskHandler};
let mut handler = TaskHandler::builder(Configuration::from_env())
.worker(worker1)
.worker(worker2)
.metrics(MetricsSettings::default().with_http_port(9090))
.build()?;
handler.start().await?;
// Control individual workers
handler.pause_worker("task_type_1");
handler.resume_worker("task_type_1");
// Control all workers
handler.pause_all();
handler.resume_all();
// Graceful shutdown
handler.stop().await?;

High-level API with automatic shutdown handling.
use conductor::{WorkerHost, Configuration};
let host = WorkerHost::builder(Configuration::from_env())
.worker(my_worker)
.with_metrics(MetricsSettings::default().with_http_port(9090))
.start()
.await?;
// Runs until Ctrl+C
host.wait_for_shutdown().await?;

let (builder, shutdown_tx) = WorkerHost::builder(config)
.worker(my_worker)
.with_shutdown_channel();
let host = builder.start().await?;
// Later, trigger shutdown programmatically
shutdown_tx.send(()).ok();

| Type | Description |
|---|---|
| `Task` | Task instance with input/output data |
| `TaskStatus` | Scheduled, InProgress, Completed, Failed, FailedWithTerminalError, Canceled, Skipped, TimedOut |
| `TaskExecLog` | Task execution log entry |
| `TaskResult` | Result submitted back to Conductor |
| `TaskResultStatus` | Completed, Failed, FailedWithTerminalError, InProgress |
| `TaskInProgress` | Marker for long-running tasks |
| `TaskDef` | Task definition/metadata |
| `RetryLogic` | Fixed, ExponentialBackoff, LinearBackoff |
| `TimeoutPolicy` | Retry, TimeOutWf, AlertOnly |
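For intuition, the three RetryLogic variants correspond to three delay schedules. This is an illustrative, stdlib-only computation, not SDK code; the actual schedule is applied server-side by Conductor:

```rust
use std::time::Duration;

// Illustrative delay schedules for the RetryLogic variants, given a base
// retry delay in seconds and a zero-based attempt number.
// Fixed: constant; LinearBackoff: grows by base each attempt;
// ExponentialBackoff: doubles each attempt.
fn retry_delay(logic: &str, base_secs: u64, attempt: u32) -> Duration {
    let secs = match logic {
        "FIXED" => base_secs,
        "LINEAR_BACKOFF" => base_secs * (attempt as u64 + 1),
        "EXPONENTIAL_BACKOFF" => base_secs * 2u64.saturating_pow(attempt),
        _ => base_secs,
    };
    Duration::from_secs(secs)
}

fn main() {
    // Base delay of 10s, as in the TaskDef::with_retry(3, ..., 10) examples.
    assert_eq!(retry_delay("FIXED", 10, 2), Duration::from_secs(10));
    assert_eq!(retry_delay("LINEAR_BACKOFF", 10, 2), Duration::from_secs(30));
    assert_eq!(retry_delay("EXPONENTIAL_BACKOFF", 10, 2), Duration::from_secs(40));
}
```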
| Type | Description |
|---|---|
| `Workflow` | Workflow execution instance |
| `WorkflowStatus` | Running, Completed, Failed, TimedOut, Terminated, Paused |
| `StartWorkflowRequest` | Request to start a workflow |
| `WorkflowDef` | Workflow definition |
| `WorkflowTask` | Task within a workflow definition |
| `TaskType` | Simple, SubWorkflow, Fork, Join, Switch, DoWhile, Http, Inline, HttpPoll, WaitForWebhook, LlmTextComplete, LlmChatComplete, LlmGenerateEmbeddings, LlmSearchIndex, LlmIndexText, LlmIndexDocument, LlmGetEmbeddings, GetDocument, etc. |
| `SubWorkflowParams` | Parameters for subworkflow tasks |
| `WorkflowTimeoutPolicy` | TimeOutWf, AlertOnly |
| `StateChangeConfig` | Configuration for task state change events |
| `StateChangeEvent` | Event dispatched on task state transitions |
| `ChatMessage` | Message for LLM chat completions |
All models support fluent builder patterns:
// Task Definition
let task_def = TaskDef::new("process_order")
.with_description("Process customer orders")
.with_retry(3, RetryLogic::ExponentialBackoff, 10)
.with_timeout(300, TimeoutPolicy::TimeOutWf)
.with_rate_limit(100, 60)
.with_owner("team@example.com");
// Workflow Definition
let workflow_def = WorkflowDef::new("order_workflow")
.with_version(1)
.with_description("Order processing workflow")
.with_task(WorkflowTask::simple("validate_order", "validate_ref"))
.with_task(WorkflowTask::simple("process_order", "process_ref"))
.with_task(WorkflowTask::simple("notify_customer", "notify_ref"))
.with_output_param("orderId", "${process_ref.output.orderId}")
.with_timeout(3600, WorkflowTimeoutPolicy::TimeOutWf);
// Workflow Task Variants
WorkflowTask::simple("task_name", "reference_name")
WorkflowTask::sub_workflow("ref_name", "child_workflow")
WorkflowTask::http("ref_name", "https://api.example.com/endpoint")
WorkflowTask::inline("ref_name", "function e() { return $.input * 2; }")
WorkflowTask::wait("wait_ref")

The SDK supports Orkes AI/LLM tasks for building AI-powered workflows. These tasks integrate with various LLM providers (OpenAI, Azure, Anthropic, etc.) through Orkes' AI integration layer.
| Task Type | Description |
|---|---|
| `LlmTextComplete` | Text completion using an LLM model |
| `LlmChatComplete` | Chat completion with conversation context |
| `LlmGenerateEmbeddings` | Generate vector embeddings from text |
| `LlmSearchIndex` | Search a vector database index |
| `LlmIndexText` | Index text into a vector database |
| `LlmIndexDocument` | Index a document into a vector database |
| `LlmGetEmbeddings` | Query existing embeddings |
| `GetDocument` | Fetch document content from a URL |
use conductor::models::{WorkflowDef, WorkflowTask};
let workflow = WorkflowDef::new("text_completion_workflow")
.with_task(
WorkflowTask::llm_text_complete("completion_ref", "my_llm_provider", "gpt-4")
.with_instructions_template("Summarize the following text: ${workflow.input.text}")
.with_temperature(0.7)
.with_max_tokens(500)
    );

use conductor::models::{WorkflowDef, WorkflowTask, ChatMessage};
let workflow = WorkflowDef::new("chat_workflow")
.with_task(
WorkflowTask::llm_chat_complete("chat_ref", "my_llm_provider", "gpt-4")
.with_messages(vec![
ChatMessage::system("You are a helpful assistant."),
ChatMessage::user("${workflow.input.question}"),
])
.with_temperature(0.8)
.with_max_tokens(1000)
    );

// Generate embeddings
let embed_task = WorkflowTask::llm_generate_embeddings(
"embed_ref",
"my_embedding_provider",
"text-embedding-ada-002"
)
.with_input_param("text", "${workflow.input.document}");
// Index text to vector DB
let index_task = WorkflowTask::llm_index_text(
"index_ref",
"my_embedding_provider",
"text-embedding-ada-002",
"my_vector_db",
"my_index"
)
.with_namespace("documents")
.with_input_param("text", "${workflow.input.document}");
// Search vector DB
let search_task = WorkflowTask::llm_search_index(
"search_ref",
"my_embedding_provider",
"text-embedding-ada-002",
"my_vector_db",
"my_index"
)
.with_namespace("documents")
.with_max_results(5)
    .with_input_param("query", "${workflow.input.query}");

Use prompt variables for dynamic prompt templates:
let task = WorkflowTask::llm_text_complete("ref", "provider", "model")
.with_instructions_template("Translate to ${target_language}: ${text}")
.with_prompt_variable("target_language", "${workflow.input.language}")
    .with_prompt_variable("text", "${workflow.input.content}");

The HTTP Poll task repeatedly calls an HTTP endpoint until a termination condition is met.
use conductor::models::{WorkflowDef, WorkflowTask};
let workflow = WorkflowDef::new("polling_workflow")
.with_task(
WorkflowTask::http_poll("poll_ref", "https://api.example.com/status/${workflow.input.job_id}")
.with_method("GET")
.with_polling_interval(5) // Poll every 5 seconds
.with_polling_strategy("FIXED")
.with_termination_condition("(function(){ return $.output.body.status === 'COMPLETED'; })();")
    );

| Parameter | Description |
|---|---|
| `polling_interval` | Seconds between polls |
| `polling_strategy` | FIXED or EXPONENTIAL_BACKOFF |
| `termination_condition` | JavaScript expression that returns true when polling should stop |
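The difference between the two polling strategies comes down to the interval schedule they produce. A generic, stdlib-only illustration (the actual scheduling happens inside Conductor; this helper is hypothetical):

```rust
// Illustrative next-interval computation for the two HTTP Poll strategies,
// given a base interval and a zero-based poll attempt number.
fn next_interval(strategy: &str, base: u64, attempt: u32) -> u64 {
    match strategy {
        // Interval doubles on each successive poll.
        "EXPONENTIAL_BACKOFF" => base * 2u64.saturating_pow(attempt),
        // FIXED (and anything else here): same interval every poll.
        _ => base,
    }
}

fn main() {
    assert_eq!(next_interval("FIXED", 5, 3), 5);
    assert_eq!(next_interval("EXPONENTIAL_BACKOFF", 5, 3), 40);
}
```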
The Wait for Webhook task pauses workflow execution until an external webhook is received.
use conductor::models::{WorkflowDef, WorkflowTask};
use serde_json::json;
let workflow = WorkflowDef::new("webhook_workflow")
.with_task(
WorkflowTask::wait_for_webhook("webhook_ref")
.with_matches(json!({
"type": "payment_received",
"order_id": "${workflow.input.order_id}"
}))
    );

When a webhook is received that matches the criteria, the workflow continues with the webhook payload as the task output.
Configure tasks to emit events on state transitions for audit logging, monitoring, or triggering external systems.
| Event Type | Description |
|---|---|
| `OnScheduled` | Task is scheduled for execution |
| `OnStart` | Task execution begins |
| `OnCompleted` | Task completes successfully |
| `OnFailed` | Task fails |
| `OnCancelled` | Task is cancelled |
use conductor::models::{WorkflowTask, StateChangeConfig, StateChangeEvent, StateChangeEventType};
use serde_json::json;
let task = WorkflowTask::simple("process_order", "process_ref")
.with_state_change(
StateChangeConfig::new()
.with_event(StateChangeEvent::new(StateChangeEventType::OnStart)
.with_type("task:audit")
.with_payload(json!({
"task": "${task.taskDefName}",
"workflow": "${workflow.workflowType}",
"timestamp": "${task.startTime}"
})))
.with_event(StateChangeEvent::new(StateChangeEventType::OnCompleted)
.with_type("task:audit")
.with_payload(json!({
"task": "${task.taskDefName}",
"status": "COMPLETED",
"duration_ms": "${task.endTime - task.startTime}"
})))
.with_event(StateChangeEvent::new(StateChangeEventType::OnFailed)
.with_type("task:audit")
.with_payload(json!({
"task": "${task.taskDefName}",
"status": "FAILED",
"error": "${task.reasonForIncompletion}"
})))
    );

Customize join behavior using a JavaScript evaluator script:
use conductor::models::{WorkflowDef, WorkflowTask};
// Fork with dynamic join completion
let fork_tasks = vec![
vec![WorkflowTask::simple("task_a", "task_a_ref")],
vec![WorkflowTask::simple("task_b", "task_b_ref")],
vec![WorkflowTask::simple("task_c", "task_c_ref")],
];
let workflow = WorkflowDef::new("fork_join_script_workflow")
.with_task(WorkflowTask::fork_join("fork_ref", fork_tasks, "join_ref"))
.with_task(
WorkflowTask::join("join_ref", vec!["task_a_ref", "task_b_ref", "task_c_ref"])
.with_join_script("(function(){ return $.joinOn.length >= 2; })();")
    );

The join script receives $.joinOn containing completed task references and returns true when enough tasks have completed.
| Event | Description | Fields |
|---|---|---|
| `PollStarted` | Polling begins | task_type, worker_id, poll_count |
| `PollCompleted` | Polling succeeds | task_type, worker_id, duration, tasks_received |
| `PollFailure` | Polling fails | task_type, worker_id, duration, error |
| `TaskExecutionStarted` | Task execution begins | task_type, task_id, workflow_instance_id, worker_id |
| `TaskExecutionCompleted` | Task execution succeeds | task_type, task_id, duration, output_size_bytes |
| `TaskExecutionFailure` | Task execution fails | task_type, task_id, duration, error, is_retryable |
| `TaskUpdateFailure` | Task update fails | task_type, task_id, error, retry_count |
use std::sync::Arc;
use conductor::events::{TaskRunnerEventsListener, PollCompleted, TaskExecutionCompleted};
struct MyListener;
impl TaskRunnerEventsListener for MyListener {
fn on_poll_completed(&self, event: &PollCompleted) {
println!("Polled {} tasks in {:?}", event.tasks_received, event.duration);
}
fn on_task_execution_completed(&self, event: &TaskExecutionCompleted) {
println!("Task {} completed in {:?}", event.task_id, event.duration);
}
}
// Register listener
handler.add_event_listener(Arc::new(MyListener));

┌──────────────┐      ┌───────────────┐     ┌──────────────────┐
│ TaskRunner │───▶│EventDispatcher│───▶│ Listener 1 │
│ │ │ │ │ (MetricsCollector)│
│ • poll │ │ • publish_* │ └──────────────────┘
│ • execute │ │ │ ┌──────────────────┐
│ • update │ │ │───▶│ Listener 2 │
└──────────────┘ └──────────────┘ │ (Custom Logger) │
└──────────────────┘
use conductor::MetricsSettings;
let settings = MetricsSettings::default()
.with_http_port(9090) // Serve metrics on :9090/metrics
.with_metrics_path("/metrics")
.with_namespace("conductor")
    .with_update_interval(Duration::from_secs(5));

| Metric | Type | Labels | Description |
|---|---|---|---|
| `conductor_task_poll_total` | Counter | task_type | Total poll attempts |
| `conductor_task_poll_error_total` | Counter | task_type, error_type | Poll errors |
| `conductor_task_execute_error_total` | Counter | task_type, error_type | Execution errors |
| `conductor_task_update_error_total` | Counter | task_type | Update errors |
| `conductor_task_paused_total` | Counter | task_type | Polls while paused |
| `conductor_task_poll_time_seconds` | Histogram | task_type, status | Poll latency |
| `conductor_task_execute_time_seconds` | Histogram | task_type, status | Execution time |
| `conductor_task_result_size_bytes` | Gauge | task_type | Result payload size |
| `conductor_active_workers` | Gauge | task_type | Active worker count |
// Via HTTP endpoint (if configured)
// GET http://localhost:9090/metrics
// Programmatically
if let Some(collector) = handler.metrics_collector() {
let metrics_text = collector.gather();
println!("{}", metrics_text);
}

pub enum ConductorError {
Http(reqwest::Error), // HTTP request failed
Json(serde_json::Error), // Serialization failed
Config(String), // Configuration error
Auth(String), // Authentication error
TaskExecution(String), // Task execution error
TaskNotFound(String), // Task not found
WorkflowNotFound(String), // Workflow not found
Workflow(String), // Workflow error
Worker(String), // Worker error
Timeout(String), // Timeout error
Server { status, message }, // Server error (5xx)
Api { message, code }, // API error
Internal(String), // Internal error
Io(std::io::Error), // IO error
Channel(String), // Channel error
}

use conductor::error::{ConductorError, Result};
async fn process_task(task: &Task) -> Result<WorkerOutput> {
// Errors automatically converted via From trait
let data: MyData = task.get_input("data")
.ok_or_else(|| ConductorError::task_execution("Missing 'data' input"))?;
// Check if error is retryable
match some_operation().await {
Err(e) if e.is_retryable() => {
// Return in-progress to retry later
Ok(WorkerOutput::in_progress(30))
}
Err(e) => Err(e),
Ok(result) => Ok(WorkerOutput::completed_with_result(result)),
}
}

pub type Result<T> = std::result::Result<T, ConductorError>;

The SDK includes 26 examples covering all major features. Run examples with:

cargo run --example <example_name>
cargo run --example worker_macro_example --features macros

| Example | Description |
|---|---|
| `hello_world` | Basic workflow with simple worker |
| `async_workers` | Multiple concurrent workers |
| `dynamic_workflow` | Build workflows programmatically |
| `http_task_example` | HTTP task integration |
| `sub_workflow_example` | Sub-workflow composition |
| `fork_join_example` | Parallel execution with fork/join |
| `switch_task_example` | Conditional branching |
| `do_while_example` | Loop constructs |
| `terminate_example` | Workflow termination |
| `set_variable_example` | Workflow variables |
| `inline_task_example` | Inline JavaScript tasks |
| `wait_task_example` | Wait/human tasks |
| `event_driven_example` | Event-driven workers |
| `metrics_example` | Prometheus metrics |
| `json_schema_example` | Input/output validation |
| `worker_macro_example` | #[worker] macro (requires macros feature) |
| `openai_helloworld` | LLM text completion |
| `llm_chat_example` | LLM chat with prompts |
| `vector_db_example` | RAG with vector DB |
| `http_poll_example` | HTTP polling task |
| `wait_for_webhook_example` | Webhook integration |
| `sync_state_update_example` | Synchronous state updates |
| `workflow_rerun_example` | Rerun from specific task |
| `fork_join_script_example` | Fork/join with completion script |
| `task_status_audit_example` | State change audit events |
| `dynamic_task_example` | Dynamic task routing |
use conductor::{
Configuration, WorkerHost,
models::{StartWorkflowRequest, WorkflowDef, WorkflowTask, TaskDef},
worker::{FnWorker, WorkerOutput},
};
#[tokio::main]
async fn main() -> conductor::error::Result<()> {
let config = Configuration::from_env();
let client = conductor::ConductorClient::new(config.clone())?;
// Register task
client.metadata_client()
.register_task_def(&TaskDef::new("greet"))
.await?;
// Register workflow
client.metadata_client()
.register_workflow_def(
&WorkflowDef::new("greeting_workflow")
.with_task(WorkflowTask::simple("greet", "greet_ref"))
)
.await?;
// Create worker
let worker = FnWorker::new("greet", |task| async move {
let name = task.get_input_string("name").unwrap_or_default();
Ok(WorkerOutput::completed_with_result(format!("Hello, {}!", name)))
});
// Start worker host
let host = WorkerHost::builder(config)
.worker(worker)
.start()
.await?;
// Start a workflow
let workflow_id = client.workflow_client()
.start_workflow(
&StartWorkflowRequest::new("greeting_workflow")
.with_input_value("name", "World")
)
.await?;
println!("Started workflow: {}", workflow_id);
host.wait_for_shutdown().await
}

use conductor::{
Configuration, MetricsSettings,
worker::{FnWorker, TaskHandler, WorkerOutput},
};
#[tokio::main]
async fn main() -> conductor::error::Result<()> {
let config = Configuration::from_env();
let mut handler = TaskHandler::builder(config)
.worker(FnWorker::new("task_a", |_| async {
Ok(WorkerOutput::completed_with_result("A done"))
}).with_thread_count(4))
.worker(FnWorker::new("task_b", |_| async {
Ok(WorkerOutput::completed_with_result("B done"))
}).with_thread_count(2))
.metrics(MetricsSettings::default().with_http_port(9090))
.build()?;
handler.start().await?;
// Metrics available at http://localhost:9090/metrics
tokio::signal::ctrl_c().await?;
handler.stop().await
}

FnWorker::new("long_task", |task| async move {
let progress: i32 = task.get_input("progress").unwrap_or(0);
if progress < 100 {
// Report progress and callback in 10 seconds
Ok(WorkerOutput::InProgress(
TaskInProgress::new(10)
.with_output_value("progress", progress + 10)
.with_output_value("status", "Processing...")
))
} else {
Ok(WorkerOutput::completed_with_result("Complete!"))
}
})

use async_trait::async_trait;
use conductor::{Worker, Task, error::{ConductorError, Result}, worker::WorkerOutput};
struct OrderProcessor {
db_pool: DatabasePool,
}
#[async_trait]
impl Worker for OrderProcessor {
fn task_definition_name(&self) -> &str {
"process_order"
}
fn thread_count(&self) -> usize {
8
}
fn poll_interval_millis(&self) -> u64 {
200
}
async fn execute(&self, task: &Task) -> Result<WorkerOutput> {
let order_id: String = task.get_input("orderId")
.ok_or_else(|| ConductorError::task_execution("Missing orderId"))?;
// Process order...
let result = self.db_pool.process(&order_id).await?;
Ok(WorkerOutput::completed_with_result(result))
}
}

| Feature | Rust SDK | Python SDK |
|---|---|---|
| Async Runtime | Tokio | asyncio |
| Type Safety | Compile-time | Runtime |
| Worker Trait | `async_trait` | `abc.ABC` |
| Configuration | `Configuration` | `Configuration` |
| Environment Variables | Same naming | Same naming |
| Event System | `EventDispatcher` | `EventDispatcher` |
| Metrics | Prometheus | Prometheus |
| Batch Polling | Yes | Yes |
| Auto Retry | Yes | Yes |
All public types in this SDK are designed to be Send + Sync:

- `ConductorClient`, `TaskClient`, `WorkflowClient`, `MetadataClient` - All cloneable and thread-safe
- `TaskHandler` - Thread-safe; workers can be paused/resumed from any thread
- `EventDispatcher` - Thread-safe registration and publishing
- `MetricsCollector` - Thread-safe metric updates

Workers are wrapped in Arc<dyn Worker> and can be safely shared across threads.
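The Arc<dyn Worker> sharing pattern looks like this in miniature. A self-contained, stdlib-only sketch: the trait and MyWorker here are stand-ins, and plain threads are used in place of the SDK's Tokio tasks:

```rust
use std::sync::Arc;
use std::thread;

// Minimal stand-in trait mirroring the Send + Sync bound on Worker.
trait Worker: Send + Sync {
    fn task_definition_name(&self) -> &str;
}

struct MyWorker;
impl Worker for MyWorker {
    fn task_definition_name(&self) -> &str { "greet" }
}

fn main() {
    // One Arc'd worker shared by several polling threads,
    // as TaskHandler does internally.
    let worker: Arc<dyn Worker> = Arc::new(MyWorker);
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let w = Arc::clone(&worker);
            thread::spawn(move || w.task_definition_name().to_string())
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), "greet");
    }
}
```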
- Batch Polling: Use batch polling to reduce HTTP overhead when processing many tasks
- Thread Count: Set an appropriate thread_count per worker based on task complexity
- Poll Interval: Balance between responsiveness and server load
- Connection Pooling: The HTTP client maintains a connection pool internally
- Semaphore-based Concurrency: Task execution is controlled by semaphores to prevent overload
This SDK is provided under the same license as the Conductor project.