From 316a0b256b8fef002eee4e612355968544663ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 13 Nov 2025 21:39:31 -0800 Subject: [PATCH 1/6] feat(executor): add async function and method execution support Add support for executing both sync and async functions/methods in FunctionExecutor and ClassExecutor. Uses inspect.iscoroutinefunction() to detect async callables and asyncio.run() to execute them. Changes: - FunctionExecutor: Added async function support with detection and execution - ClassExecutor: Added async method support with same pattern - Comprehensive test coverage for async execution patterns including: - Basic async functions/methods - Async with await operations - Dict returns (GPU worker pattern) - Instance persistence across async calls - Mixed sync/async methods in same class --- src/class_executor.py | 11 +- src/function_executor.py | 11 +- tests/unit/test_class_executor.py | 208 +++++++++++++++++++++++++++ tests/unit/test_function_executor.py | 90 ++++++++++++ uv.lock | 2 +- 5 files changed, 317 insertions(+), 5 deletions(-) diff --git a/src/class_executor.py b/src/class_executor.py index 9347ed7..1f9e44a 100644 --- a/src/class_executor.py +++ b/src/class_executor.py @@ -2,6 +2,8 @@ import logging import traceback import uuid +import asyncio +import inspect from contextlib import redirect_stdout, redirect_stderr from datetime import datetime from typing import Dict, Any, Tuple @@ -55,8 +57,13 @@ def execute_class_method(self, request: FunctionRequest) -> FunctionResponse: args = SerializationUtils.deserialize_args(request.args) kwargs = SerializationUtils.deserialize_kwargs(request.kwargs) - # Execute the method - result = method(*args, **kwargs) + # Execute the method (handle both sync and async) + if inspect.iscoroutinefunction(method): + # Async method - need to await it + result = asyncio.run(method(*args, **kwargs)) + else: + # Sync method - call directly + result = method(*args, **kwargs) # Update instance metadata self._update_instance_metadata(instance_id) diff --git a/src/function_executor.py b/src/function_executor.py index 7f94690..e42fdf4 100644 --- a/src/function_executor.py +++ b/src/function_executor.py @@ -1,6 +1,8 @@ import io import logging import traceback +import asyncio +import inspect from contextlib import redirect_stdout, redirect_stderr from typing import Dict, Any @@ -50,8 +52,13 @@ def execute(self, request: FunctionRequest) -> FunctionResponse: args = SerializationUtils.deserialize_args(request.args) kwargs = SerializationUtils.deserialize_kwargs(request.kwargs) - # Execute the function - result = func(*args, **kwargs) + # Execute the function (handle both sync and async) + if inspect.iscoroutinefunction(func): + # Async function - need to await it + result = asyncio.run(func(*args, **kwargs)) + else: + # Sync function - call directly + result = func(*args, **kwargs) except Exception as e: # Combine output streams diff --git a/tests/unit/test_class_executor.py b/tests/unit/test_class_executor.py index 727231d..835dd36 100644 --- a/tests/unit/test_class_executor.py +++ b/tests/unit/test_class_executor.py @@ -325,3 +325,211 @@ def __init__(self): assert response.success is False assert "MissingClass" in response.error assert "not found" in response.error + + +class TestAsyncMethodSupport: + """Test async method execution support.""" + + def setup_method(self): + """Setup for each test method.""" + self.executor = ClassExecutor() + + def encode_args(self, *args): + """Helper to encode arguments.""" + return [ + base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args + ] + + def test_execute_async_method(self): + """Test execution of async method.""" + request = FunctionRequest( + execution_type="class", + class_name="AsyncGreeter", + class_code=""" +class AsyncGreeter: + def __init__(self, greeting): + self.greeting = greeting + + async def greet(self, name): + return f'{self.greeting}, {name}!' +""", + method_name="greet", + constructor_args=self.encode_args("Hello"), + args=self.encode_args("World"), + kwargs={}, + ) + + response = self.executor.execute_class_method(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == "Hello, World!" + + def test_execute_async_method_with_await(self): + """Test async method that uses await.""" + request = FunctionRequest( + execution_type="class", + class_name="AsyncWorker", + class_code=""" +import asyncio + +class AsyncWorker: + def __init__(self): + self.processed = [] + + async def process(self, item, delay=0.01): + await asyncio.sleep(delay) + self.processed.append(item) + return f'Processed: {item}' +""", + method_name="process", + args=self.encode_args("task1"), + kwargs={}, + ) + + response = self.executor.execute_class_method(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == "Processed: task1" + + def test_execute_async_method_returning_dict(self): + """Test async method returning dict (like GPU worker).""" + request = FunctionRequest( + execution_type="class", + class_name="GPUProcessor", + class_code=""" +class GPUProcessor: + def __init__(self, gpu_id): + self.gpu_id = gpu_id + + async def process_batch(self, input_data: dict) -> dict: + batch_size = input_data.get("batch_size", 32) + return { + "status": "success", + "gpu_id": self.gpu_id, + "batch_size": batch_size, + "processed_items": batch_size * 10, + } +""", + method_name="process_batch", + constructor_args=self.encode_args("cuda:0"), + args=self.encode_args({"batch_size": 64}), + kwargs={}, + ) + + response = self.executor.execute_class_method(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result["status"] == "success" + assert result["gpu_id"] == "cuda:0" + assert result["batch_size"] == 64 + assert result["processed_items"] == 640 + + def test_async_method_instance_persistence(self): + """Test that async methods work with instance persistence.""" + # Create instance and call async method + initial_request = FunctionRequest( + execution_type="class", + class_name="AsyncCounter", + class_code=""" +import asyncio + +class AsyncCounter: + def __init__(self, start=0): + self.count = start + + async def increment(self): + await asyncio.sleep(0.01) + self.count += 1 + return self.count + + async def get_count(self): + return self.count +""", + method_name="increment", + constructor_args=self.encode_args(0), + create_new_instance=True, + args=[], + kwargs={}, + ) + + first_response = self.executor.execute_class_method(initial_request) + instance_id = first_response.instance_id + + assert first_response.success is True + result1 = cloudpickle.loads(base64.b64decode(first_response.result)) + assert result1 == 1 + + # Reuse instance with another async method call + reuse_request = FunctionRequest( + execution_type="class", + class_name="AsyncCounter", + class_code="# Code not needed for reuse", + method_name="get_count", + instance_id=instance_id, + create_new_instance=False, + args=[], + kwargs={}, + ) + + second_response = self.executor.execute_class_method(reuse_request) + + assert second_response.success is True + assert second_response.instance_id == instance_id + result2 = cloudpickle.loads(base64.b64decode(second_response.result)) + assert result2 == 1 # Count should be preserved from first call + + def test_mixed_sync_async_methods(self): + """Test class with both sync and async methods.""" + # First call sync method + sync_request = FunctionRequest( + execution_type="class", + class_name="MixedClass", + class_code=""" +import asyncio + +class MixedClass: + def __init__(self): + self.value = 0 + + def sync_set(self, val): + self.value = val + return f'Sync set: {val}' + + async def async_get(self): + await asyncio.sleep(0.01) + return f'Async get: {self.value}' +""", + method_name="sync_set", + constructor_args=[], + create_new_instance=True, + args=self.encode_args(42), + kwargs={}, + ) + + sync_response = self.executor.execute_class_method(sync_request) + instance_id = sync_response.instance_id + + assert sync_response.success is True + sync_result = cloudpickle.loads(base64.b64decode(sync_response.result)) + assert sync_result == "Sync set: 42" + + # Then call async method on same instance + async_request = FunctionRequest( + execution_type="class", + class_name="MixedClass", + class_code="# Code not needed for reuse", + method_name="async_get", + instance_id=instance_id, + create_new_instance=False, + args=[], + kwargs={}, + ) + + async_response = self.executor.execute_class_method(async_request) + + assert async_response.success is True + async_result = cloudpickle.loads(base64.b64decode(async_response.result)) + assert async_result == "Async get: 42" diff --git a/tests/unit/test_function_executor.py b/tests/unit/test_function_executor.py index 815e326..5e3935d 100644 --- a/tests/unit/test_function_executor.py +++ b/tests/unit/test_function_executor.py @@ -130,6 +130,96 @@ def output_func(): assert "log message" in response.stdout +class TestAsyncFunctionSupport: + """Test async function execution support.""" + + def setup_method(self): + """Setup for each test method.""" + self.executor = FunctionExecutor() + + def encode_args(self, *args): + """Helper to encode arguments.""" + return [ + base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args + ] + + def test_execute_async_function(self): + """Test execution of async function.""" + request = FunctionRequest( + function_name="async_hello", + function_code="async def async_hello():\n return 'async hello world'", + args=[], + kwargs={}, + ) + + response = self.executor.execute(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == "async hello world" + + def test_execute_async_function_with_args(self): + """Test async function with arguments.""" + request = FunctionRequest( + function_name="async_multiply", + function_code="async def async_multiply(x, y):\n return x * y", + args=self.encode_args(6, 7), + kwargs={}, + ) + + response = self.executor.execute(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == 42 + + def test_execute_async_function_with_await(self): + """Test async function that uses await.""" + request = FunctionRequest( + function_name="async_with_await", + function_code=""" +import asyncio + +async def async_with_await(delay): + await asyncio.sleep(delay) + return f'slept for {delay}s' +""", + args=self.encode_args(0.01), + kwargs={}, + ) + + response = self.executor.execute(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == "slept for 0.01s" + + def test_execute_async_function_with_dict_return(self): + """Test async function returning dict (like GPU worker).""" + request = FunctionRequest( + function_name="gpu_matrix_multiply", + function_code=""" +async def gpu_matrix_multiply(input_data: dict) -> dict: + size = input_data.get("matrix_size", 100) + return { + "status": "success", + "matrix_size": size, + "result_shape": [size, size], + } +""", + args=self.encode_args({"matrix_size": 500}), + kwargs={}, + ) + + response = self.executor.execute(request) + + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result["status"] == "success" + assert result["matrix_size"] == 500 + assert result["result_shape"] == [500, 500] + + class TestErrorHandling: """Test error handling in function execution.""" diff --git a/uv.lock b/uv.lock index ef96dd3..7ad246a 100644 --- a/uv.lock +++ b/uv.lock @@ -2614,7 +2614,7 @@ wheels = [ [[package]] name = "worker-tetra" -version = "0.6.0" +version = "0.7.0" source = { virtual = "." } dependencies = [ { name = "cloudpickle" }, From 180d4e993b751488c2a8bfd8483f9c5a5e628b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 13 Nov 2025 21:58:58 -0800 Subject: [PATCH 2/6] docs: DEVELOPMENT.md guide --- DEVELOPMENT.md | 1141 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1141 insertions(+) create mode 100644 DEVELOPMENT.md diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md new file mode 100644 index 0000000..07787fe --- /dev/null +++ b/DEVELOPMENT.md @@ -0,0 +1,1141 @@ +# DEVELOPMENT + +Developer guide for contributing to worker-tetra, a RunPod Serverless worker for remote Python execution. + +## Table of Contents + +- [Getting Started](#getting-started) +- [Development Workflow](#development-workflow) +- [Testing Strategy](#testing-strategy) +- [Code Quality & Standards](#code-quality--standards) +- [Architecture Overview](#architecture-overview) +- [Common Development Tasks](#common-development-tasks) +- [Docker Development](#docker-development) +- [Submodule Management](#submodule-management) +- [CI/CD Pipeline](#cicd-pipeline) +- [Contributing Guidelines](#contributing-guidelines) +- [Debugging Guide](#debugging-guide) +- [Troubleshooting](#troubleshooting) + +## Getting Started + +### Prerequisites + +- Python 3.11+ (3.12 recommended) +- Docker Desktop (for container testing) +- `uv` package manager ([installation](https://github.com/astral-sh/uv)) +- Git with submodule support + +### Initial Setup + +```bash +# Clone repository with submodules +git clone --recurse-submodules https://github.com/runpod/worker-tetra.git +cd worker-tetra + +# Initialize project (creates venv, syncs deps, updates submodules) +make setup + +# Activate virtual environment +source .venv/bin/activate + +# Verify setup with tests +make test +``` + +### Environment Variables + +Create a `.env` file (gitignored) for local development: + +```bash +# Optional - only needed for RunPod integration testing +RUNPOD_API_KEY=your_key_here + +# Optional - for HuggingFace private models +HF_TOKEN=your_token_here +``` + +## Development Workflow + +### Standard Development Cycle + +1. **Create feature branch** + ```bash + git checkout -b feature/TICKET-description + ``` + +2. **Write failing tests first** (TDD approach) + ```bash + # Add test to tests/unit/test_*.py + make test-fast # Should fail + ``` + +3. **Implement feature** + ```bash + # Edit src/*.py + make test-fast # Should pass + ``` + +4. **Run quality checks** + ```bash + make quality-check # Format, lint, typecheck, coverage, handler tests + ``` + +5. **Commit and push** + ```bash + git add . + git commit -m "feat(component): description" + git push origin feature/TICKET-description + ``` + +### Daily Commands + +```bash +# Install/sync dependencies after pulling changes +make dev + +# Run tests during development (fail-fast mode) +make test-fast + +# Format code before committing +make format + +# Check everything before push +make quality-check +``` + +## Testing Strategy + +### Test Organization + +``` +tests/ +├── unit/ # Fast, isolated component tests +│ ├── test_function_executor.py +│ ├── test_class_executor.py +│ ├── test_dependency_installer.py +│ └── ... +├── integration/ # End-to-end workflow tests +│ ├── test_handler_integration.py +│ └── test_remote_execution.py +└── conftest.py # Shared fixtures +``` + +### When to Write Each Test Type + +**Unit Tests** (`tests/unit/`): +- Testing individual components in isolation +- Mocking external dependencies +- Fast execution (< 100ms per test) +- Example: Testing `FunctionExecutor.execute()` with mock request + +**Integration Tests** (`tests/integration/`): +- Testing complete workflows +- Real dependency installation +- Slower execution (seconds) +- Example: End-to-end remote function execution + +### Test Structure (AAA Pattern) + +```python +def test_feature_behavior(): + # Arrange - Set up test data + executor = FunctionExecutor() + request = FunctionRequest( + function_name="test_func", + function_code="def test_func(): return 42", + args=[], + kwargs={}, + ) + + # Act - Execute the operation + response = executor.execute(request) + + # Assert - Verify expectations + assert response.success is True + result = cloudpickle.loads(base64.b64decode(response.result)) + assert result == 42 +``` + +### Testing Async Functions/Methods + +```python +def test_execute_async_function(): + """Test async function execution.""" + request = FunctionRequest( + function_name="async_func", + function_code="async def async_func(): return 'result'", + args=[], + kwargs={}, + ) + + response = executor.execute(request) + + assert response.success is True + # Async functions are executed with asyncio.run() internally +``` + +### Running Tests + +```bash +# Run all tests +make test + +# Run only unit tests +make test-unit + +# Run only integration tests +make test-integration + +# Run with coverage report (HTML in htmlcov/) +make test-coverage + +# Run with fail-fast (stop on first failure) +make test-fast + +# Run specific test file +uv run pytest tests/unit/test_function_executor.py -xvs + +# Run specific test function +uv run pytest tests/unit/test_function_executor.py::TestFunctionExecution::test_execute_simple_function -xvs + +# Test handler with JSON test files +make test-handler +``` + +### Coverage Requirements + +- Minimum: 35% total coverage (enforced in CI) +- Target: 80%+ for new code +- Critical paths: 90%+ (executors, serialization) + +## Code Quality & Standards + +### Formatting + +```bash +# Format all code (modifies files) +make format + +# Check formatting without changes +make format-check +``` + +**Rules:** +- PEP 8 compliance via ruff +- Line length: 88 characters +- Single newline at end of files +- No trailing whitespace + +### Linting + +```bash +# Check for issues +make lint + +# Auto-fix fixable issues +make lint-fix +``` + +**Key Rules:** +- No unused imports +- No undefined variables +- No mutable default arguments +- No bare `except:` clauses + +### Type Checking + +```bash +# Run mypy type checker +make typecheck +``` + +**Requirements:** +- Type hints mandatory for all functions +- No `Any` types without justification +- Pydantic models for data validation + +Example: +```python +def process_data(items: list[dict[str, Any]]) -> pd.DataFrame: + """Process items and return DataFrame.""" + pass +``` + +### Pre-Commit Quality Check + +Always run before committing: + +```bash +make quality-check +``` + +This runs: +1. Format check +2. Lint check +3. Type check +4. Test suite with coverage +5. Handler test files + +## Architecture Overview + +### Component Hierarchy + +```mermaid +graph TB + Handler[handler.py
RunPod Entry Point]:::entry + RemoteExec[remote_executor.py
Central Orchestrator]:::core + DepInst[dependency_installer.py
Package Management]:::support + FuncExec[function_executor.py
Function Execution]:::exec + ClassExec[class_executor.py
Class/Method Execution]:::exec + BaseExec[base_executor.py
Common Interface]:::support + + Handler --> RemoteExec + RemoteExec --> DepInst + RemoteExec --> FuncExec + RemoteExec --> ClassExec + FuncExec --> BaseExec + ClassExec --> BaseExec + + classDef entry fill:#1976d2,stroke:#0d47a1,stroke-width:3px,color:#fff + classDef core fill:#388e3c,stroke:#1b5e20,stroke-width:3px,color:#fff + classDef exec fill:#f57c00,stroke:#e65100,stroke-width:3px,color:#fff + classDef support fill:#7b1fa2,stroke:#4a148c,stroke-width:3px,color:#fff +``` + +### Execution Flow + +```mermaid +sequenceDiagram + participant Client as Tetra Client + participant Handler as handler.py + participant Remote as remote_executor.py + participant DepInst as dependency_installer.py + participant Executor as function/class_executor.py + + Client->>Handler: FunctionRequest (serialized) + Handler->>Remote: Deserialize & route + Remote->>DepInst: Install dependencies + DepInst-->>Remote: Dependencies ready + Remote->>Executor: Execute function/method + Executor-->>Remote: FunctionResponse + Remote-->>Handler: Serialize response + Handler-->>Client: Return result +``` + +### Key Patterns + +**Composition Over Inheritance:** +- `RemoteExecutor` composes `DependencyInstaller` and executors +- Clear separation of concerns + +**Async Support:** +- Detects async functions with `inspect.iscoroutinefunction()` +- Executes with `asyncio.run()` for async, direct call for sync +- Supports both in `FunctionExecutor` and `ClassExecutor` + +**Serialization:** +- CloudPickle for function arguments and results +- Base64 encoding for transport +- Handles complex Python objects + +**Error Handling:** +- Structured responses via `FunctionResponse` +- Full traceback capture +- Combined stdout/stderr/log output + +### Detailed Architecture + +See [CLAUDE.md](CLAUDE.md) for comprehensive architecture documentation and component details. + +## Common Development Tasks + +### Adding a New Executor Type + +1. **Create executor class** + ```python + # src/new_executor.py + from base_executor import BaseExecutor + from remote_execution import FunctionRequest, FunctionResponse + + class NewExecutor(BaseExecutor): + def execute(self, request: FunctionRequest) -> FunctionResponse: + # Implementation + pass + ``` + +2. **Add to RemoteExecutor** + ```python + # src/remote_executor.py + from new_executor import NewExecutor + + def __init__(self): + self.new_executor = NewExecutor() + + def execute(self, request: FunctionRequest) -> FunctionResponse: + if request.execution_type == "new_type": + return self.new_executor.execute(request) + ``` + +3. **Write tests** + ```python + # tests/unit/test_new_executor.py + class TestNewExecutor: + def test_execute_basic(self): + # AAA pattern + pass + ``` + +### Adding System Packages + +System packages that require `apt-get` installation: + +```python +# src/constants.py +LARGE_SYSTEM_PACKAGES = [ + "ffmpeg", + "libsm6", + "libxext6", + "your-package-here", # Add here +] +``` + +### Writing Handler Test Files + +Handler tests validate end-to-end execution: + +```bash +# Create test file +cat > src/tests/test_my_feature.json << 'EOF' +{ + "input": { + "function_name": "my_function", + "function_code": "def my_function(x): return x * 2", + "args": ["Mg=="], # base64(cloudpickle.dumps(3)) + "kwargs": {}, + "python_dependencies": [], + "system_dependencies": [] + } +} +EOF + +# Test locally +make test-handler +``` + +### Modifying Dependency Installation + +Edit `src/dependency_installer.py`: + +```python +def install_python_packages(self, packages: list[str]) -> FunctionResponse: + # Add custom logic + # Uses uv pip install with environment detection + pass + +def install_system_packages(self, packages: list[str]) -> FunctionResponse: + # Add custom logic + # Uses apt-get/nala with acceleration + pass +``` + +### Debugging Remote Execution Failures + +1. **Check serialization** + ```python + # Test argument encoding + import cloudpickle, base64 + arg = "test" + encoded = base64.b64encode(cloudpickle.dumps(arg)).decode() + decoded = cloudpickle.loads(base64.b64decode(encoded)) + assert arg == decoded + ``` + +2. **Review response output** + ```python + response = executor.execute(request) + print(response.stdout) # Combined stdout/stderr/logs + print(response.error) # Error message + traceback + ``` + +3. **Test in isolation** + ```python + # Run function locally first + exec(function_code, namespace := {}) + func = namespace[function_name] + result = func(*args, **kwargs) # Direct execution + ``` + +## Docker Development + +### Building Images + +```bash +# Build both GPU and CPU images +make build + +# Build GPU image only +make build-gpu + +# Build CPU image only +make build-cpu + +# Build and test on macOS (ARM) +make smoketest-macos-build +make smoketest-macos +``` + +### Image Details + +**GPU Image (`Dockerfile`):** +- Base: `runpod/pytorch:2.8.0-py3.11-cuda12.8.0-devel-ubuntu24.04` +- Platform: `linux/amd64` +- CUDA 12.8 support +- PyTorch 2.8.0 pre-installed + +**CPU Image (`Dockerfile-cpu`):** +- Base: `python:3.11-slim` +- Platform: `linux/amd64` +- Minimal footprint + +### Testing in Containers + +```bash +# Build image +make build-cpu + +# Run container interactively +docker run -it --rm \ + -v $(pwd):/workspace \ + -e RUNPOD_TEST_INPUT="$(cat src/tests/test_input.json)" \ + runpod/worker-tetra:dev \ + /bin/bash + +# Inside container, run handler +cd /workspace +python handler.py +``` + +### Multi-Architecture Builds + +CI builds for multiple platforms: +- GPU: `linux/amd64` +- CPU: `linux/amd64`, `linux/arm64` + +## Submodule Management + +### Understanding tetra-rp + +The `tetra-rp/` submodule contains the Tetra SDK: +- Client library with `@remote` decorator +- Resource management (`LiveServerless`) +- Protocol definitions + +### Updating Submodule + +```bash +# Update to latest tetra-rp +git submodule update --remote --rebase + +# Sync protocol definitions +make protocols + +# Test with updated submodule +make test + +# Commit submodule update +git add tetra-rp +git commit -m "chore: update tetra-rp submodule to latest" +``` + +### Protocol Synchronization + +Protocol definitions must stay in sync: + +```bash +# Copy remote_execution.py from submodule +make protocols + +# Verify no breaking changes +make test +``` + +### Submodule Development Workflow + +Working on both worker and SDK: + +```bash +# Make changes in tetra-rp/ +cd tetra-rp +git checkout -b feature/my-change +# ... make changes ... +git commit -m "feat: my change" +cd .. + +# Update worker to use new tetra-rp +git add tetra-rp +git commit -m "chore: use tetra-rp feature branch" + +# After tetra-rp PR merges, update to main +cd tetra-rp +git checkout main +git pull +cd .. +git add tetra-rp +git commit -m "chore: update tetra-rp to latest main" +``` + +## CI/CD Pipeline + +### GitHub Actions Workflows + +**Primary Workflow (`.github/workflows/ci.yml`):** + +```mermaid +graph LR + PR[Pull Request]:::pr --> Test[Test Job
Python 3.9-3.13]:::test + PR --> Lint[Lint Job
Ruff + Formatting]:::lint + PR --> Docker[Docker Test
CPU Build]:::docker + + Main[Push to Main]:::main --> Test + Main --> Lint + Main --> Release[Release Please]:::release + Main --> DockerMain[Docker Main
Push :main tags]:::dockerpush + + Release --> DockerProd[Docker Prod
Semantic versions]:::dockerpush + + classDef pr fill:#1976d2,stroke:#0d47a1,stroke-width:3px,color:#fff + classDef main fill:#388e3c,stroke:#1b5e20,stroke-width:3px,color:#fff + classDef test fill:#f57c00,stroke:#e65100,stroke-width:3px,color:#fff + classDef lint fill:#7b1fa2,stroke:#4a148c,stroke-width:3px,color:#fff + classDef docker fill:#0288d1,stroke:#01579b,stroke-width:3px,color:#fff + classDef release fill:#c62828,stroke:#b71c1c,stroke-width:3px,color:#fff + classDef dockerpush fill:#2e7d32,stroke:#1b5e20,stroke-width:3px,color:#fff +``` + +**Test Job:** +- Runs on Python 3.9, 3.10, 3.11, 3.12, 3.13 +- Executes `make test-coverage` +- Requires 35% minimum coverage +- Tests handler with all `test_*.json` files + +**Lint Job:** +- Python 3.11 only +- Runs `make format-check` and `make lint` + +**Docker Jobs:** +- Builds GPU and CPU images +- Pushes `:main` tags on main branch +- Pushes semantic version tags on release + +### Release Process + +Automated with [release-please](https://github.com/googleapis/release-please): + +1. **Make commits with conventional format** + ```bash + git commit -m "feat: new feature" + git commit -m "fix: bug fix" + git commit -m "refactor: code improvement" + ``` + +2. **Release Please creates PR** + - Auto-generates changelog + - Bumps version in `pyproject.toml` + - Updates `CHANGELOG.md` + +3. **Merge release PR** + - Creates GitHub release + - Tags with semantic version + - Triggers Docker production builds + +4. **Docker images published** + - `runpod/worker-tetra:latest` + - `runpod/worker-tetra:X.Y.Z` + - `runpod/worker-tetra:X.Y` + - `runpod/worker-tetra:X` + +### Fixing CI Failures Locally + +**Test failures:** +```bash +# Run exact CI test command +make test-coverage + +# Check coverage report +open htmlcov/index.html +``` + +**Lint failures:** +```bash +# Run exact CI lint commands +make format-check +make lint +``` + +**Docker build failures:** +```bash +# Build locally +make build-cpu + +# Test built image +docker run --rm runpod/worker-tetra:dev python -c "import handler" +``` + +## Contributing Guidelines + +### Git Workflow + +1. **Branch from main** + ```bash + git checkout main + git pull origin main + git checkout -b feature/TICKET-description + ``` + +2. **Branch naming conventions** + - `feature/TICKET-description` - New features + - `fix/TICKET-description` - Bug fixes + - `refactor/description` - Code improvements + - `perf/description` - Performance improvements + - `docs/description` - Documentation + +3. **Make commits** + ```bash + git add . + git commit -m "type(scope): subject" + ``` + +### Commit Message Format + +Follow [Conventional Commits](https://www.conventionalcommits.org/): + +``` +type(scope): subject + +Longer description if needed. + +- Bullet points for multiple changes +- Reference issue numbers +``` + +**Types:** +- `feat` - New feature +- `fix` - Bug fix +- `refactor` - Code refactoring (included in release notes) +- `perf` - Performance improvement +- `test` - Adding/updating tests +- `docs` - Documentation only +- `chore` - Maintenance tasks +- `build` - Build system changes + +**Scopes:** +- `executor` - Executor components +- `installer` - Dependency installer +- `handler` - Handler entry point +- `serialization` - Serialization utils +- `logging` - Log streaming +- `ci` - CI/CD changes + +**Examples:** +```bash +git commit -m "feat(executor): add async function execution support" +git commit -m "fix(installer): handle missing system packages gracefully" +git commit -m "refactor(serialization): simplify cloudpickle encoding" +git commit -m "docs: update DEVELOPMENT.md with testing guide" +``` + +### Pull Request Checklist + +Before opening PR: + +- [ ] All tests pass (`make test`) +- [ ] Code formatted (`make format`) +- [ ] No lint errors (`make lint`) +- [ ] Type hints present (`make typecheck`) +- [ ] Coverage meets minimum 35% (`make test-coverage`) +- [ ] Handler tests pass (`make test-handler`) +- [ ] Commits follow conventional format +- [ ] PR description explains changes + +**PR Template:** + +```markdown +## Summary +Brief description of changes + +## Changes +- Change 1 +- Change 2 + +## Testing +How was this tested? + +## Related Issues +Fixes #123 +``` + +### Code Review Expectations + +**As Author:** +- Respond to feedback within 24 hours +- Keep PRs focused and small +- Update based on review comments +- Ensure CI passes before requesting review + +**As Reviewer:** +- Review within 48 hours +- Check for correctness, readability, tests +- Suggest improvements, don't demand perfection +- Approve when requirements met + +## Debugging Guide + +### Debugging Executor Components + +**FunctionExecutor Issues:** + +```python +# Enable debug logging +import logging +logging.basicConfig(level=logging.DEBUG) + +# Test function execution +from function_executor import FunctionExecutor +from remote_execution import FunctionRequest + +executor = FunctionExecutor() +request = FunctionRequest( + function_name="test", + function_code="def test(): return 42", + args=[], + kwargs={}, +) + +response = executor.execute(request) +print(f"Success: {response.success}") +print(f"Result: {response.result}") +print(f"Output: {response.stdout}") +print(f"Error: {response.error}") +``` + +**ClassExecutor Issues:** + +```python +# Test class method execution +from class_executor import ClassExecutor + +executor = ClassExecutor() +request = FunctionRequest( + execution_type="class", + class_name="TestClass", + class_code=""" +class TestClass: + def __init__(self, value): + self.value = value + def get(self): + return self.value +""", + method_name="get", + constructor_args=encoded_args, + args=[], + kwargs={}, +) + +response = executor.execute_class_method(request) +# Check response.instance_id, response.instance_info +``` + +### Log Streaming and Output Capture + +All executor output is captured: + +```python +# In your test function +def test_func(): + print("stdout message") # Captured + logging.info("log message") # Captured + import sys + sys.stderr.write("stderr message\n") # Captured + return "result" + +# All output available in response.stdout +response = executor.execute(request) +assert "stdout message" in response.stdout +assert "log message" in response.stdout +assert "stderr message" in response.stdout +``` + +### Dependency Installation Issues + +**Debug Python packages:** + +```python +from dependency_installer import DependencyInstaller + +installer = DependencyInstaller() +response = installer.install_python_packages(["numpy", "pandas"]) + +if not response.success: + print(f"Installation failed: {response.error}") + print(f"Output: {response.stdout}") +``` + +**Debug system packages:** + +```python +response = installer.install_system_packages(["ffmpeg", "libsm6"]) + +if not response.success: + print(f"Installation failed: {response.error}") + # Check if Docker vs local environment + # Check package availability with apt-cache +``` + +### Serialization/Deserialization Failures + +**Test serialization:** + +```python +import cloudpickle +import base64 +from serialization_utils import SerializationUtils + +# Test argument serialization +args = [1, 2, 3] +encoded = [base64.b64encode(cloudpickle.dumps(arg)).decode() for arg in args] +decoded = SerializationUtils.deserialize_args(encoded) +assert args == decoded + +# Test kwargs serialization +kwargs = {"key": "value"} +encoded = {k: base64.b64encode(cloudpickle.dumps(v)).decode() + for k, v in kwargs.items()} +decoded = SerializationUtils.deserialize_kwargs(encoded) +assert kwargs == decoded +``` + +**Handle serialization errors:** + +```python +try: + result = cloudpickle.dumps(complex_object) +except Exception as e: + # Some objects can't be pickled (file handles, sockets, etc.) + print(f"Serialization failed: {e}") + # Simplify the object or use alternative serialization +``` + +### Async Execution Problems + +**Debug async function execution:** + +```python +import asyncio +import inspect + +async def async_func(): + await asyncio.sleep(0.1) + return "result" + +# Check if function is coroutine +assert inspect.iscoroutinefunction(async_func) + +# Execute manually +result = asyncio.run(async_func()) +assert result == "result" + +# Test through executor +request = FunctionRequest( + function_name="async_func", + function_code="async def async_func(): return 'result'", + args=[], + kwargs={}, +) +response = executor.execute(request) +# Executor handles asyncio.run() internally +``` + +### Common Error Patterns + +**Function not found:** +```python +# Error: Function 'my_func' not found in the provided code +# Solution: Ensure function_name matches the actual function name in function_code +``` + +**Serialization mismatch:** +```python +# Error: Deserialization failed +# Solution: Ensure args/kwargs are base64(cloudpickle.dumps(value)) +``` + +**Import errors:** +```python +# Error: ModuleNotFoundError: No module named 'X' +# Solution: Add to python_dependencies in request +``` + +**Async not awaited:** +```python +# Error: coroutine 'func' was never awaited +# Solution: Executors handle this automatically; check if inspect.iscoroutinefunction() works +``` + +## Troubleshooting + +### Common Setup Issues + +**uv not found:** +```bash +# Install uv +curl -LsSf https://astral.sh/uv/install.sh | sh + +# Or via pip +pip install uv +``` + +**Submodule not initialized:** +```bash +# Initialize and update submodules +git submodule update --init --recursive + +# Or clone with submodules +git clone --recurse-submodules +``` + +**Virtual environment issues:** +```bash +# Remove and recreate +rm -rf .venv +make setup +source .venv/bin/activate +``` + +### Test Failures + +**Coverage below 35%:** +```bash +# Check which files need coverage +make test-coverage +open htmlcov/index.html + +# Add tests for uncovered code +# Focus on executor components (highest value) +``` + +**Import errors in tests:** +```bash +# Ensure you're running with uv +uv run pytest tests/ + +# Or ensure PYTHONPATH includes src/ +export PYTHONPATH=src:$PYTHONPATH +pytest tests/ +``` + +**Async test failures:** +```python +# Ensure pytest-asyncio is installed +uv sync --all-groups + +# Check pytest config in pyproject.toml +# asyncio_mode = "auto" should be set +``` + +### Docker Build Problems + +**Build hangs on tzdata:** +```dockerfile +# Fixed in current Dockerfile with: +ENV DEBIAN_FRONTEND=noninteractive +``` + +**Platform mismatch:** +```bash +# Specify platform explicitly +docker build --platform linux/amd64 -f Dockerfile -t test . + +# For M1/M2 Mac development +docker build --platform linux/arm64 -f Dockerfile-cpu -t test . +``` + +**Out of disk space:** +```bash +# Clean Docker resources +docker system prune -a + +# Remove dangling images +docker image prune +``` + +### Submodule Sync Issues + +**Detached HEAD in submodule:** +```bash +cd tetra-rp +git checkout main +git pull +cd .. +git add tetra-rp +git commit -m "chore: update tetra-rp to latest" +``` + +**Protocol out of sync:** +```bash +# Re-sync protocol definitions +make protocols + +# Verify tests still pass +make test +``` + +**Merge conflicts in submodule:** +```bash +# Update to latest +git submodule update --remote --rebase + +# If conflicts +cd tetra-rp +git status # Check conflict +git rebase --continue # After resolving +cd .. +``` + +### CI/CD Issues + +**Tests pass locally but fail in CI:** +```bash +# Run exact CI commands +make test-coverage # For test job +make format-check && make lint # For lint job + +# Check Python version matches CI +python --version # Should be 3.11+ for lint, 3.9-3.13 for tests +``` + +**Docker push fails:** +- Check Docker Hub credentials in GitHub Secrets +- Verify `DOCKERHUB_USERNAME` and `DOCKERHUB_TOKEN` +- Ensure permissions for `runpod/worker-tetra` repository + +**Release Please not creating PR:** +- Ensure commits follow conventional format +- Check `.release-please-manifest.json` is valid +- Verify GitHub token has required permissions + +--- + +## Additional Resources + +- **Architecture Details:** [CLAUDE.md](CLAUDE.md) +- **Design Documents:** [docs/](docs/) +- **Tetra SDK:** [tetra-rp/README.md](tetra-rp/README.md) +- **Release Process:** [tetra-rp/RELEASE_SYSTEM.md](tetra-rp/RELEASE_SYSTEM.md) +- **RunPod Docs:** https://docs.runpod.io/ + +## Getting Help + +- **GitHub Issues:** https://github.com/runpod/worker-tetra/issues +- **RunPod Discord:** https://discord.gg/runpod +- **Internal:** Slack #tetra-development channel From aa7586429a7ae8bc07a3d9272b7819a4c5526d84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 13 Nov 2025 22:23:14 -0800 Subject: [PATCH 3/6] fix(executor): handle nested event loop contexts in async execution Address PR #42 Copilot feedback about asyncio.run() usage. The previous implementation used asyncio.run() unconditionally, which raises RuntimeError when called from within a running event loop (e.g., nested async calls). This could break in scenarios where: - Handler becomes async in the future - Users nest remote function calls - Execution happens in unexpected async contexts Changes: - Detect running event loop with asyncio.get_running_loop() - Use asyncio.run() when no loop is running (current path) - Use loop.run_until_complete() when loop exists - Applies to both FunctionExecutor and ClassExecutor This future-proofs the async execution logic while maintaining backward compatibility with current synchronous execution context. Addresses: https://github.com/runpod-workers/worker-tetra/pull/42#discussion_r2525869536 Addresses: https://github.com/runpod-workers/worker-tetra/pull/42#discussion_r2525869573 --- src/class_executor.py | 11 +++++++++-- src/function_executor.py | 11 +++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/class_executor.py b/src/class_executor.py index 1f9e44a..39f7d9a 100644 --- a/src/class_executor.py +++ b/src/class_executor.py @@ -59,8 +59,15 @@ def execute_class_method(self, request: FunctionRequest) -> FunctionResponse: # Execute the method (handle both sync and async) if inspect.iscoroutinefunction(method): - # Async method - need to await it - result = asyncio.run(method(*args, **kwargs)) + # Async method - check for running event loop + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running event loop - safe to use asyncio.run() + result = asyncio.run(method(*args, **kwargs)) + else: + # Running inside an event loop - use run_until_complete() + result = loop.run_until_complete(method(*args, **kwargs)) else: # Sync method - call directly result = method(*args, **kwargs) diff --git a/src/function_executor.py b/src/function_executor.py index e42fdf4..22b8d86 100644 --- a/src/function_executor.py +++ b/src/function_executor.py @@ -54,8 +54,15 @@ def execute(self, request: FunctionRequest) -> FunctionResponse: # Execute the function (handle both sync and async) if inspect.iscoroutinefunction(func): - # Async function - need to await it - result = asyncio.run(func(*args, **kwargs)) + # Async function - check for running event loop + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running event loop - safe to use asyncio.run() + result = asyncio.run(func(*args, **kwargs)) + else: + # Running inside an event loop - use run_until_complete() + result = loop.run_until_complete(func(*args, **kwargs)) else: # Sync function - call directly result = func(*args, **kwargs) From dcee174ffccde3ddd349d9c848b1700aa952f3c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 13 Nov 2025 23:47:42 -0800 Subject: [PATCH 4/6] fix(executor): convert executors to async to prevent event loop deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR #42 Copilot feedback (round 2) about async/await architecture. Root cause: Handler is async (RunPod supports async handlers) which means an event loop is running when executors execute. Using loop.run_until_complete() from within a running loop causes deadlock. Solution: Make entire execution chain async with proper await: - handler (async) → ExecuteFunction (async) → executors (async) → user funcs Changes: - FunctionExecutor.execute(): Made async, await async functions directly - ClassExecutor.execute_class_method(): Made async, await async methods directly - RemoteExecutor: Added await to executor calls - All tests: Made async and added await to executor calls This eliminates event loop conflicts by using proper async/await chain instead of trying to manage event loops manually. Works correctly with RunPod's async handler support. Fixes: https://github.com/runpod-workers/worker-tetra/pull/42#discussion_r2525869536 Fixes: https://github.com/runpod-workers/worker-tetra/pull/42#discussion_r2525869573 Addresses: Copilot follow-up comments about loop.run_until_complete() deadlock --- src/class_executor.py | 17 ++----- src/function_executor.py | 13 ++--- src/remote_executor.py | 4 +- tests/unit/test_class_executor.py | 72 ++++++++++++++-------------- tests/unit/test_function_executor.py | 44 ++++++++--------- 5 files changed, 68 insertions(+), 82 deletions(-) diff --git a/src/class_executor.py b/src/class_executor.py index 39f7d9a..76cd275 100644 --- a/src/class_executor.py +++ b/src/class_executor.py @@ -20,11 +20,11 @@ def __init__(self): self.class_instances: Dict[str, Any] = {} self.instance_metadata: Dict[str, Dict[str, Any]] = {} - def execute(self, request: FunctionRequest) -> FunctionResponse: + async def execute(self, request: FunctionRequest) -> FunctionResponse: """Execute class method.""" - return self.execute_class_method(request) + return await self.execute_class_method(request) - def execute_class_method(self, request: FunctionRequest) -> FunctionResponse: + async def execute_class_method(self, request: FunctionRequest) -> FunctionResponse: """ Execute a class method with instance management. """ @@ -59,15 +59,8 @@ def execute_class_method(self, request: FunctionRequest) -> FunctionResponse: # Execute the method (handle both sync and async) if inspect.iscoroutinefunction(method): - # Async method - check for running event loop - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running event loop - safe to use asyncio.run() - result = asyncio.run(method(*args, **kwargs)) - else: - # Running inside an event loop - use run_until_complete() - result = loop.run_until_complete(method(*args, **kwargs)) + # Async method - await directly + result = await method(*args, **kwargs) else: # Sync method - call directly result = method(*args, **kwargs) diff --git a/src/function_executor.py b/src/function_executor.py index 22b8d86..1b70634 100644 --- a/src/function_executor.py +++ b/src/function_executor.py @@ -13,7 +13,7 @@ class FunctionExecutor: """Handles execution of individual functions with output capture.""" - def execute(self, request: FunctionRequest) -> FunctionResponse: + async def execute(self, request: FunctionRequest) -> FunctionResponse: """ Execute a function with full output capture. @@ -54,15 +54,8 @@ def execute(self, request: FunctionRequest) -> FunctionResponse: # Execute the function (handle both sync and async) if inspect.iscoroutinefunction(func): - # Async function - check for running event loop - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running event loop - safe to use asyncio.run() - result = asyncio.run(func(*args, **kwargs)) - else: - # Running inside an event loop - use run_until_complete() - result = loop.run_until_complete(func(*args, **kwargs)) + # Async function - await directly + result = await func(*args, **kwargs) else: # Sync function - call directly result = func(*args, **kwargs) diff --git a/src/remote_executor.py b/src/remote_executor.py index 1577e46..1088e17 100644 --- a/src/remote_executor.py +++ b/src/remote_executor.py @@ -93,9 +93,9 @@ async def ExecuteFunction(self, request: FunctionRequest) -> FunctionResponse: # Execute the function/class if execution_type == "class": - result = self.class_executor.execute_class_method(request) + result = await self.class_executor.execute_class_method(request) else: - result = self.function_executor.execute(request) + result = await self.function_executor.execute(request) # Add all captured system logs to the result system_logs = get_streamed_logs(clear_buffer=True) diff --git a/tests/unit/test_class_executor.py b/tests/unit/test_class_executor.py index 835dd36..6ffeb10 100644 --- a/tests/unit/test_class_executor.py +++ b/tests/unit/test_class_executor.py @@ -28,7 +28,7 @@ def encode_kwargs(self, **kwargs): for k, v in kwargs.items() } - def test_execute_class_method_basic(self): + async def test_execute_class_method_basic(self): """Test basic class method execution.""" request = FunctionRequest( execution_type="class", @@ -47,14 +47,14 @@ def get_value(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True assert response.instance_id is not None result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "Value: test" - def test_execute_class_method_with_args(self): + async def test_execute_class_method_with_args(self): """Test class method execution with method arguments.""" request = FunctionRequest( execution_type="class", @@ -73,13 +73,13 @@ def add(self, a, b): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == 8 - def test_execute_class_method_default_call(self): + async def test_execute_class_method_default_call(self): """Test class method execution with default __call__ method.""" request = FunctionRequest( execution_type="class", @@ -97,13 +97,13 @@ def __call__(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "hello" - def test_execute_class_method_not_found(self): + async def test_execute_class_method_not_found(self): """Test execution when method is not found in class.""" request = FunctionRequest( execution_type="class", @@ -121,13 +121,13 @@ def existing_method(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is False assert "missing_method" in response.error assert "not found" in response.error - def test_execute_class_method_with_exception(self): + async def test_execute_class_method_with_exception(self): """Test error handling when class method raises exception.""" request = FunctionRequest( execution_type="class", @@ -145,7 +145,7 @@ def error_method(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is False assert "Method error" in response.error @@ -165,7 +165,7 @@ def encode_args(self, *args): base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args ] - def test_create_new_instance(self): + async def test_create_new_instance(self): """Test creating a new class instance.""" request = FunctionRequest( execution_type="class", @@ -186,7 +186,7 @@ def increment(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True assert response.instance_id is not None @@ -198,7 +198,7 @@ def increment(self): assert metadata["method_calls"] == 1 assert "created_at" in metadata - def test_reuse_existing_instance(self): + async def test_reuse_existing_instance(self): """Test reusing an existing class instance.""" # Create initial instance initial_request = FunctionRequest( @@ -223,7 +223,7 @@ def get_count(self): kwargs={}, ) - first_response = self.executor.execute_class_method(initial_request) + first_response = await self.executor.execute_class_method(initial_request) instance_id = first_response.instance_id # Reuse the same instance @@ -238,7 +238,7 @@ def get_count(self): kwargs={}, ) - second_response = self.executor.execute_class_method(reuse_request) + second_response = await self.executor.execute_class_method(reuse_request) assert second_response.success is True assert second_response.instance_id == instance_id @@ -247,7 +247,7 @@ def get_count(self): result = cloudpickle.loads(base64.b64decode(second_response.result)) assert result == 1 - def test_instance_metadata_tracking(self): + async def test_instance_metadata_tracking(self): """Test that instance metadata is properly tracked.""" request = FunctionRequest( execution_type="class", @@ -257,7 +257,7 @@ class TestClass: def __init__(self): pass - def test_method(self): + async def test_method(self): return "test" """, method_name="test_method", @@ -266,12 +266,12 @@ def test_method(self): ) # Execute method multiple times - response1 = self.executor.execute_class_method(request) + response1 = await self.executor.execute_class_method(request) instance_id = response1.instance_id request.instance_id = instance_id request.create_new_instance = False - self.executor.execute_class_method(request) + await self.executor.execute_class_method(request) # Check metadata updates metadata = self.executor.instance_metadata[instance_id] @@ -283,14 +283,14 @@ def test_method(self): last_used_time = datetime.fromisoformat(metadata["last_used"]) assert last_used_time >= created_time - def test_generate_instance_id(self): + async def test_generate_instance_id(self): """Test automatic instance ID generation.""" request = FunctionRequest( execution_type="class", class_name="TestClass", class_code=""" class TestClass: - def test_method(self): + async def test_method(self): return "test" """, method_name="test_method", @@ -298,14 +298,14 @@ def test_method(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True assert response.instance_id is not None assert response.instance_id.startswith("TestClass_") assert len(response.instance_id.split("_")[1]) == 8 # UUID hex[:8] - def test_class_not_found_error(self): + async def test_class_not_found_error(self): """Test error when class is not found in provided code.""" request = FunctionRequest( execution_type="class", @@ -320,7 +320,7 @@ def __init__(self): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is False assert "MissingClass" in response.error @@ -340,7 +340,7 @@ def encode_args(self, *args): base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args ] - def test_execute_async_method(self): + async def test_execute_async_method(self): """Test execution of async method.""" request = FunctionRequest( execution_type="class", @@ -359,13 +359,13 @@ async def greet(self, name): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "Hello, World!" - def test_execute_async_method_with_await(self): + async def test_execute_async_method_with_await(self): """Test async method that uses await.""" request = FunctionRequest( execution_type="class", @@ -387,13 +387,13 @@ async def process(self, item, delay=0.01): kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "Processed: task1" - def test_execute_async_method_returning_dict(self): + async def test_execute_async_method_returning_dict(self): """Test async method returning dict (like GPU worker).""" request = FunctionRequest( execution_type="class", @@ -418,7 +418,7 @@ async def process_batch(self, input_data: dict) -> dict: kwargs={}, ) - response = self.executor.execute_class_method(request) + response = await self.executor.execute_class_method(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) @@ -427,7 +427,7 @@ async def process_batch(self, input_data: dict) -> dict: assert result["batch_size"] == 64 assert result["processed_items"] == 640 - def test_async_method_instance_persistence(self): + async def test_async_method_instance_persistence(self): """Test that async methods work with instance persistence.""" # Create instance and call async method initial_request = FunctionRequest( @@ -455,7 +455,7 @@ async def get_count(self): kwargs={}, ) - first_response = self.executor.execute_class_method(initial_request) + first_response = await self.executor.execute_class_method(initial_request) instance_id = first_response.instance_id assert first_response.success is True @@ -474,14 +474,14 @@ async def get_count(self): kwargs={}, ) - second_response = self.executor.execute_class_method(reuse_request) + second_response = await self.executor.execute_class_method(reuse_request) assert second_response.success is True assert second_response.instance_id == instance_id result2 = cloudpickle.loads(base64.b64decode(second_response.result)) assert result2 == 1 # Count should be preserved from first call - def test_mixed_sync_async_methods(self): + async def test_mixed_sync_async_methods(self): """Test class with both sync and async methods.""" # First call sync method sync_request = FunctionRequest( @@ -509,7 +509,7 @@ async def async_get(self): kwargs={}, ) - sync_response = self.executor.execute_class_method(sync_request) + sync_response = await self.executor.execute_class_method(sync_request) instance_id = sync_response.instance_id assert sync_response.success is True @@ -528,7 +528,7 @@ async def async_get(self): kwargs={}, ) - async_response = self.executor.execute_class_method(async_request) + async_response = await self.executor.execute_class_method(async_request) assert async_response.success is True async_result = cloudpickle.loads(base64.b64decode(async_response.result)) diff --git a/tests/unit/test_function_executor.py b/tests/unit/test_function_executor.py index 5e3935d..302e292 100644 --- a/tests/unit/test_function_executor.py +++ b/tests/unit/test_function_executor.py @@ -27,7 +27,7 @@ def encode_kwargs(self, **kwargs): for k, v in kwargs.items() } - def test_execute_simple_function(self): + async def test_execute_simple_function(self): """Test basic function execution.""" request = FunctionRequest( function_name="hello", @@ -36,13 +36,13 @@ def test_execute_simple_function(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "hello world" - def test_execute_function_with_args(self): + async def test_execute_function_with_args(self): """Test function execution with arguments.""" request = FunctionRequest( function_name="add", @@ -51,13 +51,13 @@ def test_execute_function_with_args(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == 8 - def test_execute_function_with_kwargs(self): + async def test_execute_function_with_kwargs(self): """Test function execution with keyword arguments.""" request = FunctionRequest( function_name="greet", @@ -66,13 +66,13 @@ def test_execute_function_with_kwargs(self): kwargs=self.encode_kwargs(greeting="Hi"), ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "Hi, Alice!" - def test_execute_function_not_found(self): + async def test_execute_function_not_found(self): """Test execution when function is not found in code.""" request = FunctionRequest( function_name="missing_func", @@ -81,13 +81,13 @@ def test_execute_function_not_found(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is False assert "missing_func" in response.result assert "not found" in response.result - def test_execute_function_with_exception(self): + async def test_execute_function_with_exception(self): """Test error handling when function raises exception.""" request = FunctionRequest( function_name="error_func", @@ -96,13 +96,13 @@ def test_execute_function_with_exception(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is False assert "Test error" in response.error assert "ValueError" in response.error - def test_execute_function_with_output_capture(self): + async def test_execute_function_with_output_capture(self): """Test that stdout, stderr, and logs are captured.""" request = FunctionRequest( function_name="output_func", @@ -120,7 +120,7 @@ def output_func(): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) @@ -143,7 +143,7 @@ def encode_args(self, *args): base64.b64encode(cloudpickle.dumps(arg)).decode("utf-8") for arg in args ] - def test_execute_async_function(self): + async def test_execute_async_function(self): """Test execution of async function.""" request = FunctionRequest( function_name="async_hello", @@ -152,13 +152,13 @@ def test_execute_async_function(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "async hello world" - def test_execute_async_function_with_args(self): + async def test_execute_async_function_with_args(self): """Test async function with arguments.""" request = FunctionRequest( function_name="async_multiply", @@ -167,13 +167,13 @@ def test_execute_async_function_with_args(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == 42 - def test_execute_async_function_with_await(self): + async def test_execute_async_function_with_await(self): """Test async function that uses await.""" request = FunctionRequest( function_name="async_with_await", @@ -188,13 +188,13 @@ async def async_with_await(delay): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) assert result == "slept for 0.01s" - def test_execute_async_function_with_dict_return(self): + async def test_execute_async_function_with_dict_return(self): """Test async function returning dict (like GPU worker).""" request = FunctionRequest( function_name="gpu_matrix_multiply", @@ -211,7 +211,7 @@ async def gpu_matrix_multiply(input_data: dict) -> dict: kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) assert response.success is True result = cloudpickle.loads(base64.b64decode(response.result)) @@ -227,7 +227,7 @@ def setup_method(self): """Setup for each test method.""" self.executor = FunctionExecutor() - def test_execute_function_handles_errors(self): + async def test_execute_function_handles_errors(self): """Test that function execution properly handles errors.""" request = FunctionRequest( function_name="error_func", @@ -236,7 +236,7 @@ def test_execute_function_handles_errors(self): kwargs={}, ) - response = self.executor.execute(request) + response = await self.executor.execute(request) # Verify error was captured assert response.success is False From 9085174981e6f012d8b4b5b1e7b82f4d97d101fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 13 Nov 2025 23:50:09 -0800 Subject: [PATCH 5/6] chore: make lint-fix --- src/class_executor.py | 1 - src/function_executor.py | 1 - 2 files changed, 2 deletions(-) diff --git a/src/class_executor.py b/src/class_executor.py index 76cd275..52ee182 100644 --- a/src/class_executor.py +++ b/src/class_executor.py @@ -2,7 +2,6 @@ import logging import traceback import uuid -import asyncio import inspect from contextlib import redirect_stdout, redirect_stderr from datetime import datetime diff --git a/src/function_executor.py b/src/function_executor.py index 1b70634..e22909b 100644 --- a/src/function_executor.py +++ b/src/function_executor.py @@ -1,7 +1,6 @@ import io import logging import traceback -import asyncio import inspect from contextlib import redirect_stdout, redirect_stderr from typing import Dict, Any From b6db645cafcdb11143440b4e2cada2f981e4db8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Fri, 14 Nov 2025 00:07:39 -0800 Subject: [PATCH 6/6] test: fix async test missing await in integration test --- tests/integration/test_handler_integration.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_handler_integration.py b/tests/integration/test_handler_integration.py index 0eca974..012a6d5 100644 --- a/tests/integration/test_handler_integration.py +++ b/tests/integration/test_handler_integration.py @@ -356,7 +356,8 @@ def bad_function(: assert "SyntaxError" in result["error"] or "invalid syntax" in result["error"] @pytest.mark.integration - def test_remote_executor_direct_execution(self): + @pytest.mark.asyncio + async def test_remote_executor_direct_execution(self): """Test RemoteExecutor direct method calls.""" executor = RemoteExecutor() @@ -370,7 +371,7 @@ def direct_test(): kwargs={}, ) - result = executor.function_executor.execute(request) + result = await executor.function_executor.execute(request) assert result.success is True deserialized_result = cloudpickle.loads(base64.b64decode(result.result))