diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
new file mode 100644
index 0000000..07787fe
--- /dev/null
+++ b/DEVELOPMENT.md
@@ -0,0 +1,1141 @@
+# DEVELOPMENT
+
+Developer guide for contributing to worker-tetra, a RunPod Serverless worker for remote Python execution.
+
+## Table of Contents
+
+- [Getting Started](#getting-started)
+- [Development Workflow](#development-workflow)
+- [Testing Strategy](#testing-strategy)
+- [Code Quality & Standards](#code-quality--standards)
+- [Architecture Overview](#architecture-overview)
+- [Common Development Tasks](#common-development-tasks)
+- [Docker Development](#docker-development)
+- [Submodule Management](#submodule-management)
+- [CI/CD Pipeline](#cicd-pipeline)
+- [Contributing Guidelines](#contributing-guidelines)
+- [Debugging Guide](#debugging-guide)
+- [Troubleshooting](#troubleshooting)
+
+## Getting Started
+
+### Prerequisites
+
+- Python 3.11+ (3.12 recommended)
+- Docker Desktop (for container testing)
+- `uv` package manager ([installation](https://github.com/astral-sh/uv))
+- Git with submodule support
+
+### Initial Setup
+
+```bash
+# Clone repository with submodules
+git clone --recurse-submodules https://github.com/runpod/worker-tetra.git
+cd worker-tetra
+
+# Initialize project (creates venv, syncs deps, updates submodules)
+make setup
+
+# Activate virtual environment
+source .venv/bin/activate
+
+# Verify setup with tests
+make test
+```
+
+### Environment Variables
+
+Create a `.env` file (gitignored) for local development:
+
+```bash
+# Optional - only needed for RunPod integration testing
+RUNPOD_API_KEY=your_key_here
+
+# Optional - for HuggingFace private models
+HF_TOKEN=your_token_here
+```
+
+## Development Workflow
+
+### Standard Development Cycle
+
+1. **Create feature branch**
+ ```bash
+ git checkout -b feature/TICKET-description
+ ```
+
+2. **Write failing tests first** (TDD approach)
+ ```bash
+ # Add test to tests/unit/test_*.py
+ make test-fast # Should fail
+ ```
+
+3. **Implement feature**
+ ```bash
+ # Edit src/*.py
+ make test-fast # Should pass
+ ```
+
+4. **Run quality checks**
+ ```bash
+ make quality-check # Format, lint, typecheck, coverage, handler tests
+ ```
+
+5. **Commit and push**
+ ```bash
+ git add .
+ git commit -m "feat(component): description"
+ git push origin feature/TICKET-description
+ ```
+
+### Daily Commands
+
+```bash
+# Install/sync dependencies after pulling changes
+make dev
+
+# Run tests during development (fail-fast mode)
+make test-fast
+
+# Format code before committing
+make format
+
+# Check everything before push
+make quality-check
+```
+
+## Testing Strategy
+
+### Test Organization
+
+```
+tests/
+├── unit/ # Fast, isolated component tests
+│ ├── test_function_executor.py
+│ ├── test_class_executor.py
+│ ├── test_dependency_installer.py
+│ └── ...
+├── integration/ # End-to-end workflow tests
+│ ├── test_handler_integration.py
+│ └── test_remote_execution.py
+└── conftest.py # Shared fixtures
+```
+
+### When to Write Each Test Type
+
+**Unit Tests** (`tests/unit/`):
+- Testing individual components in isolation
+- Mocking external dependencies
+- Fast execution (< 100ms per test)
+- Example: Testing `FunctionExecutor.execute()` with mock request
+
+**Integration Tests** (`tests/integration/`):
+- Testing complete workflows
+- Real dependency installation
+- Slower execution (seconds)
+- Example: End-to-end remote function execution
+
+### Test Structure (AAA Pattern)
+
+```python
+async def test_feature_behavior():
+    # Arrange - Set up test data
+    executor = FunctionExecutor()
+    request = FunctionRequest(
+        function_name="test_func",
+        function_code="def test_func(): return 42",
+        args=[],
+        kwargs={},
+    )
+
+    # Act - Execute the operation (execute() is async)
+    response = await executor.execute(request)
+
+ # Assert - Verify expectations
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == 42
+```
+
+### Testing Async Functions/Methods
+
+```python
+async def test_execute_async_function():
+    """Test async function execution."""
+    request = FunctionRequest(
+        function_name="async_func",
+        function_code="async def async_func(): return 'result'",
+        args=[],
+        kwargs={},
+    )
+
+    response = await executor.execute(request)
+
+    assert response.success is True
+    # Async functions are awaited directly by the async executor
+```
+
+### Running Tests
+
+```bash
+# Run all tests
+make test
+
+# Run only unit tests
+make test-unit
+
+# Run only integration tests
+make test-integration
+
+# Run with coverage report (HTML in htmlcov/)
+make test-coverage
+
+# Run with fail-fast (stop on first failure)
+make test-fast
+
+# Run specific test file
+uv run pytest tests/unit/test_function_executor.py -xvs
+
+# Run specific test function
+uv run pytest tests/unit/test_function_executor.py::TestFunctionExecution::test_execute_simple_function -xvs
+
+# Test handler with JSON test files
+make test-handler
+```
+
+### Coverage Requirements
+
+- Minimum: 35% total coverage (enforced in CI)
+- Target: 80%+ for new code
+- Critical paths: 90%+ (executors, serialization)
+
+## Code Quality & Standards
+
+### Formatting
+
+```bash
+# Format all code (modifies files)
+make format
+
+# Check formatting without changes
+make format-check
+```
+
+**Rules:**
+- PEP 8 compliance via ruff
+- Line length: 88 characters
+- Single newline at end of files
+- No trailing whitespace
+
+### Linting
+
+```bash
+# Check for issues
+make lint
+
+# Auto-fix fixable issues
+make lint-fix
+```
+
+**Key Rules:**
+- No unused imports
+- No undefined variables
+- No mutable default arguments
+- No bare `except:` clauses
+
+### Type Checking
+
+```bash
+# Run mypy type checker
+make typecheck
+```
+
+**Requirements:**
+- Type hints mandatory for all functions
+- No `Any` types without justification
+- Pydantic models for data validation
+
+Example:
+```python
+def process_data(items: list[dict[str, Any]]) -> pd.DataFrame:
+ """Process items and return DataFrame."""
+ pass
+```
+
+### Pre-Commit Quality Check
+
+Always run before committing:
+
+```bash
+make quality-check
+```
+
+This runs:
+1. Format check
+2. Lint check
+3. Type check
+4. Test suite with coverage
+5. Handler test files
+
+## Architecture Overview
+
+### Component Hierarchy
+
+```mermaid
+graph TB
+    Handler["handler.py<br/>RunPod Entry Point"]:::entry
+    RemoteExec["remote_executor.py<br/>Central Orchestrator"]:::core
+    DepInst["dependency_installer.py<br/>Package Management"]:::support
+    FuncExec["function_executor.py<br/>Function Execution"]:::exec
+    ClassExec["class_executor.py<br/>Class/Method Execution"]:::exec
+    BaseExec["base_executor.py<br/>Common Interface"]:::support
+
+ Handler --> RemoteExec
+ RemoteExec --> DepInst
+ RemoteExec --> FuncExec
+ RemoteExec --> ClassExec
+ FuncExec --> BaseExec
+ ClassExec --> BaseExec
+
+ classDef entry fill:#1976d2,stroke:#0d47a1,stroke-width:3px,color:#fff
+ classDef core fill:#388e3c,stroke:#1b5e20,stroke-width:3px,color:#fff
+ classDef exec fill:#f57c00,stroke:#e65100,stroke-width:3px,color:#fff
+ classDef support fill:#7b1fa2,stroke:#4a148c,stroke-width:3px,color:#fff
+```
+
+### Execution Flow
+
+```mermaid
+sequenceDiagram
+ participant Client as Tetra Client
+ participant Handler as handler.py
+ participant Remote as remote_executor.py
+ participant DepInst as dependency_installer.py
+ participant Executor as function/class_executor.py
+
+ Client->>Handler: FunctionRequest (serialized)
+ Handler->>Remote: Deserialize & route
+ Remote->>DepInst: Install dependencies
+ DepInst-->>Remote: Dependencies ready
+ Remote->>Executor: Execute function/method
+ Executor-->>Remote: FunctionResponse
+ Remote-->>Handler: Serialize response
+ Handler-->>Client: Return result
+```
+
+### Key Patterns
+
+**Composition Over Inheritance:**
+- `RemoteExecutor` composes `DependencyInstaller` and executors
+- Clear separation of concerns
+
+**Async Support:**
+- Detects async functions with `inspect.iscoroutinefunction()`
+- Awaits async functions directly inside the async `execute()` method; sync functions are called normally
+- Supports both in `FunctionExecutor` and `ClassExecutor`
+
+**Serialization:**
+- CloudPickle for function arguments and results
+- Base64 encoding for transport
+- Handles complex Python objects
+
+**Error Handling:**
+- Structured responses via `FunctionResponse`
+- Full traceback capture
+- Combined stdout/stderr/log output
+
+### Detailed Architecture
+
+See [CLAUDE.md](CLAUDE.md) for comprehensive architecture documentation and component details.
+
+## Common Development Tasks
+
+### Adding a New Executor Type
+
+1. **Create executor class**
+ ```python
+ # src/new_executor.py
+ from base_executor import BaseExecutor
+ from remote_execution import FunctionRequest, FunctionResponse
+
+ class NewExecutor(BaseExecutor):
+ def execute(self, request: FunctionRequest) -> FunctionResponse:
+ # Implementation
+ pass
+ ```
+
+2. **Add to RemoteExecutor**
+ ```python
+ # src/remote_executor.py
+ from new_executor import NewExecutor
+
+ def __init__(self):
+ self.new_executor = NewExecutor()
+
+ def execute(self, request: FunctionRequest) -> FunctionResponse:
+ if request.execution_type == "new_type":
+ return self.new_executor.execute(request)
+ ```
+
+3. **Write tests**
+ ```python
+ # tests/unit/test_new_executor.py
+ class TestNewExecutor:
+ def test_execute_basic(self):
+ # AAA pattern
+ pass
+ ```
+
+### Adding System Packages
+
+System packages that require `apt-get` installation:
+
+```python
+# src/constants.py
+LARGE_SYSTEM_PACKAGES = [
+ "ffmpeg",
+ "libsm6",
+ "libxext6",
+ "your-package-here", # Add here
+]
+```
+
+### Writing Handler Test Files
+
+Handler tests validate end-to-end execution:
+
+```bash
+# Create test file
+cat > src/tests/test_my_feature.json << 'EOF'
+{
+ "input": {
+ "function_name": "my_function",
+ "function_code": "def my_function(x): return x * 2",
+    "args": ["<base64(cloudpickle.dumps(3))>"],
+ "kwargs": {},
+ "python_dependencies": [],
+ "system_dependencies": []
+ }
+}
+EOF
+
+# Test locally
+make test-handler
+```
+
+### Modifying Dependency Installation
+
+Edit `src/dependency_installer.py`:
+
+```python
+def install_python_packages(self, packages: list[str]) -> FunctionResponse:
+ # Add custom logic
+ # Uses uv pip install with environment detection
+ pass
+
+def install_system_packages(self, packages: list[str]) -> FunctionResponse:
+ # Add custom logic
+ # Uses apt-get/nala with acceleration
+ pass
+```
+
+### Debugging Remote Execution Failures
+
+1. **Check serialization**
+ ```python
+ # Test argument encoding
+ import cloudpickle, base64
+ arg = "test"
+ encoded = base64.b64encode(cloudpickle.dumps(arg)).decode()
+ decoded = cloudpickle.loads(base64.b64decode(encoded))
+ assert arg == decoded
+ ```
+
+2. **Review response output**
+ ```python
+ response = executor.execute(request)
+ print(response.stdout) # Combined stdout/stderr/logs
+ print(response.error) # Error message + traceback
+ ```
+
+3. **Test in isolation**
+ ```python
+ # Run function locally first
+ exec(function_code, namespace := {})
+ func = namespace[function_name]
+ result = func(*args, **kwargs) # Direct execution
+ ```
+
+## Docker Development
+
+### Building Images
+
+```bash
+# Build both GPU and CPU images
+make build
+
+# Build GPU image only
+make build-gpu
+
+# Build CPU image only
+make build-cpu
+
+# Build and test on macOS (ARM)
+make smoketest-macos-build
+make smoketest-macos
+```
+
+### Image Details
+
+**GPU Image (`Dockerfile`):**
+- Base: `runpod/pytorch:2.8.0-py3.11-cuda12.8.0-devel-ubuntu24.04`
+- Platform: `linux/amd64`
+- CUDA 12.8 support
+- PyTorch 2.8.0 pre-installed
+
+**CPU Image (`Dockerfile-cpu`):**
+- Base: `python:3.11-slim`
+- Platform: `linux/amd64`
+- Minimal footprint
+
+### Testing in Containers
+
+```bash
+# Build image
+make build-cpu
+
+# Run container interactively
+docker run -it --rm \
+ -v $(pwd):/workspace \
+ -e RUNPOD_TEST_INPUT="$(cat src/tests/test_input.json)" \
+ runpod/worker-tetra:dev \
+ /bin/bash
+
+# Inside container, run handler
+cd /workspace
+python handler.py
+```
+
+### Multi-Architecture Builds
+
+CI builds for multiple platforms:
+- GPU: `linux/amd64`
+- CPU: `linux/amd64`, `linux/arm64`
+
+## Submodule Management
+
+### Understanding tetra-rp
+
+The `tetra-rp/` submodule contains the Tetra SDK:
+- Client library with `@remote` decorator
+- Resource management (`LiveServerless`)
+- Protocol definitions
+
+### Updating Submodule
+
+```bash
+# Update to latest tetra-rp
+git submodule update --remote --rebase
+
+# Sync protocol definitions
+make protocols
+
+# Test with updated submodule
+make test
+
+# Commit submodule update
+git add tetra-rp
+git commit -m "chore: update tetra-rp submodule to latest"
+```
+
+### Protocol Synchronization
+
+Protocol definitions must stay in sync:
+
+```bash
+# Copy remote_execution.py from submodule
+make protocols
+
+# Verify no breaking changes
+make test
+```
+
+### Submodule Development Workflow
+
+Working on both worker and SDK:
+
+```bash
+# Make changes in tetra-rp/
+cd tetra-rp
+git checkout -b feature/my-change
+# ... make changes ...
+git commit -m "feat: my change"
+cd ..
+
+# Update worker to use new tetra-rp
+git add tetra-rp
+git commit -m "chore: use tetra-rp feature branch"
+
+# After tetra-rp PR merges, update to main
+cd tetra-rp
+git checkout main
+git pull
+cd ..
+git add tetra-rp
+git commit -m "chore: update tetra-rp to latest main"
+```
+
+## CI/CD Pipeline
+
+### GitHub Actions Workflows
+
+**Primary Workflow (`.github/workflows/ci.yml`):**
+
+```mermaid
+graph LR
+    PR[Pull Request]:::pr --> Test["Test Job<br/>Python 3.9-3.13"]:::test
+    PR --> Lint["Lint Job<br/>Ruff + Formatting"]:::lint
+    PR --> Docker["Docker Test<br/>CPU Build"]:::docker
+
+ Main[Push to Main]:::main --> Test
+ Main --> Lint
+ Main --> Release[Release Please]:::release
+    Main --> DockerMain["Docker Main<br/>Push :main tags"]:::dockerpush
+
+    Release --> DockerProd["Docker Prod<br/>Semantic versions"]:::dockerpush
+
+ classDef pr fill:#1976d2,stroke:#0d47a1,stroke-width:3px,color:#fff
+ classDef main fill:#388e3c,stroke:#1b5e20,stroke-width:3px,color:#fff
+ classDef test fill:#f57c00,stroke:#e65100,stroke-width:3px,color:#fff
+ classDef lint fill:#7b1fa2,stroke:#4a148c,stroke-width:3px,color:#fff
+ classDef docker fill:#0288d1,stroke:#01579b,stroke-width:3px,color:#fff
+ classDef release fill:#c62828,stroke:#b71c1c,stroke-width:3px,color:#fff
+ classDef dockerpush fill:#2e7d32,stroke:#1b5e20,stroke-width:3px,color:#fff
+```
+
+**Test Job:**
+- Runs on Python 3.9, 3.10, 3.11, 3.12, 3.13
+- Executes `make test-coverage`
+- Requires 35% minimum coverage
+- Tests handler with all `test_*.json` files
+
+**Lint Job:**
+- Python 3.11 only
+- Runs `make format-check` and `make lint`
+
+**Docker Jobs:**
+- Builds GPU and CPU images
+- Pushes `:main` tags on main branch
+- Pushes semantic version tags on release
+
+### Release Process
+
+Automated with [release-please](https://github.com/googleapis/release-please):
+
+1. **Make commits with conventional format**
+ ```bash
+ git commit -m "feat: new feature"
+ git commit -m "fix: bug fix"
+ git commit -m "refactor: code improvement"
+ ```
+
+2. **Release Please creates PR**
+ - Auto-generates changelog
+ - Bumps version in `pyproject.toml`
+ - Updates `CHANGELOG.md`
+
+3. **Merge release PR**
+ - Creates GitHub release
+ - Tags with semantic version
+ - Triggers Docker production builds
+
+4. **Docker images published**
+ - `runpod/worker-tetra:latest`
+ - `runpod/worker-tetra:X.Y.Z`
+ - `runpod/worker-tetra:X.Y`
+ - `runpod/worker-tetra:X`
+
+### Fixing CI Failures Locally
+
+**Test failures:**
+```bash
+# Run exact CI test command
+make test-coverage
+
+# Check coverage report
+open htmlcov/index.html
+```
+
+**Lint failures:**
+```bash
+# Run exact CI lint commands
+make format-check
+make lint
+```
+
+**Docker build failures:**
+```bash
+# Build locally
+make build-cpu
+
+# Test built image
+docker run --rm runpod/worker-tetra:dev python -c "import handler"
+```
+
+## Contributing Guidelines
+
+### Git Workflow
+
+1. **Branch from main**
+ ```bash
+ git checkout main
+ git pull origin main
+ git checkout -b feature/TICKET-description
+ ```
+
+2. **Branch naming conventions**
+ - `feature/TICKET-description` - New features
+ - `fix/TICKET-description` - Bug fixes
+ - `refactor/description` - Code improvements
+ - `perf/description` - Performance improvements
+ - `docs/description` - Documentation
+
+3. **Make commits**
+ ```bash
+ git add .
+ git commit -m "type(scope): subject"
+ ```
+
+### Commit Message Format
+
+Follow [Conventional Commits](https://www.conventionalcommits.org/):
+
+```
+type(scope): subject
+
+Longer description if needed.
+
+- Bullet points for multiple changes
+- Reference issue numbers
+```
+
+**Types:**
+- `feat` - New feature
+- `fix` - Bug fix
+- `refactor` - Code refactoring (included in release notes)
+- `perf` - Performance improvement
+- `test` - Adding/updating tests
+- `docs` - Documentation only
+- `chore` - Maintenance tasks
+- `build` - Build system changes
+
+**Scopes:**
+- `executor` - Executor components
+- `installer` - Dependency installer
+- `handler` - Handler entry point
+- `serialization` - Serialization utils
+- `logging` - Log streaming
+- `ci` - CI/CD changes
+
+**Examples:**
+```bash
+git commit -m "feat(executor): add async function execution support"
+git commit -m "fix(installer): handle missing system packages gracefully"
+git commit -m "refactor(serialization): simplify cloudpickle encoding"
+git commit -m "docs: update DEVELOPMENT.md with testing guide"
+```
+
+### Pull Request Checklist
+
+Before opening PR:
+
+- [ ] All tests pass (`make test`)
+- [ ] Code formatted (`make format`)
+- [ ] No lint errors (`make lint`)
+- [ ] Type hints present (`make typecheck`)
+- [ ] Coverage meets minimum 35% (`make test-coverage`)
+- [ ] Handler tests pass (`make test-handler`)
+- [ ] Commits follow conventional format
+- [ ] PR description explains changes
+
+**PR Template:**
+
+```markdown
+## Summary
+Brief description of changes
+
+## Changes
+- Change 1
+- Change 2
+
+## Testing
+How was this tested?
+
+## Related Issues
+Fixes #123
+```
+
+### Code Review Expectations
+
+**As Author:**
+- Respond to feedback within 24 hours
+- Keep PRs focused and small
+- Update based on review comments
+- Ensure CI passes before requesting review
+
+**As Reviewer:**
+- Review within 48 hours
+- Check for correctness, readability, tests
+- Suggest improvements, don't demand perfection
+- Approve when requirements met
+
+## Debugging Guide
+
+### Debugging Executor Components
+
+**FunctionExecutor Issues:**
+
+```python
+# Enable debug logging
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+# Test function execution
+from function_executor import FunctionExecutor
+from remote_execution import FunctionRequest
+
+executor = FunctionExecutor()
+request = FunctionRequest(
+ function_name="test",
+ function_code="def test(): return 42",
+ args=[],
+ kwargs={},
+)
+
+response = asyncio.run(executor.execute(request))  # execute() is async
+print(f"Success: {response.success}")
+print(f"Result: {response.result}")
+print(f"Output: {response.stdout}")
+print(f"Error: {response.error}")
+```
+
+**ClassExecutor Issues:**
+
+```python
+# Test class method execution
+from class_executor import ClassExecutor
+
+executor = ClassExecutor()
+request = FunctionRequest(
+ execution_type="class",
+ class_name="TestClass",
+ class_code="""
+class TestClass:
+ def __init__(self, value):
+ self.value = value
+ def get(self):
+ return self.value
+""",
+ method_name="get",
+ constructor_args=encoded_args,
+ args=[],
+ kwargs={},
+)
+
+response = asyncio.run(executor.execute_class_method(request))  # method is async
+# Check response.instance_id, response.instance_info
+```
+
+### Log Streaming and Output Capture
+
+All executor output is captured:
+
+```python
+# In your test function
+def test_func():
+ print("stdout message") # Captured
+ logging.info("log message") # Captured
+ import sys
+ sys.stderr.write("stderr message\n") # Captured
+ return "result"
+
+# All output available in response.stdout
+response = asyncio.run(executor.execute(request))  # execute() is async
+assert "stdout message" in response.stdout
+assert "log message" in response.stdout
+assert "stderr message" in response.stdout
+```
+
+### Dependency Installation Issues
+
+**Debug Python packages:**
+
+```python
+from dependency_installer import DependencyInstaller
+
+installer = DependencyInstaller()
+response = installer.install_python_packages(["numpy", "pandas"])
+
+if not response.success:
+ print(f"Installation failed: {response.error}")
+ print(f"Output: {response.stdout}")
+```
+
+**Debug system packages:**
+
+```python
+response = installer.install_system_packages(["ffmpeg", "libsm6"])
+
+if not response.success:
+ print(f"Installation failed: {response.error}")
+ # Check if Docker vs local environment
+ # Check package availability with apt-cache
+```
+
+### Serialization/Deserialization Failures
+
+**Test serialization:**
+
+```python
+import cloudpickle
+import base64
+from serialization_utils import SerializationUtils
+
+# Test argument serialization
+args = [1, 2, 3]
+encoded = [base64.b64encode(cloudpickle.dumps(arg)).decode() for arg in args]
+decoded = SerializationUtils.deserialize_args(encoded)
+assert args == decoded
+
+# Test kwargs serialization
+kwargs = {"key": "value"}
+encoded = {k: base64.b64encode(cloudpickle.dumps(v)).decode()
+ for k, v in kwargs.items()}
+decoded = SerializationUtils.deserialize_kwargs(encoded)
+assert kwargs == decoded
+```
+
+**Handle serialization errors:**
+
+```python
+try:
+ result = cloudpickle.dumps(complex_object)
+except Exception as e:
+ # Some objects can't be pickled (file handles, sockets, etc.)
+ print(f"Serialization failed: {e}")
+ # Simplify the object or use alternative serialization
+```
+
+### Async Execution Problems
+
+**Debug async function execution:**
+
+```python
+import asyncio
+import inspect
+
+async def async_func():
+ await asyncio.sleep(0.1)
+ return "result"
+
+# Check if function is coroutine
+assert inspect.iscoroutinefunction(async_func)
+
+# Execute manually
+result = asyncio.run(async_func())
+assert result == "result"
+
+# Test through executor
+request = FunctionRequest(
+ function_name="async_func",
+ function_code="async def async_func(): return 'result'",
+ args=[],
+ kwargs={},
+)
+response = asyncio.run(executor.execute(request))
+# The executor awaits coroutine functions directly
+```
+
+### Common Error Patterns
+
+**Function not found:**
+```python
+# Error: Function 'my_func' not found in the provided code
+# Solution: Ensure function_name matches the actual function name in function_code
+```
+
+**Serialization mismatch:**
+```python
+# Error: Deserialization failed
+# Solution: Ensure args/kwargs are base64(cloudpickle.dumps(value))
+```
+
+**Import errors:**
+```python
+# Error: ModuleNotFoundError: No module named 'X'
+# Solution: Add to python_dependencies in request
+```
+
+**Async not awaited:**
+```python
+# Error: coroutine 'func' was never awaited
+# Solution: Executors handle this automatically; check if inspect.iscoroutinefunction() works
+```
+
+## Troubleshooting
+
+### Common Setup Issues
+
+**uv not found:**
+```bash
+# Install uv
+curl -LsSf https://astral.sh/uv/install.sh | sh
+
+# Or via pip
+pip install uv
+```
+
+**Submodule not initialized:**
+```bash
+# Initialize and update submodules
+git submodule update --init --recursive
+
+# Or clone with submodules
+git clone --recurse-submodules
+```
+
+**Virtual environment issues:**
+```bash
+# Remove and recreate
+rm -rf .venv
+make setup
+source .venv/bin/activate
+```
+
+### Test Failures
+
+**Coverage below 35%:**
+```bash
+# Check which files need coverage
+make test-coverage
+open htmlcov/index.html
+
+# Add tests for uncovered code
+# Focus on executor components (highest value)
+```
+
+**Import errors in tests:**
+```bash
+# Ensure you're running with uv
+uv run pytest tests/
+
+# Or ensure PYTHONPATH includes src/
+export PYTHONPATH=src:$PYTHONPATH
+pytest tests/
+```
+
+**Async test failures:**
+```python
+# Ensure pytest-asyncio is installed
+uv sync --all-groups
+
+# Check pytest config in pyproject.toml
+# asyncio_mode = "auto" should be set
+```
+
+### Docker Build Problems
+
+**Build hangs on tzdata:**
+```dockerfile
+# Fixed in current Dockerfile with:
+ENV DEBIAN_FRONTEND=noninteractive
+```
+
+**Platform mismatch:**
+```bash
+# Specify platform explicitly
+docker build --platform linux/amd64 -f Dockerfile -t test .
+
+# For M1/M2 Mac development
+docker build --platform linux/arm64 -f Dockerfile-cpu -t test .
+```
+
+**Out of disk space:**
+```bash
+# Clean Docker resources
+docker system prune -a
+
+# Remove dangling images
+docker image prune
+```
+
+### Submodule Sync Issues
+
+**Detached HEAD in submodule:**
+```bash
+cd tetra-rp
+git checkout main
+git pull
+cd ..
+git add tetra-rp
+git commit -m "chore: update tetra-rp to latest"
+```
+
+**Protocol out of sync:**
+```bash
+# Re-sync protocol definitions
+make protocols
+
+# Verify tests still pass
+make test
+```
+
+**Merge conflicts in submodule:**
+```bash
+# Update to latest
+git submodule update --remote --rebase
+
+# If conflicts
+cd tetra-rp
+git status # Check conflict
+git rebase --continue # After resolving
+cd ..
+```
+
+### CI/CD Issues
+
+**Tests pass locally but fail in CI:**
+```bash
+# Run exact CI commands
+make test-coverage # For test job
+make format-check && make lint # For lint job
+
+# Check Python version matches CI
+python --version  # CI uses 3.11 for the lint job, 3.9-3.13 for tests
+```
+
+**Docker push fails:**
+- Check Docker Hub credentials in GitHub Secrets
+- Verify `DOCKERHUB_USERNAME` and `DOCKERHUB_TOKEN`
+- Ensure permissions for `runpod/worker-tetra` repository
+
+**Release Please not creating PR:**
+- Ensure commits follow conventional format
+- Check `.release-please-manifest.json` is valid
+- Verify GitHub token has required permissions
+
+---
+
+## Additional Resources
+
+- **Architecture Details:** [CLAUDE.md](CLAUDE.md)
+- **Design Documents:** [docs/](docs/)
+- **Tetra SDK:** [tetra-rp/README.md](tetra-rp/README.md)
+- **Release Process:** [tetra-rp/RELEASE_SYSTEM.md](tetra-rp/RELEASE_SYSTEM.md)
+- **RunPod Docs:** https://docs.runpod.io/
+
+## Getting Help
+
+- **GitHub Issues:** https://github.com/runpod/worker-tetra/issues
+- **RunPod Discord:** https://discord.gg/runpod
+- **Internal:** Slack #tetra-development channel
diff --git a/src/class_executor.py b/src/class_executor.py
index 9347ed7..52ee182 100644
--- a/src/class_executor.py
+++ b/src/class_executor.py
@@ -2,6 +2,7 @@
import logging
import traceback
import uuid
+import inspect
from contextlib import redirect_stdout, redirect_stderr
from datetime import datetime
from typing import Dict, Any, Tuple
@@ -18,11 +19,11 @@ def __init__(self):
self.class_instances: Dict[str, Any] = {}
self.instance_metadata: Dict[str, Dict[str, Any]] = {}
- def execute(self, request: FunctionRequest) -> FunctionResponse:
+ async def execute(self, request: FunctionRequest) -> FunctionResponse:
"""Execute class method."""
- return self.execute_class_method(request)
+ return await self.execute_class_method(request)
- def execute_class_method(self, request: FunctionRequest) -> FunctionResponse:
+ async def execute_class_method(self, request: FunctionRequest) -> FunctionResponse:
"""
Execute a class method with instance management.
"""
@@ -55,8 +56,13 @@ def execute_class_method(self, request: FunctionRequest) -> FunctionResponse:
args = SerializationUtils.deserialize_args(request.args)
kwargs = SerializationUtils.deserialize_kwargs(request.kwargs)
- # Execute the method
- result = method(*args, **kwargs)
+ # Execute the method (handle both sync and async)
+ if inspect.iscoroutinefunction(method):
+ # Async method - await directly
+ result = await method(*args, **kwargs)
+ else:
+ # Sync method - call directly
+ result = method(*args, **kwargs)
# Update instance metadata
self._update_instance_metadata(instance_id)
diff --git a/src/function_executor.py b/src/function_executor.py
index 7f94690..e22909b 100644
--- a/src/function_executor.py
+++ b/src/function_executor.py
@@ -1,6 +1,7 @@
import io
import logging
import traceback
+import inspect
from contextlib import redirect_stdout, redirect_stderr
from typing import Dict, Any
@@ -11,7 +12,7 @@
class FunctionExecutor:
"""Handles execution of individual functions with output capture."""
- def execute(self, request: FunctionRequest) -> FunctionResponse:
+ async def execute(self, request: FunctionRequest) -> FunctionResponse:
"""
Execute a function with full output capture.
@@ -50,8 +51,13 @@ def execute(self, request: FunctionRequest) -> FunctionResponse:
args = SerializationUtils.deserialize_args(request.args)
kwargs = SerializationUtils.deserialize_kwargs(request.kwargs)
- # Execute the function
- result = func(*args, **kwargs)
+ # Execute the function (handle both sync and async)
+ if inspect.iscoroutinefunction(func):
+ # Async function - await directly
+ result = await func(*args, **kwargs)
+ else:
+ # Sync function - call directly
+ result = func(*args, **kwargs)
except Exception as e:
# Combine output streams
diff --git a/src/remote_executor.py b/src/remote_executor.py
index 1577e46..1088e17 100644
--- a/src/remote_executor.py
+++ b/src/remote_executor.py
@@ -93,9 +93,9 @@ async def ExecuteFunction(self, request: FunctionRequest) -> FunctionResponse:
# Execute the function/class
if execution_type == "class":
- result = self.class_executor.execute_class_method(request)
+ result = await self.class_executor.execute_class_method(request)
else:
- result = self.function_executor.execute(request)
+ result = await self.function_executor.execute(request)
# Add all captured system logs to the result
system_logs = get_streamed_logs(clear_buffer=True)
diff --git a/tests/integration/test_handler_integration.py b/tests/integration/test_handler_integration.py
index 0eca974..012a6d5 100644
--- a/tests/integration/test_handler_integration.py
+++ b/tests/integration/test_handler_integration.py
@@ -356,7 +356,8 @@ def bad_function(:
assert "SyntaxError" in result["error"] or "invalid syntax" in result["error"]
@pytest.mark.integration
- def test_remote_executor_direct_execution(self):
+ @pytest.mark.asyncio
+ async def test_remote_executor_direct_execution(self):
"""Test RemoteExecutor direct method calls."""
executor = RemoteExecutor()
@@ -370,7 +371,7 @@ def direct_test():
kwargs={},
)
- result = executor.function_executor.execute(request)
+ result = await executor.function_executor.execute(request)
assert result.success is True
deserialized_result = cloudpickle.loads(base64.b64decode(result.result))
diff --git a/tests/unit/test_class_executor.py b/tests/unit/test_class_executor.py
index 727231d..6ffeb10 100644
--- a/tests/unit/test_class_executor.py
+++ b/tests/unit/test_class_executor.py
@@ -28,7 +28,7 @@ def encode_kwargs(self, **kwargs):
for k, v in kwargs.items()
}
- def test_execute_class_method_basic(self):
+ async def test_execute_class_method_basic(self):
"""Test basic class method execution."""
request = FunctionRequest(
execution_type="class",
@@ -47,14 +47,14 @@ def get_value(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is True
assert response.instance_id is not None
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == "Value: test"
- def test_execute_class_method_with_args(self):
+ async def test_execute_class_method_with_args(self):
"""Test class method execution with method arguments."""
request = FunctionRequest(
execution_type="class",
@@ -73,13 +73,13 @@ def add(self, a, b):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == 8
- def test_execute_class_method_default_call(self):
+ async def test_execute_class_method_default_call(self):
"""Test class method execution with default __call__ method."""
request = FunctionRequest(
execution_type="class",
@@ -97,13 +97,13 @@ def __call__(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == "hello"
- def test_execute_class_method_not_found(self):
+ async def test_execute_class_method_not_found(self):
"""Test execution when method is not found in class."""
request = FunctionRequest(
execution_type="class",
@@ -121,13 +121,13 @@ def existing_method(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is False
assert "missing_method" in response.error
assert "not found" in response.error
- def test_execute_class_method_with_exception(self):
+ async def test_execute_class_method_with_exception(self):
"""Test error handling when class method raises exception."""
request = FunctionRequest(
execution_type="class",
@@ -145,7 +145,7 @@ def error_method(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is False
assert "Method error" in response.error
@@ -165,7 +165,7 @@ def encode_args(self, *args):
base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
]
- def test_create_new_instance(self):
+ async def test_create_new_instance(self):
"""Test creating a new class instance."""
request = FunctionRequest(
execution_type="class",
@@ -186,7 +186,7 @@ def increment(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is True
assert response.instance_id is not None
@@ -198,7 +198,7 @@ def increment(self):
assert metadata["method_calls"] == 1
assert "created_at" in metadata
- def test_reuse_existing_instance(self):
+ async def test_reuse_existing_instance(self):
"""Test reusing an existing class instance."""
# Create initial instance
initial_request = FunctionRequest(
@@ -223,7 +223,7 @@ def get_count(self):
kwargs={},
)
- first_response = self.executor.execute_class_method(initial_request)
+ first_response = await self.executor.execute_class_method(initial_request)
instance_id = first_response.instance_id
# Reuse the same instance
@@ -238,7 +238,7 @@ def get_count(self):
kwargs={},
)
- second_response = self.executor.execute_class_method(reuse_request)
+ second_response = await self.executor.execute_class_method(reuse_request)
assert second_response.success is True
assert second_response.instance_id == instance_id
@@ -247,7 +247,7 @@ def get_count(self):
result = cloudpickle.loads(base64.b64decode(second_response.result))
assert result == 1
- def test_instance_metadata_tracking(self):
+ async def test_instance_metadata_tracking(self):
"""Test that instance metadata is properly tracked."""
request = FunctionRequest(
execution_type="class",
@@ -257,7 +257,7 @@ class TestClass:
def __init__(self):
pass
 def test_method(self):
return "test"
""",
method_name="test_method",
@@ -266,12 +266,12 @@ def test_method(self):
)
# Execute method multiple times
- response1 = self.executor.execute_class_method(request)
+ response1 = await self.executor.execute_class_method(request)
instance_id = response1.instance_id
request.instance_id = instance_id
request.create_new_instance = False
- self.executor.execute_class_method(request)
+ await self.executor.execute_class_method(request)
# Check metadata updates
metadata = self.executor.instance_metadata[instance_id]
@@ -283,14 +283,14 @@ def test_method(self):
last_used_time = datetime.fromisoformat(metadata["last_used"])
assert last_used_time >= created_time
- def test_generate_instance_id(self):
+ async def test_generate_instance_id(self):
"""Test automatic instance ID generation."""
request = FunctionRequest(
execution_type="class",
class_name="TestClass",
class_code="""
class TestClass:
 def test_method(self):
return "test"
""",
method_name="test_method",
@@ -298,14 +298,14 @@ def test_method(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is True
assert response.instance_id is not None
assert response.instance_id.startswith("TestClass_")
assert len(response.instance_id.split("_")[1]) == 8 # UUID hex[:8]
- def test_class_not_found_error(self):
+ async def test_class_not_found_error(self):
"""Test error when class is not found in provided code."""
request = FunctionRequest(
execution_type="class",
@@ -320,8 +320,216 @@ def __init__(self):
kwargs={},
)
- response = self.executor.execute_class_method(request)
+ response = await self.executor.execute_class_method(request)
assert response.success is False
assert "MissingClass" in response.error
assert "not found" in response.error
+
+
+class TestAsyncMethodSupport:
+ """Test async method execution support."""
+
+ def setup_method(self):
+ """Setup for each test method."""
+ self.executor = ClassExecutor()
+
+ def encode_args(self, *args):
+ """Helper to encode arguments."""
+ return [
+ base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
+ ]
+
+ async def test_execute_async_method(self):
+ """Test execution of async method."""
+ request = FunctionRequest(
+ execution_type="class",
+ class_name="AsyncGreeter",
+ class_code="""
+class AsyncGreeter:
+ def __init__(self, greeting):
+ self.greeting = greeting
+
+ async def greet(self, name):
+ return f'{self.greeting}, {name}!'
+""",
+ method_name="greet",
+ constructor_args=self.encode_args("Hello"),
+ args=self.encode_args("World"),
+ kwargs={},
+ )
+
+ response = await self.executor.execute_class_method(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == "Hello, World!"
+
+ async def test_execute_async_method_with_await(self):
+ """Test async method that uses await."""
+ request = FunctionRequest(
+ execution_type="class",
+ class_name="AsyncWorker",
+ class_code="""
+import asyncio
+
+class AsyncWorker:
+ def __init__(self):
+ self.processed = []
+
+ async def process(self, item, delay=0.01):
+ await asyncio.sleep(delay)
+ self.processed.append(item)
+ return f'Processed: {item}'
+""",
+ method_name="process",
+ args=self.encode_args("task1"),
+ kwargs={},
+ )
+
+ response = await self.executor.execute_class_method(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == "Processed: task1"
+
+ async def test_execute_async_method_returning_dict(self):
+ """Test async method returning dict (like GPU worker)."""
+ request = FunctionRequest(
+ execution_type="class",
+ class_name="GPUProcessor",
+ class_code="""
+class GPUProcessor:
+ def __init__(self, gpu_id):
+ self.gpu_id = gpu_id
+
+ async def process_batch(self, input_data: dict) -> dict:
+ batch_size = input_data.get("batch_size", 32)
+ return {
+ "status": "success",
+ "gpu_id": self.gpu_id,
+ "batch_size": batch_size,
+ "processed_items": batch_size * 10,
+ }
+""",
+ method_name="process_batch",
+ constructor_args=self.encode_args("cuda:0"),
+ args=self.encode_args({"batch_size": 64}),
+ kwargs={},
+ )
+
+ response = await self.executor.execute_class_method(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result["status"] == "success"
+ assert result["gpu_id"] == "cuda:0"
+ assert result["batch_size"] == 64
+ assert result["processed_items"] == 640
+
+ async def test_async_method_instance_persistence(self):
+ """Test that async methods work with instance persistence."""
+ # Create instance and call async method
+ initial_request = FunctionRequest(
+ execution_type="class",
+ class_name="AsyncCounter",
+ class_code="""
+import asyncio
+
+class AsyncCounter:
+ def __init__(self, start=0):
+ self.count = start
+
+ async def increment(self):
+ await asyncio.sleep(0.01)
+ self.count += 1
+ return self.count
+
+ async def get_count(self):
+ return self.count
+""",
+ method_name="increment",
+ constructor_args=self.encode_args(0),
+ create_new_instance=True,
+ args=[],
+ kwargs={},
+ )
+
+ first_response = await self.executor.execute_class_method(initial_request)
+ instance_id = first_response.instance_id
+
+ assert first_response.success is True
+ result1 = cloudpickle.loads(base64.b64decode(first_response.result))
+ assert result1 == 1
+
+ # Reuse instance with another async method call
+ reuse_request = FunctionRequest(
+ execution_type="class",
+ class_name="AsyncCounter",
+ class_code="# Code not needed for reuse",
+ method_name="get_count",
+ instance_id=instance_id,
+ create_new_instance=False,
+ args=[],
+ kwargs={},
+ )
+
+ second_response = await self.executor.execute_class_method(reuse_request)
+
+ assert second_response.success is True
+ assert second_response.instance_id == instance_id
+ result2 = cloudpickle.loads(base64.b64decode(second_response.result))
+ assert result2 == 1 # Count should be preserved from first call
+
+ async def test_mixed_sync_async_methods(self):
+ """Test class with both sync and async methods."""
+ # First call sync method
+ sync_request = FunctionRequest(
+ execution_type="class",
+ class_name="MixedClass",
+ class_code="""
+import asyncio
+
+class MixedClass:
+ def __init__(self):
+ self.value = 0
+
+ def sync_set(self, val):
+ self.value = val
+ return f'Sync set: {val}'
+
+ async def async_get(self):
+ await asyncio.sleep(0.01)
+ return f'Async get: {self.value}'
+""",
+ method_name="sync_set",
+ constructor_args=[],
+ create_new_instance=True,
+ args=self.encode_args(42),
+ kwargs={},
+ )
+
+ sync_response = await self.executor.execute_class_method(sync_request)
+ instance_id = sync_response.instance_id
+
+ assert sync_response.success is True
+ sync_result = cloudpickle.loads(base64.b64decode(sync_response.result))
+ assert sync_result == "Sync set: 42"
+
+ # Then call async method on same instance
+ async_request = FunctionRequest(
+ execution_type="class",
+ class_name="MixedClass",
+ class_code="# Code not needed for reuse",
+ method_name="async_get",
+ instance_id=instance_id,
+ create_new_instance=False,
+ args=[],
+ kwargs={},
+ )
+
+ async_response = await self.executor.execute_class_method(async_request)
+
+ assert async_response.success is True
+ async_result = cloudpickle.loads(base64.b64decode(async_response.result))
+ assert async_result == "Async get: 42"
diff --git a/tests/unit/test_function_executor.py b/tests/unit/test_function_executor.py
index 815e326..302e292 100644
--- a/tests/unit/test_function_executor.py
+++ b/tests/unit/test_function_executor.py
@@ -27,7 +27,7 @@ def encode_kwargs(self, **kwargs):
for k, v in kwargs.items()
}
- def test_execute_simple_function(self):
+ async def test_execute_simple_function(self):
"""Test basic function execution."""
request = FunctionRequest(
function_name="hello",
@@ -36,13 +36,13 @@ def test_execute_simple_function(self):
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == "hello world"
- def test_execute_function_with_args(self):
+ async def test_execute_function_with_args(self):
"""Test function execution with arguments."""
request = FunctionRequest(
function_name="add",
@@ -51,13 +51,13 @@ def test_execute_function_with_args(self):
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == 8
- def test_execute_function_with_kwargs(self):
+ async def test_execute_function_with_kwargs(self):
"""Test function execution with keyword arguments."""
request = FunctionRequest(
function_name="greet",
@@ -66,13 +66,13 @@ def test_execute_function_with_kwargs(self):
kwargs=self.encode_kwargs(greeting="Hi"),
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
assert result == "Hi, Alice!"
- def test_execute_function_not_found(self):
+ async def test_execute_function_not_found(self):
"""Test execution when function is not found in code."""
request = FunctionRequest(
function_name="missing_func",
@@ -81,13 +81,13 @@ def test_execute_function_not_found(self):
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is False
assert "missing_func" in response.result
assert "not found" in response.result
- def test_execute_function_with_exception(self):
+ async def test_execute_function_with_exception(self):
"""Test error handling when function raises exception."""
request = FunctionRequest(
function_name="error_func",
@@ -96,13 +96,13 @@ def test_execute_function_with_exception(self):
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is False
assert "Test error" in response.error
assert "ValueError" in response.error
- def test_execute_function_with_output_capture(self):
+ async def test_execute_function_with_output_capture(self):
"""Test that stdout, stderr, and logs are captured."""
request = FunctionRequest(
function_name="output_func",
@@ -120,7 +120,7 @@ def output_func():
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
assert response.success is True
result = cloudpickle.loads(base64.b64decode(response.result))
@@ -130,6 +130,96 @@ def output_func():
assert "log message" in response.stdout
+class TestAsyncFunctionSupport:
+ """Test async function execution support."""
+
+ def setup_method(self):
+ """Setup for each test method."""
+ self.executor = FunctionExecutor()
+
+ def encode_args(self, *args):
+ """Helper to encode arguments."""
+ return [
+ base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args
+ ]
+
+ async def test_execute_async_function(self):
+ """Test execution of async function."""
+ request = FunctionRequest(
+ function_name="async_hello",
+ function_code="async def async_hello():\n return 'async hello world'",
+ args=[],
+ kwargs={},
+ )
+
+ response = await self.executor.execute(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == "async hello world"
+
+ async def test_execute_async_function_with_args(self):
+ """Test async function with arguments."""
+ request = FunctionRequest(
+ function_name="async_multiply",
+ function_code="async def async_multiply(x, y):\n return x * y",
+ args=self.encode_args(6, 7),
+ kwargs={},
+ )
+
+ response = await self.executor.execute(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == 42
+
+ async def test_execute_async_function_with_await(self):
+ """Test async function that uses await."""
+ request = FunctionRequest(
+ function_name="async_with_await",
+ function_code="""
+import asyncio
+
+async def async_with_await(delay):
+ await asyncio.sleep(delay)
+ return f'slept for {delay}s'
+""",
+ args=self.encode_args(0.01),
+ kwargs={},
+ )
+
+ response = await self.executor.execute(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result == "slept for 0.01s"
+
+ async def test_execute_async_function_with_dict_return(self):
+ """Test async function returning dict (like GPU worker)."""
+ request = FunctionRequest(
+ function_name="gpu_matrix_multiply",
+ function_code="""
+async def gpu_matrix_multiply(input_data: dict) -> dict:
+ size = input_data.get("matrix_size", 100)
+ return {
+ "status": "success",
+ "matrix_size": size,
+ "result_shape": [size, size],
+ }
+""",
+ args=self.encode_args({"matrix_size": 500}),
+ kwargs={},
+ )
+
+ response = await self.executor.execute(request)
+
+ assert response.success is True
+ result = cloudpickle.loads(base64.b64decode(response.result))
+ assert result["status"] == "success"
+ assert result["matrix_size"] == 500
+ assert result["result_shape"] == [500, 500]
+
+
class TestErrorHandling:
"""Test error handling in function execution."""
@@ -137,7 +227,7 @@ def setup_method(self):
"""Setup for each test method."""
self.executor = FunctionExecutor()
- def test_execute_function_handles_errors(self):
+ async def test_execute_function_handles_errors(self):
"""Test that function execution properly handles errors."""
request = FunctionRequest(
function_name="error_func",
@@ -146,7 +236,7 @@ def test_execute_function_handles_errors(self):
kwargs={},
)
- response = self.executor.execute(request)
+ response = await self.executor.execute(request)
# Verify error was captured
assert response.success is False
diff --git a/uv.lock b/uv.lock
index ef96dd3..7ad246a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2614,7 +2614,7 @@ wheels = [
[[package]]
name = "worker-tetra"
-version = "0.6.0"
+version = "0.7.0"
source = { virtual = "." }
dependencies = [
{ name = "cloudpickle" },