Skip to content

Commit 5699272

Browse files
committed
Update with workflow PR
1 parent 42cb55e commit 5699272

File tree

6 files changed

+183
-156
lines changed

6 files changed

+183
-156
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder;
2828
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2929
import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder;
30-
import io.serverlessworkflow.impl.executors.RunExecutor.RunExecutorBuilder;
3130
import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder;
3231
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
3332
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
@@ -78,7 +77,7 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
7877
} else if (task.getEmitTask() != null) {
7978
return new EmitExecutorBuilder(position, task.getEmitTask(), definition);
8079
} else if (task.getRunTask() != null) {
81-
return new RunExecutorBuilder(position, task.getRunTask(), definition);
80+
return new RunExecutor.RunTaskExecutorBuilder(position, task.getRunTask(), definition);
8281
}
8382
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
8483
}
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.serverlessworkflow.impl.executors;
217

3-
public record ProcessResult(int code, String stdout, String stderr) {
4-
}
18+
public record ProcessResult(int code, String stdout, String stderr) {}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,52 +21,51 @@
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
2222
import io.serverlessworkflow.impl.WorkflowModel;
2323
import io.serverlessworkflow.impl.WorkflowMutablePosition;
24-
2524
import java.util.Map;
2625
import java.util.concurrent.CompletableFuture;
2726

2827
public abstract class RegularTaskExecutor<T extends TaskBase> extends AbstractTaskExecutor<T> {
2928

30-
protected TransitionInfo transition;
31-
32-
protected RegularTaskExecutor(RegularTaskExecutorBuilder<T> builder) {
33-
super(builder);
34-
}
29+
protected TransitionInfo transition;
3530

36-
public abstract static class RegularTaskExecutorBuilder<T extends TaskBase>
37-
extends AbstractTaskExecutorBuilder<T, RegularTaskExecutor<T>> {
31+
protected RegularTaskExecutor(RegularTaskExecutorBuilder<T> builder) {
32+
super(builder);
33+
}
3834

39-
TaskExecutor<?> taskExecutor;
40-
private TransitionInfoBuilder transition;
35+
public abstract static class RegularTaskExecutorBuilder<T extends TaskBase>
36+
extends AbstractTaskExecutorBuilder<T, RegularTaskExecutor<T>> {
4137

42-
protected RegularTaskExecutorBuilder(
43-
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
44-
super(position, task, definition);
45-
}
38+
TaskExecutor<?> taskExecutor;
39+
private TransitionInfoBuilder transition;
4640

47-
public void connect(Map<String, TaskExecutorBuilder<?>> connections) {
48-
this.transition = next(task.getThen(), connections);
49-
}
41+
protected RegularTaskExecutorBuilder(
42+
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
43+
super(position, task, definition);
44+
}
5045

51-
@Override
52-
protected void buildTransition(RegularTaskExecutor<T> instance) {
53-
instance.transition = TransitionInfo.build(transition);
54-
}
46+
public void connect(Map<String, TaskExecutorBuilder<?>> connections) {
47+
this.transition = next(task.getThen(), connections);
5548
}
5649

5750
@Override
58-
protected TransitionInfo getSkipTransition() {
59-
return transition;
51+
protected void buildTransition(RegularTaskExecutor<T> instance) {
52+
instance.transition = TransitionInfo.build(transition);
6053
}
54+
}
6155

62-
protected CompletableFuture<TaskContext> execute(
63-
WorkflowContext workflow, TaskContext taskContext) {
64-
CompletableFuture<TaskContext> future =
65-
internalExecute(workflow, taskContext)
66-
.thenApply(node -> taskContext.rawOutput(node).transition(transition));
67-
return future;
68-
}
56+
@Override
57+
protected TransitionInfo getSkipTransition() {
58+
return transition;
59+
}
60+
61+
protected CompletableFuture<TaskContext> execute(
62+
WorkflowContext workflow, TaskContext taskContext) {
63+
CompletableFuture<TaskContext> future =
64+
internalExecute(workflow, taskContext)
65+
.thenApply(node -> taskContext.rawOutput(node).transition(transition));
66+
return future;
67+
}
6968

70-
protected abstract CompletableFuture<WorkflowModel> internalExecute(
71-
WorkflowContext workflow, TaskContext task);
69+
protected abstract CompletableFuture<WorkflowModel> internalExecute(
70+
WorkflowContext workflow, TaskContext task);
7271
}
Lines changed: 83 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,125 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.serverlessworkflow.impl.executors;
217

318
import io.serverlessworkflow.api.types.RunShell;
419
import io.serverlessworkflow.api.types.RunTask;
5-
import io.serverlessworkflow.api.types.RunTaskConfigurationUnion;
20+
import io.serverlessworkflow.api.types.TaskBase;
621
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowApplication;
723
import io.serverlessworkflow.impl.WorkflowContext;
824
import io.serverlessworkflow.impl.WorkflowDefinition;
925
import io.serverlessworkflow.impl.WorkflowModel;
1026
import io.serverlessworkflow.impl.WorkflowMutablePosition;
1127
import io.serverlessworkflow.impl.WorkflowUtils;
1228
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
13-
1429
import java.io.IOException;
30+
import java.io.OutputStream;
1531
import java.util.Objects;
16-
import java.util.Optional;
1732
import java.util.concurrent.CompletableFuture;
1833

19-
public class RunExecutor extends RegularTaskExecutor<RunTask> {
34+
public abstract class RunExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
35+
36+
protected RunExecutor(RegularTaskExecutorBuilder<T> builder) {
37+
super(builder);
38+
}
2039

21-
private final TaskExecutor<?> taskExecutor;
40+
public static class RunTaskExecutorBuilder
41+
extends RegularTaskExecutor.RegularTaskExecutorBuilder<RunTask> {
2242

23-
private RunExecutor(RunExecutorBuilder builder) {
24-
super(builder);
25-
this.taskExecutor = builder.taskExecutor;
43+
protected RunTaskExecutorBuilder(
44+
WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) {
45+
super(position, task, definition);
2646
}
2747

2848
@Override
29-
protected CompletableFuture<WorkflowModel> internalExecute(WorkflowContext workflow, TaskContext taskContext) {
30-
return TaskExecutorHelper.processTaskList(
31-
taskExecutor, workflow, Optional.of(taskContext), taskContext.input());
49+
protected RegularTaskExecutor<RunTask> buildInstance() {
50+
if (task.getRun().getRunShell() != null) {
51+
return new RunShellExecutor(this);
52+
} else {
53+
throw new RuntimeException("Unsupported run task type");
54+
}
3255
}
56+
}
3357

34-
public static class RunExecutorBuilder extends RegularTaskExecutorBuilder<RunTask> {
58+
public static class RunShellExecutor extends RunExecutor<RunTask> {
3559

36-
private TaskExecutor<?> taskExecutor;
60+
private final WorkflowApplication application;
3761

38-
protected RunExecutorBuilder(WorkflowMutablePosition position, RunTask runTask, WorkflowDefinition definition) {
39-
super(position, runTask, definition);
40-
41-
RunTaskConfigurationUnion run = runTask.getRun();
62+
protected RunShellExecutor(RegularTaskExecutorBuilder<RunTask> builder) {
63+
super(builder);
64+
this.application = builder.application;
65+
}
4266

43-
if (run.get() instanceof RunShell runShell) {
44-
// For RunShell, no internal task executor is created yet.
45-
taskExecutor = (workflowContext, parentContext, input) -> {
67+
@Override
68+
protected CompletableFuture<WorkflowModel> internalExecute(
69+
WorkflowContext workflow, TaskContext taskContext) {
4670

47-
String command = ExpressionUtils.isExpr(runShell.getShell().getCommand()) ?
48-
WorkflowUtils.buildStringResolver(definition.application(), runShell.getShell().getCommand(), input.asJavaObject())
49-
.apply(workflowContext, parentContext.get(), input)
50-
: runShell.getShell().getCommand();
71+
if (taskContext.task() != null && taskContext.task() instanceof RunTask runTask) {
72+
RunShell runShell = runTask.getRun().getRunShell();
5173

52-
Objects.requireNonNull(command, "Shell command must be provided in RunShell task");
74+
try {
75+
String command =
76+
ExpressionUtils.isExpr(runShell.getShell().getCommand())
77+
? WorkflowUtils.buildStringResolver(
78+
application,
79+
runShell.getShell().getCommand(),
80+
taskContext.input().asJavaObject())
81+
.apply(workflow, taskContext, taskContext.input())
82+
: runShell.getShell().getCommand();
5383

54-
ProcessBuilder builder = new ProcessBuilder(command.split(" "));
84+
Objects.requireNonNull(command, "Shell command must be provided in RunShell taskContext");
5585

56-
try {
57-
Process process = builder.start();
86+
ProcessBuilder builder = new ProcessBuilder(command.split(" "));
5887

59-
// get output of process
60-
StringBuilder output = new StringBuilder();
61-
StringBuilder error = new StringBuilder();
62-
process.getInputStream().transferTo(new java.io.OutputStream() {
63-
@Override
64-
public void write(int b) {
65-
output.append((char) b);
66-
}
67-
});
88+
Process process = builder.start();
6889

69-
process.getErrorStream().transferTo(new java.io.OutputStream() {
70-
@Override
71-
public void write(int b) {
72-
error.append((char) b);
73-
}
74-
});
90+
// get output of process
91+
StringBuilder output = new StringBuilder();
92+
StringBuilder error = new StringBuilder();
93+
process.getInputStream().transferTo(getOutputStream(output));
94+
process.getErrorStream().transferTo(getOutputStream(error));
7595

76-
boolean isAwait = runTask.getRun().get().isAwait();
77-
if (!isAwait) {
78-
throw new UnsupportedOperationException("Non-await RunShell is not supported yet");
79-
}
96+
boolean isAwait = runTask.getRun().get().isAwait();
97+
if (!isAwait) {
98+
throw new UnsupportedOperationException("Non-await RunShell is not supported yet");
99+
}
80100

81-
int exitCode = process.waitFor();
101+
int exitCode = process.waitFor();
82102

83-
// Return a completed future with the input as output.
84-
// get stdout and stderr in ProcessResult
103+
ProcessResult result =
104+
new ProcessResult(exitCode, output.toString().trim(), error.toString().trim());
85105

86-
return CompletableFuture.completedFuture(new TaskContext(
87-
definition.application().modelFactory().fromAny(new ProcessResult(
88-
exitCode, output.toString().trim(), error.toString()
89-
))
90-
, position, parentContext, "", runTask
91-
));
106+
return CompletableFuture.completedFuture(application.modelFactory().fromAny(result));
92107

93-
} catch (IOException | InterruptedException e) {
94-
throw new RuntimeException(e);
95-
}
96-
};
97-
} else {
98-
throw new UnsupportedOperationException(run.getClass() + " is not supported yet in RunTask");
99-
}
108+
} catch (IOException | InterruptedException e) {
109+
throw new RuntimeException(e);
100110
}
111+
}
112+
113+
throw new RuntimeException("Task must be of type RunTask");
114+
}
101115

116+
private static OutputStream getOutputStream(StringBuilder output) {
117+
return new OutputStream() {
102118
@Override
103-
protected RegularTaskExecutor<RunTask> buildInstance() {
104-
return new RunExecutor(this);
119+
public void write(int b) {
120+
output.append((char) b);
105121
}
122+
};
106123
}
107-
124+
}
108125
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,22 @@ private static Stream<Arguments> provideParameters() {
9292
.equals("Star Trek"),
9393
"StartTrek");
9494
return Stream.of(
95-
// Arguments.of("workflows-samples/callGetHttp.yaml", petInput, petCondition),
96-
// Arguments.of(
97-
// "workflows-samples/callGetHttp.yaml",
98-
// Map.of("petId", "-1"),
99-
// new Condition<WorkflowModel>(
100-
// o -> o.asMap().orElseThrow().containsKey("petId"), "notFoundCondition")),
101-
// Arguments.of(
102-
// "workflows-samples/call-http-endpoint-interpolation.yaml", petInput, petCondition),
103-
// Arguments.of(
104-
// "workflows-samples/call-http-query-parameters.yaml", starTrekInput, starTrekCondition),
105-
// Arguments.of(
106-
// "workflows-samples/call-http-query-parameters-external-schema.yaml",
107-
// starTrekInput,
108-
// starTrekCondition),
95+
// Arguments.of("workflows-samples/callGetHttp.yaml", petInput, petCondition),
96+
// Arguments.of(
97+
// "workflows-samples/callGetHttp.yaml",
98+
// Map.of("petId", "-1"),
99+
// new Condition<WorkflowModel>(
100+
// o -> o.asMap().orElseThrow().containsKey("petId"), "notFoundCondition")),
101+
// Arguments.of(
102+
// "workflows-samples/call-http-endpoint-interpolation.yaml", petInput,
103+
// petCondition),
104+
// Arguments.of(
105+
// "workflows-samples/call-http-query-parameters.yaml", starTrekInput,
106+
// starTrekCondition),
107+
// Arguments.of(
108+
// "workflows-samples/call-http-query-parameters-external-schema.yaml",
109+
// starTrekInput,
110+
// starTrekCondition),
109111
Arguments.of(
110112
"workflows-samples/callPostHttp.yaml",
111113
Map.of("name", "Javierito", "surname", "Unknown"),

0 commit comments

Comments
 (0)