This document describes Ruff's concurrency primitives and their internal implementation.
Last Updated: January 27, 2026
Version: v0.9.0
- Overview
- Threading Model
- Async/Await Architecture
- Promises
- Channels
- Spawn Blocks
- Generators
- Concurrency Patterns
- Best Practices
- Performance Considerations
Ruff provides multiple concurrency primitives to handle different use cases:
- Async/Await: Promise-based asynchronous execution (currently synchronous, see Phase 5 in ROADMAP)
- Spawn Blocks: True parallel execution with OS threads
- Channels: Thread-safe message passing between concurrent tasks
- Generators: Lazy evaluation with cooperative multitasking (yield/resume)
| Feature | Type | OS Threads | Use Case |
|---|---|---|---|
| Async/Await | Concurrent | No (currently) | I/O-bound tasks |
| Spawn Blocks | Parallel | Yes | CPU-bound tasks |
| Generators | Concurrent | No | Lazy sequences |
| Channels | Either | Yes | Message passing |
Ruff uses Rust's standard library threading model with Arc<Mutex<>> for shared mutable state.
All shared data uses Arc (Atomic Reference Counting) for safe cross-thread ownership:
// Environment shared across threads
pub struct Environment {
parent: Option<Arc<Mutex<Environment>>>,
variables: HashMap<String, Value>,
functions: HashMap<String, Value>,
}
// Captured closure environment
Value::Function(
params,
body,
Some(Arc<Mutex<Environment>>) // Shared environment
)
- Channel: Arc<Mutex<(Sender<Value>, Receiver<Value>)>>
- Promise: Arc<Mutex<Receiver<Result<Value, String>>>>
- Generator: Arc<Mutex<Environment>> for state
- Database Connection: Arc<Mutex<Connection>>
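To illustrate why the types above are wrapped in Arc<Mutex<...>>, here is a minimal standalone sketch in plain Rust. It is not taken from the Ruff source; the function name `parallel_increment` is hypothetical, and a plain integer stands in for interpreter state such as an Environment.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Increment a shared counter from `threads` OS threads, `per_thread` times each.
/// (Hypothetical helper for illustration; not part of Ruff.)
fn parallel_increment(threads: usize, per_thread: u64) -> u64 {
    // Arc gives every thread an owning handle; Mutex serializes the writes.
    let counter = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    // Wait for every worker before reading the final value.
    for handle in handles {
        handle.join().unwrap();
    }

    let total = *counter.lock().unwrap();
    total
}
```

Calling `parallel_increment(4, 1000)` yields 4000 on every run, because each increment happens while the lock is held.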
Important: Async/await is currently synchronous - it wraps results in Promises but doesn't provide true concurrent I/O. Phase 5 (Tokio integration) will add true asynchronous execution.
Syntax:
async func fetch_data(url) {
# Function body
return data
}
AST Representation:
// src/ast.rs
pub enum Stmt {
FuncDef {
name: String,
params: Vec<String>,
body: Vec<Stmt>,
is_async: bool, // Marks async functions
},
// ...
}
Runtime Value:
// src/interpreter/value.rs
pub enum Value {
AsyncFunction(
Vec<String>, // Parameters
LeakyFunctionBody, // Function body
Option<Arc<Mutex<Environment>>>, // Captured closure env
),
// ...
}
Syntax:
result := await fetch_data("https://api.example.com")
Evaluation (src/interpreter/mod.rs:3876-3920):
Expr::Await(promise_expr) => {
let promise_value = self.eval_expr(promise_expr);
match promise_value {
Value::Promise { receiver, is_polled, cached_result } => {
// Check cache first (promises are single-use)
{
let cached = cached_result.lock().unwrap();
if let Some(ref result) = *cached {
return match result {
Ok(v) => v.clone(),
Err(e) => Value::Error(e.clone()),
};
}
}
// Poll the promise (blocks until result available)
let mut polled = is_polled.lock().unwrap();
if !*polled {
*polled = true;
drop(polled);
let result = receiver.lock().unwrap().recv();
match result {
Ok(Ok(value)) => {
// Cache result
*cached_result.lock().unwrap() = Some(Ok(value.clone()));
value
}
Ok(Err(err)) => {
*cached_result.lock().unwrap() = Some(Err(err.clone()));
Value::Error(err)
}
Err(_) => Value::Error("Promise channel closed".to_string()),
}
} else {
// Already polled - return cached result
let cached = cached_result.lock().unwrap();
match &*cached {
Some(Ok(v)) => v.clone(),
Some(Err(e)) => Value::Error(e.clone()),
None => Value::Error("Promise already consumed".to_string()),
}
}
}
other => other, // Not a promise, return as-is
}
}
Promises represent the eventual result of an asynchronous operation.
// src/interpreter/value.rs:367-371
Value::Promise {
receiver: Arc<Mutex<std::sync::mpsc::Receiver<Result<Value, String>>>>,
is_polled: Arc<Mutex<bool>>,
cached_result: Arc<Mutex<Option<Result<Value, String>>>>,
}
Fields:
- receiver: Channel to receive async result
- is_polled: Ensures promise is only polled once
- cached_result: Stores result after polling (promises are single-use)
When calling an async function:
# Define async function
async func compute() {
return 42
}
# Call returns a Promise
promise := compute() # Returns Promise immediately
# Await to get result
result := await promise # Blocks until result available
Implementation:
// When calling async function
let (sender, receiver) = std::sync::mpsc::channel();
// Spawn thread to execute async body
std::thread::spawn(move || {
let result = execute_async_body(body, env);
let _ = sender.send(Ok(result));
});
// Return promise immediately
Value::Promise {
receiver: Arc::new(Mutex::new(receiver)),
is_polled: Arc::new(Mutex::new(false)),
cached_result: Arc::new(Mutex::new(None)),
}
┌─────────────┐
│ Created │ ◄─── async function called
└──────┬──────┘
│
▼ await expression
┌─────────────┐
│ Polling │ ◄─── blocking on channel
└──────┬──────┘
│
▼ result received
┌─────────────┐
│ Resolved │ ◄─── result cached
└─────────────┘
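The Created, Polling, and Resolved states can be sketched in plain Rust as follows. This is a simplified illustration under stated assumptions, not the interpreter's code: `Promise`, `spawn_promise`, and `await_blocking` are hypothetical names, results are narrowed to `i64`, and the is_polled flag is folded into the cache check.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for Value::Promise, specialized to i64 results.
struct Promise {
    receiver: Arc<Mutex<Receiver<Result<i64, String>>>>,
    cached_result: Arc<Mutex<Option<Result<i64, String>>>>,
}

/// "Created": run `work` on a new thread and hand back a promise immediately.
fn spawn_promise<F>(work: F) -> Promise
where
    F: FnOnce() -> Result<i64, String> + Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the error: the promise may have been dropped already.
        let _ = sender.send(work());
    });
    Promise {
        receiver: Arc::new(Mutex::new(receiver)),
        cached_result: Arc::new(Mutex::new(None)),
    }
}

impl Promise {
    /// "Polling" then "Resolved": block once, then serve the cached result.
    fn await_blocking(&self) -> Result<i64, String> {
        let mut cached = self.cached_result.lock().unwrap();
        if let Some(result) = cached.as_ref() {
            return result.clone();
        }
        let result = match self.receiver.lock().unwrap().recv() {
            Ok(inner) => inner,
            Err(_) => Err("Promise channel closed".to_string()),
        };
        *cached = Some(result.clone());
        result
    }
}
```

With this sketch, `spawn_promise(|| Ok(42))` returns at once, and awaiting the promise twice gives `Ok(42)` both times because the second call reads the cache instead of the channel.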
Channels provide thread-safe message passing using Rust's std::sync::mpsc (multi-producer, single-consumer).
Syntax:
ch := channel() # Create new channel
Implementation (src/interpreter/native_functions/concurrency.rs:10-17):
use std::sync::mpsc;
let (sender, receiver) = mpsc::channel();
let channel = Arc::new(Mutex::new((sender, receiver)));
Value::Channel(channel)
Syntax:
ch.send(42)
ch.send("hello")
ch.send([1, 2, 3])
Implementation:
// Lock the channel and borrow the sender half of the pair
let guard = channel_mutex.lock().unwrap();
let (sender, _) = &*guard;
sender.send(value).map_err(|_| "Channel closed")?;
Syntax:
value := ch.receive() # Blocks until message available
Implementation:
// Lock the channel and borrow the receiver half of the pair
let guard = channel_mutex.lock().unwrap();
let (_, receiver) = &*guard;
match receiver.recv() {
Ok(value) => value,
Err(_) => Value::Error("Channel closed".to_string()),
}
# Create channel
ch := channel()
# Producer thread
spawn {
for i in range(5) {
ch.send(i)
}
}
# Consumer (main thread)
for i in range(5) {
value := ch.receive()
print("Received: ${value}")
}
Output:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
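For comparison, the same producer/consumer exchange can be written directly against std::sync::mpsc, the primitive the channels are built on. This is a standalone sketch; `produce_and_collect` is a hypothetical name, and the end of the stream is signalled by dropping the sender rather than by counting messages.

```rust
use std::sync::mpsc;
use std::thread;

/// Send 0..n from a producer thread and collect them on the calling thread.
/// (Hypothetical helper for illustration; not part of Ruff.)
fn produce_and_collect(n: i32) -> Vec<i32> {
    let (sender, receiver) = mpsc::channel();

    // Producer thread: owns the sender, which is dropped when the closure ends.
    let producer = thread::spawn(move || {
        for i in 0..n {
            sender.send(i).expect("receiver hung up");
        }
    });

    // Consumer: iteration ends once the sender is dropped and the queue is empty.
    let received: Vec<i32> = receiver.iter().collect();
    producer.join().unwrap();
    received
}
```

With a single producer, messages arrive in the order they were sent, so `produce_and_collect(5)` returns `[0, 1, 2, 3, 4]`.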
Spawn blocks execute code in true OS threads, enabling CPU-bound parallelism.
spawn {
# Code executes in separate thread
print("Hello from thread!")
}
// src/ast.rs
pub enum Stmt {
Spawn { body: Vec<Stmt> },
// ...
}
Evaluation (src/interpreter/mod.rs:2391-2420):
Stmt::Spawn { body } => {
let body_clone = body.clone();
// Spawn OS thread
std::thread::spawn(move || {
// Create isolated interpreter for thread
let mut thread_interpreter = Interpreter::new();
// Execute body statements
for stmt in body_clone.iter() {
thread_interpreter.eval_stmt(stmt);
}
});
// Main thread continues immediately (non-blocking)
}
- Isolation: Spawned code runs in isolated environment (no access to parent scope)
- Non-blocking: Main thread continues immediately
- No return value: Spawn blocks don't return values (use channels for communication)
- OS threads: Each spawn creates a real OS thread (not green threads)
ch := channel()
# Background computation
spawn {
result := expensive_computation()
ch.send(result)
}
# Main thread does other work
do_other_work()
# Get result when ready
result := ch.receive()
Generators provide lazy evaluation with cooperative multitasking using yield.
Syntax:
gen range(n) {
i := 0
while i < n {
yield i
i := i + 1
}
}
// src/ast.rs
pub enum Stmt {
GeneratorDef {
name: String,
params: Vec<String>,
body: Vec<Stmt>,
},
// ...
}
pub enum Expr {
Yield(Option<Box<Expr>>),
// ...
}
Generator Definition (before calling):
Value::GeneratorDef(
Vec<String>, // Parameters
LeakyFunctionBody, // Body with yield points
)
Generator Instance (after calling):
Value::Generator {
params: Vec<String>,
body: LeakyFunctionBody,
env: Arc<Mutex<Environment>>, // Persistent state
pc: usize, // Program counter (resume position)
is_exhausted: bool, // No more values to yield
}
Evaluation:
Expr::Yield(value_expr) => {
let yielded = if let Some(expr) = value_expr {
self.eval_expr(expr)
} else {
Value::Null
};
// Signal to generator executor to pause and return value
Value::Return(Box::new(yielded))
}
┌─────────────┐
│ Created │ ◄─── gen_instance := generator()
└──────┬──────┘
│
▼ .next() called
┌─────────────┐
│ Running │ ◄─── Execute until yield
└──────┬──────┘
│
▼ yield value
┌─────────────┐
│ Suspended │ ◄─── Save pc and env
└──────┬──────┘
│
▼ .next() called again
┌─────────────┐
│ Resumed │ ◄─── Continue from pc
└──────┬──────┘
│
▼ No more yields
┌─────────────┐
│ Exhausted │ ◄─── is_exhausted = true
└─────────────┘
gen_instance := range(5)
loop {
value := gen_instance.next()
if value == null {
break
}
print(value)
}
Output:
0
1
2
3
4
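The suspend/resume behaviour can be approximated in plain Rust with a small state machine. This is only a sketch of the idea: `RangeGen` is a hypothetical type, its fields stand in for the saved environment and the is_exhausted flag, and it does not model the pc field or the interpreter's body replay.

```rust
/// Hypothetical counterpart of the range(n) generator: all state lives in the struct.
struct RangeGen {
    i: u32,             // plays the role of the saved environment
    n: u32,             // upper bound passed at creation
    is_exhausted: bool, // mirrors the is_exhausted flag
}

impl RangeGen {
    fn new(n: u32) -> Self {
        RangeGen { i: 0, n, is_exhausted: false }
    }
}

impl Iterator for RangeGen {
    type Item = u32;

    /// Run until the next "yield": return one value, or None once exhausted.
    fn next(&mut self) -> Option<u32> {
        if self.is_exhausted {
            return None;
        }
        if self.i < self.n {
            let yielded = self.i;
            self.i += 1; // state is kept for the next call
            Some(yielded)
        } else {
            self.is_exhausted = true;
            None
        }
    }
}
```

Each call to `next()` does a constant amount of work and stores nothing beyond the three fields, which is the property that makes generators suitable for large or unbounded sequences.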
Use Case: Distribute work across multiple threads, collect results.
func fan_out_fan_in(items) {
ch := channel()
# Fan-out: Spawn worker threads
for item in items {
spawn {
result := process(item)
ch.send(result)
}
}
# Fan-in: Collect results
results := []
for i in range(len(items)) {
results.push(ch.receive())
}
return results
}
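A rough Rust equivalent of the fan-out/fan-in pattern is sketched below. The assumptions are mine: squaring stands in for `process(item)`, items are `u64`, and the results are sorted at the end only so that the output is deterministic, since workers finish in arbitrary order.

```rust
use std::sync::mpsc;
use std::thread;

/// Process every item on its own thread and gather the results.
/// (Hypothetical helper; squaring is a placeholder for process(item).)
fn fan_out_fan_in(items: Vec<u64>) -> Vec<u64> {
    let (sender, receiver) = mpsc::channel();
    let expected = items.len();

    // Fan-out: one thread per item, each with its own clone of the sender.
    for item in items {
        let sender = sender.clone();
        thread::spawn(move || {
            let result = item * item;
            sender.send(result).expect("collector hung up");
        });
    }
    // Drop the original sender so the channel closes when the workers finish.
    drop(sender);

    // Fan-in: results arrive in completion order, not input order.
    let mut results: Vec<u64> = receiver.iter().take(expected).collect();
    results.sort_unstable();
    results
}
```

Because one thread is created per item, this shape suits a small number of expensive items; for many small items a bounded worker pool is the safer choice.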
Use Case: Chain processing stages with channels.
func pipeline(data) {
ch1 := channel()
ch2 := channel()
# Stage 1: Input
spawn {
for item in data {
ch1.send(item)
}
ch1.send(null) # Signal end of input so Stage 2 can exit
}
# Stage 2: Transform
spawn {
loop {
item := ch1.receive()
if item == null { break }
ch2.send(transform(item))
}
}
# Stage 3: Output
results := []
for i in range(len(data)) {
results.push(ch2.receive())
}
return results
}
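The same three-stage pipeline can be sketched in Rust, where closing a channel (by dropping its sender) replaces the null sentinel. The names and the doubling transform are placeholders chosen for illustration, not Ruff APIs.

```rust
use std::sync::mpsc;
use std::thread;

/// Three-stage pipeline: feed -> transform (doubling as a placeholder) -> collect.
fn pipeline(data: Vec<i64>) -> Vec<i64> {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    // Stage 1: input. tx1 is dropped when the closure returns, closing channel 1.
    thread::spawn(move || {
        for item in data {
            if tx1.send(item).is_err() {
                break;
            }
        }
    });

    // Stage 2: transform. The loop ends when channel 1 closes; tx2 is then dropped.
    thread::spawn(move || {
        for item in rx1 {
            if tx2.send(item * 2).is_err() {
                break;
            }
        }
    });

    // Stage 3: output. Collect until channel 2 closes.
    let results: Vec<i64> = rx2.iter().collect();
    results
}
```

Each stage has a single producer, so ordering is preserved end to end: `pipeline(vec![1, 2, 3])` returns `[2, 4, 6]`, and no stage is left blocked once the input runs out.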
Use Case: Apply async function to array items concurrently.
async func async_map(items, async_fn) {
promises := []
for item in items {
promise := async_fn(item)
promises.push(promise)
}
results := []
for promise in promises {
results.push(await promise)
}
return results
}
# Usage
async func fetch(url) {
return http_get(url)
}
urls := ["https://api1.com", "https://api2.com", "https://api3.com"]
results := await async_map(urls, fetch)
Use Case: Limit concurrent tasks to avoid resource exhaustion.
func worker_pool(tasks, num_workers) {
task_ch := channel()
result_ch := channel()
# Spawn workers
for i in range(num_workers) {
spawn {
loop {
task := task_ch.receive()
if task == null { break }
result := execute_task(task)
result_ch.send(result)
}
}
}
# Send tasks
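spawn {
for task in tasks {
task_ch.send(task)
}
# Send one null per worker so every worker loop exits
for i in range(num_workers) {
task_ch.send(null)
}
}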
# Collect results
results := []
for i in range(len(tasks)) {
results.push(result_ch.receive())
}
return results
}
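A bounded worker pool over std::sync::mpsc might look like the sketch below. It rests on a few assumptions of mine: squaring stands in for `execute_task`, tasks are `u64`, workers stop when the task channel closes instead of on a null sentinel, and results are sorted only to make the output deterministic.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Run `tasks` on at most `num_workers` threads; squaring stands in for execute_task.
fn worker_pool(tasks: Vec<u64>, num_workers: usize) -> Vec<u64> {
    let (task_tx, task_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    // std::sync::mpsc has a single consumer, so workers share it behind a Mutex.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let mut workers = Vec::with_capacity(num_workers);
    for _ in 0..num_workers {
        let task_rx = Arc::clone(&task_rx);
        let result_tx = result_tx.clone();
        workers.push(thread::spawn(move || loop {
            // The lock guard is released at the end of this statement,
            // so it is not held while the task itself is processed.
            let next = task_rx.lock().unwrap().recv();
            match next {
                Ok(task) => result_tx.send(task * task).expect("collector hung up"),
                Err(_) => break, // task channel closed: no more work
            }
        }));
    }
    drop(result_tx); // only the workers' clones keep the result channel open

    for task in tasks {
        task_tx.send(task).expect("task receiver dropped");
    }
    drop(task_tx); // closing the channel tells every worker to stop

    let mut results: Vec<u64> = result_rx.iter().collect();
    for worker in workers {
        worker.join().unwrap();
    }
    results.sort_unstable();
    results
}
```

The thread count is fixed by `num_workers` regardless of how many tasks are queued, which is the point of the pattern; the sketch expects at least one worker for a non-empty task list.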
- Async/Await: I/O-bound operations (HTTP, files, databases)
- Spawn: CPU-bound operations (computation, image processing)
- Generators: Lazy sequences, infinite streams
- Channels: Communication between concurrent tasks
Bad (prone to race conditions):
counter := 0 # Shared between threads
spawn {
counter := counter + 1 # Race condition!
}
spawn {
counter := counter + 1 # Race condition!
}
Good (use channels):
ch := channel()
spawn {
ch.send(1)
}
spawn {
ch.send(1)
}
counter := ch.receive() + ch.receive() # Safe: 2
Ensure receivers don't block forever:
ch := channel()
spawn {
for i in range(10) {
ch.send(i)
}
ch.send(null) # Signal completion
}
loop {
value := ch.receive()
if value == null { break }
process(value)
}
async func fetch_data(url) {
result := http_get(url)
if type(result) == "Error" {
return Err(result)
}
return Ok(result)
}
promise := fetch_data("https://api.example.com")
result := await promise
match result {
Ok(data) => print("Success: ${data}"),
Err(error) => print("Failed: ${error}"),
}
Don't spawn unbounded threads:
# Bad: Could spawn 1000 threads!
for i in range(1000) {
spawn {
process(i)
}
}
# Good: Use worker pool pattern
results := worker_pool(range(1000), 10) # Max 10 workers
- Creating OS threads is expensive (~100µs per thread)
- Consider worker pools for many small tasks
- Generators have near-zero overhead (no threads)
- mpsc channels are lock-free and very fast (~50ns per message)
- Prefer channels over shared mutable state
- Batch messages to reduce overhead
v0.9.0: Async functions execute synchronously - no true concurrency benefit yet.
Workaround: Use spawn for parallel I/O:
ch := channel()
spawn {
result := http_get("https://api1.com")
ch.send(result)
}
spawn {
result := http_get("https://api2.com")
ch.send(result)
}
# Now truly concurrent
result1 := ch.receive()
result2 := ch.receive()
Future (Phase 5): Tokio integration will provide true async I/O (10-100x faster for I/O-bound workloads).
- Generators have minimal overhead (just function call + state save)
- Perfect for large or infinite sequences
- No memory for unyielded values (lazy evaluation)
Example:
# Eager: Allocates 1 million integers in memory
nums := range(1000000)
# Lazy: Generates one at a time
gen nums := range(1000000)
- Deadlocks: Two threads waiting for each other
- Race Conditions: Shared mutable state without synchronization
- Channel Leaks: Receiver blocked forever (sender died)
Add logging:
spawn {
print("[Thread ${thread_id()}] Starting work")
result := do_work()
print("[Thread ${thread_id()}] Completed: ${result}")
ch.send(result)
}
Use timeouts (future feature):
# Will be added in Phase 5
result := ch.receive_timeout(5000) # 5 second timeout
if result == null {
print("ERROR: Timeout waiting for result")
}
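Until a timeout API is available in Ruff, the underlying Rust primitive shows what such a call could do. The sketch below uses `Receiver::recv_timeout` from std::sync::mpsc; whether `receive_timeout` will be built on it is an assumption, and `receive_with_timeout` is a hypothetical helper.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Wait up to `timeout` for a value that a worker sends after `delay`.
/// (Hypothetical helper for illustration; not part of Ruff.)
fn receive_with_timeout(delay: Duration, timeout: Duration) -> Result<i32, String> {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(delay);
        // The receiver may already have given up; ignore the send error.
        let _ = sender.send(42);
    });

    match receiver.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        Err(RecvTimeoutError::Timeout) => Err("timeout waiting for result".to_string()),
        Err(RecvTimeoutError::Disconnected) => Err("channel closed".to_string()),
    }
}
```

Distinguishing a timeout from a closed channel is useful when debugging: the first suggests a slow or stuck sender, the second a sender that exited without sending.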
Test with smaller workloads:
# Debug with 10 items first
# worker_pool(range(1000000), 100)
worker_pool(range(10), 2)
- Tokio integration for true async I/O
- Non-blocking HTTP, file, database operations
- 10-100x faster for I/O-bound workloads
- Async timeout support
- Select expressions: Wait on multiple channels
- Async iterators: async for item in async_stream { ... }
- Cancellation: Cancel async tasks mid-execution
- Task priorities: High/low priority task scheduling
- ARCHITECTURE.md - System architecture overview
- MEMORY.md - Memory management and ownership
- ROADMAP.md - Planned concurrency features
- examples/concurrency/ - Concurrency examples
Questions? Open an issue on GitHub or check the documentation.