-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_ops.rb
More file actions
305 lines (246 loc) · 10.3 KB
/
workflow_ops.rb
File metadata and controls
305 lines (246 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#!/usr/bin/env ruby
# frozen_string_literal: true
# Workflow Operations Example
# ============================
#
# Demonstrates various workflow lifecycle operations and control mechanisms.
#
# What it does:
# -------------
# - Start workflow: Create and execute a new workflow instance
# - Pause workflow: Temporarily halt workflow execution
# - Resume workflow: Continue paused workflow
# - Terminate workflow: Force stop a running workflow
# - Restart workflow: Restart a terminated workflow
# - Retry workflow: Retry a terminated workflow
# - Rerun workflow: Re-execute from a specific task (by task ID)
# - Update task: Manually update task status and output
# - Search workflows: Find workflows by correlation ID or query
#
# Use Cases:
# ----------
# - Workflow lifecycle management (start, pause, resume, terminate)
# - Manual intervention in workflow execution
# - Debugging and testing workflows
# - Implementing human-in-the-loop patterns
# - External event handling via task updates
# - Recovery from failures (restart, rerun)
#
# Key Operations:
# ---------------
# - start_workflow(): Launch new workflow instance
# - pause_workflow(): Halt at current task
# - resume_workflow(): Continue from pause
# - terminate_workflow(): Force stop with reason
# - restart_workflow(): Restart a terminated workflow
# - retry_workflow(): Retry a terminated workflow
# - rerun_workflow(): Re-execute from a specific task ID
# - update_task(): Manually complete tasks
#
# Usage:
# bundle exec ruby examples/workflow_ops.rb
require 'securerandom'
require_relative '../lib/conductor'
# Include workflow DSL
include Conductor::Workflow
# Registers the demo workflow definition and launches one instance of it.
#
# The definition chains three tasks: a 2-second timed wait, a wait with no
# timeout (it stays pending until the task is updated), and an HTTP call.
#
# @param workflow_executor used to register the definition and start the run
# @param workflow_client handed to the ConductorWorkflow builder
# @return the value returned by workflow_executor.start_workflow
def start_demo_workflow(workflow_executor, workflow_client)
  definition = ConductorWorkflow.new(
    workflow_client,
    'workflow_ops_demo',
    version: 1,
    executor: workflow_executor
  )

  # Tasks in execution order: timed wait, signal wait, remote HTTP call.
  steps = [
    WaitTask.new('wait_for_2_sec', wait_for_seconds: 2),
    WaitTask.new('wait_for_signal'),
    HttpTask.new('call_remote_api', {
                   'uri' => 'https://orkes-api-tester.orkesconductor.com/api'
                 })
  ]

  # Append each task to the definition with the DSL's >> operator.
  steps.reduce(definition) { |pipeline, step| pipeline >> step }

  # Register (overwriting any previous definition) before starting.
  workflow_executor.register_workflow(definition, overwrite: true)

  # The correlation ID lets the run be found again through search.
  start_request = Conductor::Http::Models::StartWorkflowRequest.new(
    name: 'workflow_ops_demo',
    version: 1,
    input: {},
    correlation_id: 'correlation_123'
  )
  workflow_executor.start_workflow(start_request)
end
# Runs the workflow operations demo end to end and prints progress to stdout.
#
# Steps, in order: start the demo workflow, inspect it, terminate, retry,
# manually complete the signal wait task, rerun from the second task,
# terminate + restart, pause + resume, search by correlation ID, and finally
# terminate the workflow as cleanup.
#
# The demo workflow contains a WAIT task with no timeout, so a run that is
# abandoned half-way would stay open on the server. If any step after the
# start raises a StandardError, a best-effort terminate is attempted before
# the original error is re-raised to the caller.
#
# @raise [StandardError] whatever a client call raises; it is re-raised
#   unchanged after the cleanup attempt
def main
  # Configuration from environment variables
  config = Conductor::Configuration.new

  puts '=' * 70
  puts 'Conductor Ruby SDK - Workflow Operations Example'
  puts '=' * 70
  puts
  puts "Server: #{config.server_url}"
  puts

  # Create clients
  clients = Conductor::Orkes::OrkesClients.new(config)
  workflow_client = clients.get_workflow_client
  task_client = clients.get_task_client
  workflow_executor = clients.get_workflow_executor

  # ==========================================================================
  # START WORKFLOW
  # ==========================================================================
  workflow_id = start_demo_workflow(workflow_executor, workflow_client)

  # Every status check below needs the task list, so fetch it one way only.
  fetch_workflow = -> { workflow_client.get_workflow(workflow_id, include_tasks: true) }

  begin
    puts "Started workflow with ID: #{workflow_id}"
    puts "Monitor at: #{config.ui_host}/execution/#{workflow_id}"
    puts

    # ========================================================================
    # GET WORKFLOW STATUS
    # ========================================================================
    workflow = fetch_workflow.call
    last_task = workflow['tasks'].last
    puts "Workflow status: #{workflow['status']}"
    puts "Currently running task: #{last_task['referenceTaskName']}"
    puts

    # ========================================================================
    # WAIT FOR TIMED WAIT TO COMPLETE
    # ========================================================================
    puts 'Waiting 3 seconds for the timed wait task to complete...'
    sleep 3
    workflow = fetch_workflow.call
    last_task = workflow['tasks'].last
    puts "Workflow status: #{workflow['status']}"
    puts "Currently running task: #{last_task['referenceTaskName']}"
    puts

    # ========================================================================
    # TERMINATE WORKFLOW
    # ========================================================================
    puts 'Terminating workflow...'
    workflow_client.terminate_workflow(workflow_id, reason: 'testing termination')
    workflow = fetch_workflow.call
    last_task = workflow['tasks'].last
    puts "Workflow status: #{workflow['status']}"
    puts "Last task status: #{last_task['status']}"
    puts

    # ========================================================================
    # RETRY WORKFLOW
    # ========================================================================
    puts 'Retrying workflow...'
    workflow_client.retry_workflow(workflow_id)
    workflow = fetch_workflow.call
    last_task = workflow['tasks'].last
    puts "Workflow status: #{workflow['status']}"
    puts "Last task: #{last_task['referenceTaskName']} (status: #{last_task['status']})"
    puts

    # ========================================================================
    # MANUALLY COMPLETE WAIT TASK
    # ========================================================================
    puts 'Manually completing the wait_for_signal task...'
    # The task being completed is the last task seen right after the retry.
    task_result = Conductor::Http::Models::TaskResult.new(
      workflow_instance_id: workflow_id,
      task_id: last_task['taskId'],
      status: 'COMPLETED',
      output_data: { 'greetings' => 'hello from Conductor Ruby SDK' }
    )
    task_client.update_task(task_result)
    workflow = fetch_workflow.call
    last_task = workflow['tasks'].last
    puts "Workflow status: #{workflow['status']}"
    puts "Last task: #{last_task['referenceTaskName']} (status: #{last_task['status']})"
    # Wait for HTTP task to complete
    sleep 2

    # ========================================================================
    # RERUN WORKFLOW
    # ========================================================================
    puts
    puts 'Re-running workflow from the second task...'
    # Get the workflow again to find the second task
    workflow = fetch_workflow.call
    if workflow['tasks'].length > 1
      second_task_id = workflow['tasks'][1]['taskId']
      rerun_request = {
        're_run_from_task_id' => second_task_id
      }
      workflow_client.rerun_workflow(workflow_id, rerun_request)
      workflow = fetch_workflow.call
      puts "Workflow status after rerun: #{workflow['status']}"
    end

    # ========================================================================
    # RESTART WORKFLOW
    # ========================================================================
    puts
    puts 'Terminating and restarting workflow...'
    workflow_client.terminate_workflow(workflow_id, reason: 'terminating so we can restart')
    workflow_client.restart_workflow(workflow_id)
    workflow = fetch_workflow.call
    puts "Workflow status after restart: #{workflow['status']}"

    # ========================================================================
    # PAUSE AND RESUME WORKFLOW
    # ========================================================================
    puts
    puts 'Pausing workflow...'
    workflow_client.pause_workflow(workflow_id)
    workflow = fetch_workflow.call
    puts "Workflow status: #{workflow['status']}"
    puts 'Waiting 3 seconds while paused...'
    sleep 3
    workflow = fetch_workflow.call
    # While paused, wait task may complete but no new tasks scheduled
    wait_task = workflow['tasks'].first
    puts "Wait task status: #{wait_task['status']}"
    puts "Number of tasks: #{workflow['tasks'].length} (should be limited while paused)"
    puts
    puts 'Resuming workflow...'
    workflow_client.resume_workflow(workflow_id)
    sleep 1
    workflow = fetch_workflow.call
    puts "Workflow status after resume: #{workflow['status']}"
    puts "Number of tasks after resume: #{workflow['tasks'].length}"

    # ========================================================================
    # SEARCH WORKFLOWS
    # ========================================================================
    puts
    puts 'Searching for workflows with correlation_id "correlation_123"...'
    search_results = workflow_client.search(
      start: 0,
      size: 100,
      free_text: '*',
      query: 'correlationId = "correlation_123"'
    )
    puts "Found #{search_results['results']&.length || 0} workflow(s) with correlation_id 'correlation_123'"
    # Search for a random correlation ID (should find nothing)
    random_correlation_id = SecureRandom.uuid
    search_results = workflow_client.search(
      start: 0,
      size: 100,
      free_text: '*',
      query: "status IN (RUNNING) AND correlationId = \"#{random_correlation_id}\""
    )
    puts "Found #{search_results['results']&.length || 0} workflow(s) with random correlation_id (expected: 0)"
  rescue StandardError => e
    # Do not leave the demo workflow behind when a step fails. The terminate
    # is best-effort: its own failure is reported but must not mask the
    # original error.
    begin
      workflow_client.terminate_workflow(workflow_id, reason: 'cleanup after failed demo')
    rescue StandardError => cleanup_error
      warn "Cleanup: could not terminate workflow #{workflow_id} " \
           "(it may already be stopped): #{cleanup_error.message}"
    end
    raise e
  end

  # ==========================================================================
  # CLEANUP
  # ==========================================================================
  puts
  puts 'Terminating workflow for cleanup...'
  workflow_client.terminate_workflow(workflow_id, reason: 'cleanup after demo')
  puts
  puts '-' * 70
  puts 'Workflow Operations Demo Complete!'
  puts '-' * 70
  puts
  puts 'Operations demonstrated:'
  puts ' - Start workflow'
  puts ' - Get workflow status'
  puts ' - Terminate workflow'
  puts ' - Retry workflow'
  puts ' - Update task manually'
  puts ' - Rerun workflow from task'
  puts ' - Restart workflow'
  puts ' - Pause workflow'
  puts ' - Resume workflow'
  puts ' - Search workflows'
end
# Run the demo only when this file is executed directly.
#
# Failures are reported on stderr and the process exits with status 1, so a
# shell or CI job can tell a failed run from a successful one. Only the first
# five backtrace frames are printed to keep the output short.
if __FILE__ == $PROGRAM_NAME
  begin
    main
  rescue Conductor::ApiError => e
    warn "API Error: #{e.message}"
    # Array() guards against an exception that carries no backtrace.
    warn Array(e.backtrace).first(5).join("\n")
    exit 1
  rescue StandardError => e
    warn "Error: #{e.message}"
    warn Array(e.backtrace).first(5).join("\n")
    exit 1
  end
end