Skip to content

Commit 3c9f078

Browse files
authored
Add RunTask.shell task (#898)
* Add support for RunTask.shell task Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply pull request suggestions Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply pull request suggestions Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> * Apply pull request suggestions Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com> --------- Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent 7a3e216 commit 3c9f078

File tree

16 files changed

+621
-1
lines changed

16 files changed

+621
-1
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
public record ProcessResult(int code, String stdout, String stderr) {}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunShell;
19+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.Shell;
21+
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowContext;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
25+
import io.serverlessworkflow.impl.WorkflowError;
26+
import io.serverlessworkflow.impl.WorkflowException;
27+
import io.serverlessworkflow.impl.WorkflowModel;
28+
import io.serverlessworkflow.impl.WorkflowModelFactory;
29+
import io.serverlessworkflow.impl.WorkflowUtils;
30+
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
31+
import java.io.IOException;
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.Map;
34+
import java.util.concurrent.CompletableFuture;
35+
36+
public class RunShellExecutor implements RunnableTask<RunShell> {
37+
38+
private ShellResultSupplier shellResultSupplier;
39+
private ProcessBuilderSupplier processBuilderSupplier;
40+
41+
@FunctionalInterface
42+
private interface ShellResultSupplier {
43+
WorkflowModel apply(
44+
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
45+
}
46+
47+
@FunctionalInterface
48+
private interface ProcessBuilderSupplier {
49+
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
50+
}
51+
52+
@Override
53+
public CompletableFuture<WorkflowModel> apply(
54+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
55+
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
56+
return CompletableFuture.completedFuture(
57+
this.shellResultSupplier.apply(taskContext, input, processBuilder));
58+
}
59+
60+
@Override
61+
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
62+
Shell shell = taskConfiguration.getShell();
63+
final String shellCommand = shell.getCommand();
64+
65+
if (shellCommand == null || shellCommand.isBlank()) {
66+
throw new IllegalStateException("Missing shell command in RunShell task configuration");
67+
}
68+
this.processBuilderSupplier =
69+
(workflowContext, taskContext) -> {
70+
WorkflowApplication application = definition.application();
71+
72+
String command =
73+
ExpressionUtils.isExpr(shellCommand)
74+
? WorkflowUtils.buildStringResolver(
75+
application, shellCommand, taskContext.input().asJavaObject())
76+
.apply(workflowContext, taskContext, taskContext.input())
77+
: shellCommand;
78+
79+
StringBuilder commandBuilder = new StringBuilder(command);
80+
81+
if (shell.getArguments() != null
82+
&& shell.getArguments().getAdditionalProperties() != null) {
83+
for (Map.Entry<String, Object> entry :
84+
shell.getArguments().getAdditionalProperties().entrySet()) {
85+
86+
String argKey =
87+
ExpressionUtils.isExpr(entry.getKey())
88+
? WorkflowUtils.buildStringResolver(
89+
application, entry.getKey(), taskContext.input().asJavaObject())
90+
.apply(workflowContext, taskContext, taskContext.input())
91+
: entry.getKey();
92+
93+
if (entry.getValue() == null) {
94+
commandBuilder.append(" ").append(argKey);
95+
continue;
96+
}
97+
98+
String argValue =
99+
ExpressionUtils.isExpr(entry.getValue())
100+
? WorkflowUtils.buildStringResolver(
101+
application,
102+
entry.getValue().toString(),
103+
taskContext.input().asJavaObject())
104+
.apply(workflowContext, taskContext, taskContext.input())
105+
: entry.getValue().toString();
106+
107+
commandBuilder.append(" ").append(argKey).append("=").append(argValue);
108+
}
109+
}
110+
111+
// TODO: support Windows cmd.exe
112+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
113+
114+
if (shell.getEnvironment() != null
115+
&& shell.getEnvironment().getAdditionalProperties() != null) {
116+
for (Map.Entry<String, Object> entry :
117+
shell.getEnvironment().getAdditionalProperties().entrySet()) {
118+
String value =
119+
ExpressionUtils.isExpr(entry.getValue())
120+
? WorkflowUtils.buildStringResolver(
121+
application,
122+
entry.getValue().toString(),
123+
taskContext.input().asJavaObject())
124+
.apply(workflowContext, taskContext, taskContext.input())
125+
: entry.getValue().toString();
126+
127+
// configure environments
128+
builder.environment().put(entry.getKey(), value);
129+
}
130+
}
131+
132+
return builder;
133+
};
134+
135+
this.shellResultSupplier =
136+
(taskContext, input, processBuilder) -> {
137+
try {
138+
Process process = processBuilder.start();
139+
140+
if (taskConfiguration.isAwait()) {
141+
return buildResultFromProcess(taskConfiguration, definition, process);
142+
} else {
143+
return input;
144+
}
145+
146+
} catch (IOException | InterruptedException e) {
147+
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
148+
}
149+
};
150+
}
151+
152+
/**
153+
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
154+
* and captures the exit code, stdout, and stderr based on the task configuration.
155+
*/
156+
private WorkflowModel buildResultFromProcess(
157+
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
158+
throws IOException, InterruptedException {
159+
160+
int exitCode = process.waitFor();
161+
162+
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
163+
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
164+
165+
RunTaskConfiguration.ProcessReturnType returnType = taskConfiguration.getReturn();
166+
167+
WorkflowModelFactory modelFactory = definition.application().modelFactory();
168+
169+
return switch (returnType) {
170+
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
171+
case NONE -> modelFactory.fromNull();
172+
case CODE -> modelFactory.from(exitCode);
173+
case STDOUT -> modelFactory.from(stdout.trim());
174+
case STDERR -> modelFactory.from(stderr.trim());
175+
};
176+
}
177+
178+
@Override
179+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
180+
return RunShell.class.equals(clazz);
181+
}
182+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
io.serverlessworkflow.impl.executors.RunWorkflowExecutor
1+
io.serverlessworkflow.impl.executors.RunWorkflowExecutor
2+
io.serverlessworkflow.impl.executors.RunShellExecutor

0 commit comments

Comments
 (0)