Commit d936e0a

Merge pull request #2903 from ginjups/ginjups-feature-df-esm-chaining
Durable Function ESM and Chaining Pattern
2 parents 5f69318 + 3fb5d92 commit d936e0a
9 files changed: +607 −0 lines changed

Lines changed: 145 additions & 0 deletions
# Event-Driven Data Pipeline with Lambda Durable Functions

This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with **direct SQS Event Source Mapping** and Lambda invoke chaining.

## How It Works

When a message arrives in the SQS queue, it directly triggers the durable function via Event Source Mapping (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining: first validating the incoming data, then transforming it (converting `data_source` to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it ideal for reliable batch processing workflows.
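The transformation step described above (uppercasing `data_source`) lives in a separate Lambda function that is not shown in this excerpt. A minimal, hypothetical sketch of what such a handler might look like — the actual `src/` function may differ — is:

```python
from typing import Any, Dict


def lambda_handler(event: Dict[str, Any], context: Any = None) -> Dict[str, Any]:
    """Hypothetical transformation step: uppercase `data_source`, echo the execution id."""
    data = dict(event.get('data', {}))
    if 'data_source' in data:
        data['data_source'] = str(data['data_source']).upper()
    return {'data': data, 'execution_id': event.get('execution_id')}
```

The orchestrator passes this result on to the storage function unchanged, so keeping the handler a pure input-to-output mapping makes each chained step easy to test in isolation.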
## Architecture Overview

The pattern showcases two key Durable Functions capabilities:

1. **Direct Event Source Mapping**: SQS directly triggers the durable function (subject to the 15-minute ESM execution limit)
2. **Lambda Invoke Chaining**: The durable function orchestrates specialized processing functions as checkpointed steps

![Architecture Diagram](architecture-diagram.png)
## Key Features

- **Direct ESM Integration**: No intermediary function needed
- **15-minute execution constraint**: Demonstrates ESM time limits
- **Fault-tolerant processing**: Automatic checkpointing and recovery
- **Microservices coordination**: Chains specialized Lambda functions
- **Batch processing**: Handles multiple SQS records per invocation
- **Simple storage**: Uses DynamoDB for processed data
## Important ESM Constraints

⚠️ **15-Minute Execution Limit**: When using Event Source Mapping with Durable Functions, the total execution time cannot exceed 15 minutes. This budget must cover:

- All processing steps
- All chained function invocations
- Any wait operations (long waits are not viable within the window, so avoid them)
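Because the 15-minute ceiling is hard, a long batch should check a wall-clock budget before starting each record. The sketch below is illustrative (it is not part of this repo), and the safety margin is an assumed value:

```python
import time

ESM_LIMIT_SECONDS = 15 * 60    # hard ESM execution ceiling
SAFETY_MARGIN_SECONDS = 60     # assumed headroom for cleanup and returning


def within_budget(start_monotonic: float, now: float = None) -> bool:
    """True while enough of the 15-minute ESM window remains to start more work."""
    now = time.monotonic() if now is None else now
    return (now - start_monotonic) < (ESM_LIMIT_SECONDS - SAFETY_MARGIN_SECONDS)
```

A handler could record `time.monotonic()` on entry and skip (or fail fast on) remaining records once `within_budget` turns false, letting SQS redeliver them in a fresh invocation.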
## Use Cases

- ETL pipelines with validation and transformation
- Event-driven microservices orchestration
- Batch processing with fault tolerance
- Data processing workflows requiring checkpointing
## Prerequisites

- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) configured with appropriate permissions
- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) latest version installed
- [Python 3.14](https://www.python.org/downloads/release/python-3140/) runtime installed
## Deployment

1. **Build the application**:
```bash
sam build
```

2. **Deploy to AWS**:
```bash
sam deploy --guided
```
Note the outputs after deployment:
- `DataProcessingQueueUrl`: Use this for `<QUEUE_URL>`
- `ProcessedDataTable`: Use this for `<PROCESSED_DATA_TABLE>`
3. **Test the pipeline**:
```bash
# Send a test message to SQS
aws sqs send-message \
  --queue-url <QUEUE_URL> \
  --message-body '{"data_source": "test.csv", "processing_type": "standard"}' \
  --region <REPLACE_REGION>
```
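The same test message can also be sent from Python with boto3 — a sketch assuming default AWS credentials; `<QUEUE_URL>` remains the stack output placeholder:

```python
import json


def build_test_message(data_source: str, processing_type: str = "standard") -> str:
    """Serialize the JSON payload the pipeline expects."""
    return json.dumps({"data_source": data_source, "processing_type": processing_type})


if __name__ == "__main__":
    import boto3  # requires AWS credentials and network access

    boto3.client("sqs").send_message(
        QueueUrl="<QUEUE_URL>",
        MessageBody=build_test_message("test.csv"),
    )
```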
4. **Verify successful processing**:
```bash
# Check if data was processed and stored in DynamoDB
aws dynamodb scan --table-name <PROCESSED_DATA_TABLE> --query 'Items[*]' --region <REPLACE_REGION>
```
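Verification can likewise be scripted in Python. The pure helper below encodes the key success indicator (the uppercase transform), and the guarded block sketches a boto3 scan — `<PROCESSED_DATA_TABLE>` is the stack output placeholder:

```python
def is_transformed(original: str, stored: str) -> bool:
    """Success indicator: the stored value is the uppercased original."""
    return stored == original.upper()


if __name__ == "__main__":
    import boto3  # requires AWS credentials; table name comes from stack outputs

    items = boto3.client("dynamodb").scan(TableName="<PROCESSED_DATA_TABLE>").get("Items", [])
    print(f"{len(items)} processed item(s) found")
```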
**Success indicators:**
- You should see at least one item in the DynamoDB table
- Original input data: `"data_source": "test.csv"`
- Transformed data: `"data_source": "TEST.CSV"` (uppercase transformation applied)
- Execution tracking with a unique `execution_id`
- Timestamps showing when the data was processed and stored

This confirms the entire pipeline worked: SQS → Durable Function → Validation → Transformation → Storage → DynamoDB
## Components

### 1. Durable Pipeline Function (`src/durable_pipeline/`)
- **Direct SQS Event Source Mapping**: Receives SQS events directly
- **15-minute execution limit**: Must complete all processing within ESM constraints
- **Batch processing**: Handles multiple SQS records per invocation
- **Lambda invoke chaining**: Orchestrates validation, transformation, and storage
- **Automatic checkpointing**: Recovers from failures without losing progress

### 2. Specialized Processing Functions
- **Validation Function**: Simple data validation checks
- **Transformation Function**: Basic data transformation
- **Storage Function**: Persists processed data to DynamoDB
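The orchestrator checks `validation_result.get('is_valid', False)` before continuing, so the validation function's contract is to return an `is_valid` flag. A hypothetical sketch of such a handler — the required-field list is an assumption based on the test message, not taken from the repo:

```python
from typing import Any, Dict

REQUIRED_FIELDS = ("data_source", "processing_type")  # assumed from the test payload


def lambda_handler(event: Dict[str, Any], context: Any = None) -> Dict[str, Any]:
    """Return `is_valid` plus which required fields were missing."""
    data = event.get('data') or {}
    missing = [f for f in REQUIRED_FIELDS if f not in data]
    return {
        'is_valid': not missing,
        'missing_fields': missing,
        'execution_id': event.get('execution_id'),
    }
```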
## Monitoring

- CloudWatch Logs for execution tracking
- DynamoDB table for processed data
- SQS DLQ for failed messages
## Configuration

Key environment variables:
- `ENVIRONMENT`: Deployment environment (dev/prod)
- `PROCESSED_DATA_TABLE`: DynamoDB table for processed data
- `VALIDATION_FUNCTION_ARN`: ARN of the validation function
- `TRANSFORMATION_FUNCTION_ARN`: ARN of the transformation function
- `STORAGE_FUNCTION_ARN`: ARN of the storage function
## ESM-Specific Considerations

- **Execution Timeout**: Set to the 900-second (15-minute) maximum
- **Batch Size**: Configured for optimal processing (5 records)
- **Error Handling**: Uses an SQS DLQ for failed batches
- **Efficient Processing**: Optimized for speed to stay within time limits
## Error Handling

- Automatic retries with exponential backoff
- Dead Letter Queue for failed messages
- Partial batch failure support
- Checkpoint-based recovery
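Partial batch failure support with SQS relies on enabling `ReportBatchItemFailures` on the event source mapping and returning a `batchItemFailures` list of failed message IDs. The handler in this pattern returns a summary object instead, but mapping its per-record results to that response shape would look like this sketch:

```python
from typing import Any, Dict, List


def to_partial_batch_response(record_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Report only failed message IDs so SQS retries (and eventually DLQs) just those."""
    return {
        'batchItemFailures': [
            {'itemIdentifier': r['message_id']}
            for r in record_results
            if r.get('status') in ('failed', 'error')
        ]
    }
```

Returning an empty `batchItemFailures` list marks the whole batch as successful; messages whose IDs are reported stay on the queue for redelivery.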
## Cost Optimization

- Pay only for active compute time
- Efficient batch processing
- Automatic scaling based on queue depth
## Cleanup

```bash
sam delete
```
## Learn More

- [AWS Lambda Durable Functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)
- [Event Source Mappings with Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html)
- [Lambda Invoke Chaining](https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations)
Lines changed: 122 additions & 0 deletions
{
  "title": "Event-Driven Data Pipeline with Lambda Durable Functions",
  "description": "This serverless pattern demonstrates building an event-driven pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping",
  "language": "Python",
  "level": "200",
  "framework": "AWS SAM",
  "services": [
    "sqs",
    "lambda",
    "dynamodb"
  ],
  "introBox": {
    "headline": "How it works",
    "text": [
      "This pattern demonstrates an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping. When a message arrives in the SQS queue, it directly triggers the durable function (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining, first validating the incoming data, then transforming it, and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it ideal for reliable batch processing workflows."
    ]
  },
  "testing": {
    "headline": "Testing",
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "headline": "Cleanup",
    "text": [
      "Delete the stack: <code>sam delete</code>."
    ]
  },
  "deploy": {
    "text": [
      "sam build",
      "sam deploy --guided"
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-esm-and-chaining",
      "templateURL": "serverless-patterns/lambda-durable-esm-and-chaining",
      "templateFile": "template.yaml",
      "projectFolder": "lambda-durable-esm-and-chaining"
    }
  },
  "resources": {
    "headline": "Additional resources",
    "bullets": [
      {
        "text": "AWS Lambda Durable Functions Documentation",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
      },
      {
        "text": "Event Source Mappings with Durable Functions",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html"
      },
      {
        "text": "Durable Function Lambda Invoke Chaining",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations"
      }
    ]
  },
  "authors": [
    {
      "name": "Sahithi Ginjupalli",
      "image": "https://drive.google.com/file/d/1YcKYuGz3LfzSxiwb2lWJfpyi49SbvOSr/view?usp=sharing",
      "bio": "Cloud Engineer at AWS with a passion for diving deep into cloud and AI services to build innovative serverless applications.",
      "linkedin": "ginjupalli-sahithi-37460a18b",
      "twitter": ""
    }
  ],
  "patternArch": {
    "icon1": {
      "x": 20,
      "y": 50,
      "service": "sqs",
      "label": "SQS Queue"
    },
    "icon2": {
      "x": 60,
      "y": 50,
      "service": "lambda",
      "label": "Lambda Durable Function"
    },
    "line1": {
      "from": "icon1",
      "to": "icon2",
      "label": "Event Source Mapping"
    },
    "icon3": {
      "x": 90,
      "y": 10,
      "service": "lambda",
      "label": ""
    },
    "icon4": {
      "x": 90,
      "y": 50,
      "service": "lambda",
      "label": ""
    },
    "icon5": {
      "x": 90,
      "y": 90,
      "service": "lambda",
      "label": ""
    },
    "line2": {
      "from": "icon2",
      "to": "icon3",
      "label": ""
    },
    "line3": {
      "from": "icon2",
      "to": "icon4",
      "label": "Chained Invocation"
    },
    "line4": {
      "from": "icon2",
      "to": "icon5",
      "label": ""
    }
  }
}
Lines changed: 100 additions & 0 deletions
import json
import os
import boto3
from datetime import datetime, timezone
from typing import Dict, Any

from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
lambda_client = boto3.client('lambda')


@durable_execution
def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]:
    """
    Main durable pipeline function that processes SQS events directly via ESM.
    Demonstrates Lambda invoke chaining with checkpointing and recovery.
    Limited to 15 minutes total execution time due to ESM constraints.
    """

    # Extract configuration from environment
    validation_function_arn = os.environ['VALIDATION_FUNCTION_ARN']
    transformation_function_arn = os.environ['TRANSFORMATION_FUNCTION_ARN']
    storage_function_arn = os.environ['STORAGE_FUNCTION_ARN']
    processed_data_table = os.environ['PROCESSED_DATA_TABLE']
    environment = os.environ.get('ENVIRONMENT', 'dev')

    print(f"Processing SQS batch with {len(event.get('Records', []))} records")

    # Process each SQS record in the batch
    batch_results = []

    for record in event.get('Records', []):
        try:
            # Extract data from SQS record
            message_id = record['messageId']
            data = json.loads(record['body'])
            execution_name = f"{environment}-esm-{message_id}"

            print(f"Processing record: {message_id}")

            # Step 1: Validate data by invoking validation function
            validation_result = context.invoke(
                validation_function_arn,
                {'data': data, 'execution_id': execution_name},
                name=f'validate-data-{message_id}'
            )

            if not validation_result.get('is_valid', False):
                batch_results.append({
                    'message_id': message_id,
                    'status': 'failed',
                    'reason': 'validation_failed'
                })
                continue

            # Step 2: Transform data by invoking transformation function
            transformation_result = context.invoke(
                transformation_function_arn,
                {'data': data, 'execution_id': execution_name},
                name=f'transform-data-{message_id}'
            )

            # Step 3: Store processed data by invoking storage function
            storage_result = context.invoke(
                storage_function_arn,
                {
                    'transformed_data': transformation_result,
                    'execution_id': execution_name,
                    'original_data': data
                },
                name=f'store-data-{message_id}'
            )

            batch_results.append({
                'message_id': message_id,
                'status': 'completed',
                'execution_id': execution_name
            })

        except Exception as e:
            print(f"Error processing record {record.get('messageId', 'unknown')}: {str(e)}")
            batch_results.append({
                'message_id': record.get('messageId', 'unknown'),
                'status': 'error',
                'error': str(e)
            })

    # Return batch processing summary
    successful_records = len([r for r in batch_results if r['status'] == 'completed'])
    failed_records = len([r for r in batch_results if r['status'] in ['failed', 'error']])

    return {
        'batch_summary': {
            'total_records': len(batch_results),
            'successful_records': successful_records,
            'failed_records': failed_records
        },
        'record_results': batch_results,
        'processed_at': datetime.now(timezone.utc).isoformat()
    }
Lines changed: 1 addition & 0 deletions

aws-durable-execution-sdk-python
