Skip to content

Commit 0b271ba

Browse files
committed
Merge branch 'main' into sdk_consolidation
2 parents f91c412 + 5be5a12 commit 0b271ba

File tree

29 files changed

+2392
-9
lines changed

29 files changed

+2392
-9
lines changed

conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.commons.lang3.StringUtils;
2121

2222
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
23+
import com.netflix.conductor.common.run.tasks.TypedTask;
2324

2425
import lombok.*;
2526

@@ -334,4 +335,35 @@ public boolean equals(Object o) {
334335
public int hashCode() {
335336
return Objects.hash(getTaskType(), getStatus(), getInputData(), getReferenceTaskName(), getWorkflowPriority(), getRetryCount(), getSeq(), getCorrelationId(), getPollCount(), getTaskDefName(), getScheduledTime(), getStartTime(), getEndTime(), getUpdateTime(), getStartDelayInSeconds(), getRetriedTaskId(), isRetried(), isExecuted(), isCallbackFromWorker(), getResponseTimeoutSeconds(), getWorkflowInstanceId(), getWorkflowType(), getTaskId(), getReasonForIncompletion(), getCallbackAfterSeconds(), getWorkerId(), getOutputData(), getWorkflowTask(), getDomain(), getRateLimitPerFrequency(), getRateLimitFrequencyInSeconds(), getExternalInputPayloadStoragePath(), getExternalOutputPayloadStoragePath(), getIsolationGroupId(), getExecutionNameSpace(), getParentTaskId(), getFirstStartTime());
336337
}
337-
}
338+
339+
/**
340+
* Converts this task to a typed task wrapper.
341+
*
342+
* <p>Example usage:
343+
* <pre>{@code
344+
* WaitTask wait = task.as(WaitTask.class);
345+
* if (wait.isDurationBased()) {
346+
* Duration d = wait.getDuration().orElseThrow();
347+
* }
348+
* }</pre>
349+
*
350+
* @param type the typed task class to convert to
351+
* @param <T> the typed task type
352+
* @return a new instance of the typed task wrapping this task
353+
* @throws IllegalArgumentException if this task's type doesn't match the expected type
354+
* @throws RuntimeException if the typed task class cannot be instantiated
355+
*/
356+
public <T extends TypedTask> T as(Class<T> type) {
357+
try {
358+
return type.getConstructor(Task.class).newInstance(this);
359+
} catch (java.lang.reflect.InvocationTargetException e) {
360+
Throwable cause = e.getCause();
361+
if (cause instanceof IllegalArgumentException) {
362+
throw (IllegalArgumentException) cause;
363+
}
364+
throw new RuntimeException("Failed to create typed task: " + type.getName(), cause);
365+
} catch (Exception e) {
366+
throw new RuntimeException("Failed to create typed task: " + type.getName(), e);
367+
}
368+
}
369+
}

conductor-client/src/main/java/com/netflix/conductor/common/run/Workflow.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.netflix.conductor.common.metadata.Auditable;
2121
import com.netflix.conductor.common.metadata.tasks.Task;
2222
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
23+
import com.netflix.conductor.common.run.tasks.TypedTask;
2324

2425
import lombok.*;
2526

@@ -217,6 +218,31 @@ public Task getTaskByRefName(String refName) {
217218
return found.getLast();
218219
}
219220

221+
/**
222+
* Returns a typed task by its reference name.
223+
*
224+
* <p>Example usage:
225+
* <pre>{@code
226+
* WaitTask wait = workflow.getTaskByRefName("myWait", WaitTask.class);
227+
* if (wait.isDurationBased()) {
228+
* Duration d = wait.getDuration().orElseThrow();
229+
* }
230+
* }</pre>
231+
*
232+
* @param refName the reference task name
233+
* @param type the typed task class to convert to
234+
* @param <T> the typed task type
235+
* @return the typed task, or null if no task with the given reference name exists
236+
* @throws IllegalArgumentException if the task's type doesn't match the expected type
237+
*/
238+
public <T extends TypedTask> T getTaskByRefName(String refName, Class<T> type) {
239+
Task task = getTaskByRefName(refName);
240+
if (task == null) {
241+
return null;
242+
}
243+
return task.as(type);
244+
}
245+
220246
/**
221247
* @return a deep copy of the workflow instance
222248
*/
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2024 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.common.run.tasks;
14+
15+
import com.netflix.conductor.common.metadata.tasks.Task;
16+
import com.netflix.conductor.common.metadata.tasks.TaskType;
17+
18+
/**
19+
* Typed wrapper for DO_WHILE tasks providing convenient access to loop properties.
20+
*
21+
* <p>DO_WHILE tasks execute a set of tasks repeatedly until a condition is met.
22+
*
23+
* <p>Example usage:
24+
* <pre>{@code
25+
* Task task = workflow.getTaskByRefName("myLoop");
26+
* DoWhileTask doWhile = new DoWhileTask(task);
27+
*
28+
* int iteration = doWhile.getCurrentIteration();
29+
* String condition = doWhile.getLoopCondition();
30+
* }</pre>
31+
*/
32+
public class DoWhileTask extends TypedTask {
33+
34+
public static final String LOOP_CONDITION_INPUT = "loopCondition";
35+
public static final String ITERATION_OUTPUT = "iteration";
36+
37+
/**
38+
* Creates a DoWhileTask wrapper.
39+
*
40+
* @param task the underlying task to wrap
41+
* @throws IllegalArgumentException if task is null or not a DO_WHILE task
42+
*/
43+
public DoWhileTask(Task task) {
44+
super(task, TaskType.TASK_TYPE_DO_WHILE);
45+
}
46+
47+
/**
48+
* Checks if the given task is a DO_WHILE task.
49+
*/
50+
public static boolean isDoWhileTask(Task task) {
51+
return task != null && TaskType.TASK_TYPE_DO_WHILE.equals(task.getTaskType());
52+
}
53+
54+
/**
55+
* Returns the loop condition expression, or null if not set.
56+
*/
57+
public String getLoopCondition() {
58+
return getInputString(LOOP_CONDITION_INPUT);
59+
}
60+
61+
/**
62+
* Returns the current iteration number (1-based from the task's iteration field).
63+
*/
64+
public int getCurrentIteration() {
65+
return task.getIteration();
66+
}
67+
68+
/**
69+
* Returns the iteration count from output (may differ from current iteration), or null if not available.
70+
*/
71+
public Integer getIterationCount() {
72+
return getOutputInteger(ITERATION_OUTPUT);
73+
}
74+
75+
/**
76+
* Returns true if this is the first iteration.
77+
*/
78+
public boolean isFirstIteration() {
79+
return getCurrentIteration() <= 1;
80+
}
81+
82+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2024 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.common.run.tasks;
14+
15+
import java.util.Collections;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
import com.netflix.conductor.common.metadata.tasks.Task;
20+
import com.netflix.conductor.common.metadata.tasks.TaskType;
21+
22+
/**
23+
* Typed wrapper for EVENT tasks providing convenient access to event properties.
24+
*
25+
* <p>EVENT tasks publish messages to external event systems like SQS, Kafka, NATS, etc.
26+
*
27+
* <p>Example usage:
28+
* <pre>{@code
29+
* Task task = workflow.getTaskByRefName("myEvent");
30+
* EventTask event = new EventTask(task);
31+
*
32+
* String sink = event.getSink();
33+
* Map<String, Object> payload = event.getEventPayload();
34+
* }</pre>
35+
*/
36+
public class EventTask extends TypedTask {
37+
38+
public static final String SINK_INPUT = "sink";
39+
public static final String EVENT_PRODUCED_OUTPUT = "event_produced";
40+
41+
/**
42+
* Creates an EventTask wrapper.
43+
*
44+
* @param task the underlying task to wrap
45+
* @throws IllegalArgumentException if task is null or not an EVENT task
46+
*/
47+
public EventTask(Task task) {
48+
super(task, TaskType.TASK_TYPE_EVENT);
49+
}
50+
51+
/**
52+
* Checks if the given task is an EVENT task.
53+
*/
54+
public static boolean isEventTask(Task task) {
55+
return task != null && TaskType.TASK_TYPE_EVENT.equals(task.getTaskType());
56+
}
57+
58+
/**
59+
* Returns the event sink (e.g., "sqs:queue-name", "kafka:topic-name"), or null if not set.
60+
*/
61+
public String getSink() {
62+
return getInputString(SINK_INPUT);
63+
}
64+
65+
/**
66+
* Returns the sink type (e.g., "sqs", "kafka", "nats"), or null if sink is not set.
67+
*/
68+
public String getSinkType() {
69+
String sink = getSink();
70+
if (sink == null) {
71+
return null;
72+
}
73+
int colonIndex = sink.indexOf(':');
74+
return colonIndex > 0 ? sink.substring(0, colonIndex) : sink;
75+
}
76+
77+
/**
78+
* Returns the sink target (e.g., queue name, topic name), or null/empty if sink is not set.
79+
*/
80+
public String getSinkTarget() {
81+
String sink = getSink();
82+
if (sink == null) {
83+
return null;
84+
}
85+
int colonIndex = sink.indexOf(':');
86+
return colonIndex > 0 ? sink.substring(colonIndex + 1) : "";
87+
}
88+
89+
/**
90+
* Returns the event payload (all input data except sink).
91+
*/
92+
public Map<String, Object> getEventPayload() {
93+
Map<String, Object> inputData = task.getInputData();
94+
if (inputData == null || inputData.isEmpty()) {
95+
return Collections.emptyMap();
96+
}
97+
// The payload is typically the entire input except for the sink
98+
HashMap<String, Object> payload = new HashMap<>(inputData);
99+
payload.remove(SINK_INPUT);
100+
return payload;
101+
}
102+
103+
/**
104+
* Returns the produced event details (available after the event is sent).
105+
*/
106+
@SuppressWarnings("unchecked")
107+
public Map<String, Object> getEventProduced() {
108+
Object produced = task.getOutputData().get(EVENT_PRODUCED_OUTPUT);
109+
if (produced instanceof Map) {
110+
return (Map<String, Object>) produced;
111+
}
112+
return Collections.emptyMap();
113+
}
114+
115+
/**
116+
* Returns true if the event has been published.
117+
*/
118+
public boolean isEventPublished() {
119+
return !getEventProduced().isEmpty();
120+
}
121+
}

0 commit comments

Comments
 (0)