2222import io .serverlessworkflow .impl .WorkflowApplication ;
2323import io .serverlessworkflow .impl .WorkflowContext ;
2424import io .serverlessworkflow .impl .WorkflowDefinition ;
25+ import io .serverlessworkflow .impl .WorkflowError ;
26+ import io .serverlessworkflow .impl .WorkflowException ;
2527import io .serverlessworkflow .impl .WorkflowModel ;
2628import io .serverlessworkflow .impl .WorkflowModelFactory ;
2729import io .serverlessworkflow .impl .WorkflowUtils ;
2830import io .serverlessworkflow .impl .expressions .ExpressionUtils ;
31+ import java .io .BufferedReader ;
2932import java .io .IOException ;
30- import java .io .OutputStream ;
33+ import java .io .InputStream ;
34+ import java .io .InputStreamReader ;
35+ import java .io .StringWriter ;
36+ import java .nio .charset .StandardCharsets ;
3137import java .util .Map ;
3238import java .util .concurrent .CompletableFuture ;
3339
@@ -39,19 +45,20 @@ public class RunShellExecutor implements RunnableTask<RunShell> {
3945 @ FunctionalInterface
4046 private interface ShellResultSupplier {
4147 WorkflowModel apply (
42- TaskContext taskContext , ProcessBuilder processBuilder , WorkflowModel input );
48+ TaskContext taskContext , WorkflowModel input , ProcessBuilder processBuilder );
4349 }
4450
51+ @ FunctionalInterface
4552 private interface ProcessBuilderSupplier {
46- ProcessBuilder apply (TaskContext taskContext , WorkflowContext workflowContext );
53+ ProcessBuilder apply (WorkflowContext workflowContext , TaskContext taskContext );
4754 }
4855
4956 @ Override
5057 public CompletableFuture <WorkflowModel > apply (
5158 WorkflowContext workflowContext , TaskContext taskContext , WorkflowModel input ) {
52- ProcessBuilder processBuilder = this .processBuilderSupplier .apply (taskContext , workflowContext );
59+ ProcessBuilder processBuilder = this .processBuilderSupplier .apply (workflowContext , taskContext );
5360 WorkflowModel workflowModel =
54- this .shellResultSupplier .apply (taskContext , processBuilder , input );
61+ this .shellResultSupplier .apply (taskContext , input , processBuilder );
5562 return CompletableFuture .completedFuture (workflowModel );
5663 }
5764
@@ -60,8 +67,11 @@ public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
6067 Shell shell = taskConfiguration .getShell ();
6168 final String shellCommand = shell .getCommand ();
6269
70+ if (shellCommand == null || shellCommand .isBlank ()) {
71+ throw new IllegalStateException ("Missing shell command in RunShell task configuration" );
72+ }
6373 this .processBuilderSupplier =
64- (taskContext , workflowContext ) -> {
74+ (workflowContext , taskContext ) -> {
6575 WorkflowApplication application = definition .application ();
6676
6777 String command =
@@ -94,49 +104,50 @@ public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
94104 };
95105
96106 this .shellResultSupplier =
97- (taskContext , processBuilder , input ) -> {
98- if (shellCommand == null || shellCommand .isBlank ()) {
99- throw new IllegalStateException (
100- "Missing shell command in RunShell task: " + taskContext .taskName ());
101- }
102-
107+ (taskContext , input , processBuilder ) -> {
103108 try {
104109 Process process = processBuilder .start ();
105110
106111 if (taskConfiguration .isAwait ()) {
107- return executeAwaiting (taskConfiguration , definition , process );
112+ return waitForResult (taskConfiguration , definition , process );
108113 } else {
109114 return input ;
110115 }
111116
112117 } catch (IOException | InterruptedException e ) {
113- throw new RuntimeException ( e );
118+ throw new WorkflowException ( WorkflowError . runtime ( taskContext , e ). build (), e );
114119 }
115120 };
116121 }
117122
118- private WorkflowModel executeAwaiting (
123+ private WorkflowModel waitForResult (
119124 RunShell taskConfiguration , WorkflowDefinition definition , Process process )
120125 throws IOException , InterruptedException {
121- StringBuilder stdout = new StringBuilder ();
122- StringBuilder stderr = new StringBuilder ();
123126
124- process .getInputStream ().transferTo (the (stdout ));
125- process .getErrorStream ().transferTo (the (stderr ));
127+ CompletableFuture <String > futureStdout =
128+ CompletableFuture .supplyAsync (() -> readInputStream (process .getInputStream ()));
129+ CompletableFuture <String > futureStderr =
130+ CompletableFuture .supplyAsync (() -> readInputStream (process .getErrorStream ()));
131+
126132 int exitCode = process .waitFor ();
127133
134+ CompletableFuture <Void > allStd = CompletableFuture .allOf (futureStdout , futureStderr );
135+
136+ allStd .join ();
137+
138+ String stdout = futureStdout .join ();
139+ String stderr = futureStderr .join ();
140+
128141 RunTaskConfiguration .ProcessReturnType returnType = taskConfiguration .getReturn ();
129142
130143 WorkflowModelFactory modelFactory = definition .application ().modelFactory ();
131144
132145 return switch (returnType ) {
133- case ALL ->
134- modelFactory .fromAny (
135- new ProcessResult (exitCode , stdout .toString ().trim (), stderr .toString ().trim ()));
146+ case ALL -> modelFactory .fromAny (new ProcessResult (exitCode , stdout .trim (), stderr .trim ()));
136147 case NONE -> modelFactory .fromNull ();
137148 case CODE -> modelFactory .from (exitCode );
138- case STDOUT -> modelFactory .from (stdout .toString (). trim ());
139- case STDERR -> modelFactory .from (stderr .toString (). trim ());
149+ case STDOUT -> modelFactory .from (stdout .trim ());
150+ case STDERR -> modelFactory .from (stderr .trim ());
140151 };
141152 }
142153
@@ -145,18 +156,18 @@ public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
145156 return RunShell .class .equals (clazz );
146157 }
147158
148- /**
149- * Helper to create an OutputStream that writes to a StringBuilder
150- *
151- * @param output the {@link StringBuilder} to write to
152- * @return the {@link OutputStream}
153- */
154- private static OutputStream the (StringBuilder output ) {
155- return new OutputStream () {
156- @ Override
157- public void write (int b ) {
158- output .append ((char ) b );
159+ public static String readInputStream (InputStream inputStream ) {
160+ StringWriter writer = new StringWriter ();
161+ try (BufferedReader reader =
162+ new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ))) {
163+ char [] buffer = new char [1024 ];
164+ int charsRead ;
165+ while ((charsRead = reader .read (buffer )) != -1 ) {
166+ writer .write (buffer , 0 , charsRead );
159167 }
160- };
168+ } catch (IOException e ) {
169+ throw new RuntimeException (e );
170+ }
171+ return writer .toString ();
161172 }
162173}
0 commit comments