Skip to content

Commit 6d98cc7

Browse files
committed
[WIP] refactor
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent d202674 commit 6d98cc7

File tree

2 files changed

+194
-175
lines changed

2 files changed

+194
-175
lines changed
Lines changed: 137 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.serverlessworkflow.impl.executors;
217

318
import io.serverlessworkflow.api.types.RunScript;
@@ -12,143 +27,146 @@
1227
import io.serverlessworkflow.impl.WorkflowModelFactory;
1328
import io.serverlessworkflow.impl.WorkflowUtils;
1429
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
15-
import org.graalvm.polyglot.Context;
16-
import org.graalvm.polyglot.PolyglotException;
17-
import org.graalvm.polyglot.Value;
18-
1930
import java.io.ByteArrayOutputStream;
20-
import java.nio.charset.StandardCharsets;
2131
import java.util.HashMap;
2232
import java.util.Map;
2333
import java.util.Set;
2434
import java.util.concurrent.CompletableFuture;
2535
import java.util.concurrent.ExecutorService;
36+
import org.graalvm.polyglot.Context;
37+
import org.graalvm.polyglot.Value;
2638

2739
public class RunScriptExecutor implements RunnableTask<RunScript> {
2840

29-
private static final Set<String> SUPPORTED_LANGUAGES = Set.of("js", "python");
30-
31-
@FunctionalInterface
32-
private interface FnExecutor {
33-
WorkflowModel apply(WorkflowContext workflowContext, TaskContext taskContext);
41+
private static final Set<String> SUPPORTED_LANGUAGES = Set.of("js", "python");
42+
43+
@FunctionalInterface
44+
private interface FnExecutor {
45+
WorkflowModel apply(WorkflowContext workflowContext, TaskContext taskContext);
46+
}
47+
48+
private FnExecutor fnExecutor;
49+
50+
@Override
51+
public void init(RunScript taskConfiguration, WorkflowDefinition definition) {
52+
ScriptUnion scriptUnion = taskConfiguration.getScript();
53+
Script script = scriptUnion.get();
54+
String language = script.getLanguage();
55+
boolean isAwait = taskConfiguration.isAwait();
56+
57+
WorkflowApplication application = definition.application();
58+
if (language == null || !SUPPORTED_LANGUAGES.contains(language.toLowerCase())) {
59+
throw new IllegalArgumentException(
60+
"Unsupported script language: "
61+
+ language
62+
+ ". Supported languages are: "
63+
+ SUPPORTED_LANGUAGES);
3464
}
3565

36-
private FnExecutor fnExecutor;
37-
38-
@Override
39-
public void init(RunScript taskConfiguration, WorkflowDefinition definition) {
40-
ScriptUnion scriptUnion = taskConfiguration.getScript();
41-
Script script = scriptUnion.get();
42-
String language = script.getLanguage();
43-
boolean isAwait = taskConfiguration.isAwait();
44-
45-
WorkflowApplication application = definition.application();
46-
if (language == null || !SUPPORTED_LANGUAGES.contains(language.toLowerCase())) {
47-
throw new IllegalArgumentException("Unsupported script language: " + language + ". Supported languages are: " + SUPPORTED_LANGUAGES);
48-
}
49-
50-
fnExecutor = (workflowContext, taskContext) -> {
51-
52-
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
53-
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
54-
55-
Map<String, Object> args = new HashMap<>();
56-
if (script.getArguments() != null
57-
&& script.getArguments().getAdditionalProperties() != null) {
58-
for (Map.Entry<String, Object> entry :
59-
script.getArguments().getAdditionalProperties().entrySet()) {
60-
61-
String key = ExpressionUtils.isExpr(entry.getKey())
62-
? WorkflowUtils.buildStringResolver(
63-
application, entry.getKey(), taskContext.input().asJavaObject())
64-
.apply(workflowContext, taskContext, taskContext.input())
65-
: entry.getKey();
66-
67-
Object value = null;
68-
if (entry.getValue() != null) {
69-
value = ExpressionUtils.isExpr(entry.getValue())
70-
? WorkflowUtils.buildStringResolver(
71-
application,
72-
entry.getValue().toString(),
73-
taskContext.input().asJavaObject())
74-
.apply(workflowContext, taskContext, taskContext.input())
75-
: entry.getValue();
76-
}
77-
args.put(key, value);
78-
}
66+
fnExecutor =
67+
(workflowContext, taskContext) -> {
68+
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
69+
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
70+
71+
Map<String, String> envs = new HashMap<>();
72+
if (script.getEnvironment() != null) {
73+
for (Map.Entry<String, Object> entry :
74+
script.getEnvironment().getAdditionalProperties().entrySet()) {
75+
String value =
76+
ExpressionUtils.isExpr(entry.getValue())
77+
? WorkflowUtils.buildStringResolver(
78+
application,
79+
entry.getValue().toString(),
80+
taskContext.input().asJavaObject())
81+
.apply(workflowContext, taskContext, taskContext.input())
82+
: entry.getValue().toString();
83+
envs.put(entry.getKey(), value);
7984
}
80-
81-
Map<String, String> envs = new HashMap<>();
82-
if (script.getEnvironment() != null
83-
&& script.getEnvironment().getAdditionalProperties() != null) {
84-
for (Map.Entry<String, Object> entry :
85-
script.getEnvironment().getAdditionalProperties().entrySet()) {
86-
String value =
87-
ExpressionUtils.isExpr(entry.getValue())
88-
? WorkflowUtils.buildStringResolver(
89-
application,
90-
entry.getValue().toString(),
91-
taskContext.input().asJavaObject())
92-
.apply(workflowContext, taskContext, taskContext.input())
93-
: entry.getValue().toString();
94-
envs.put(entry.getKey(), value);
95-
}
85+
}
86+
87+
Map<String, Object> args = new HashMap<>();
88+
if (script.getArguments() != null) {
89+
for (Map.Entry<String, Object> entry :
90+
script.getArguments().getAdditionalProperties().entrySet()) {
91+
String value =
92+
ExpressionUtils.isExpr(entry.getValue())
93+
? WorkflowUtils.buildStringResolver(
94+
application,
95+
entry.getValue().toString(),
96+
taskContext.input().asJavaObject())
97+
.apply(workflowContext, taskContext, taskContext.input())
98+
: entry.getValue().toString();
99+
args.put(entry.getKey(), value);
96100
}
97-
98-
try (Context jsCtx = Context.newBuilder(language.toLowerCase())
99-
.err(stderr)
100-
.out(stdout)
101-
.environment(envs)
102-
.option("engine.WarnInterpreterOnly", "false") // disabling it due to warnings in stderr
103-
.build()) {
104-
105-
ExecutorService executorService = application.executorService();
106-
107-
args.forEach((arg, val) -> {
108-
jsCtx.getBindings(language.toLowerCase()).putMember(arg, val);
101+
}
102+
103+
try (Context context =
104+
Context.newBuilder(language.toLowerCase())
105+
.err(stderr)
106+
.out(stdout)
107+
.environment(envs)
108+
.option(
109+
"engine.WarnInterpreterOnly",
110+
"false") // disabling it due to warnings in stderr
111+
.build()) {
112+
113+
ExecutorService executorService = application.executorService();
114+
115+
args.forEach(
116+
(arg, val) -> {
117+
context.getBindings(language.toLowerCase()).putMember(arg, val);
109118
});
110119

111-
// configure process.env for js
112-
if (language.equalsIgnoreCase("js")) {
113-
Value bindings = jsCtx.getBindings("js");
114-
Value process = jsCtx.eval("js", "({ env: {} })");
115-
116-
for (var entry : envs.entrySet()) {
117-
process.getMember("env").putMember(entry.getKey(), entry.getValue());
118-
}
119-
bindings.putMember("process", process);
120-
}
121-
122-
if (!isAwait) {
123-
executorService.submit(() -> {
124-
jsCtx.eval(scriptUnion.getInlineScript().getLanguage(), scriptUnion.getInlineScript().getCode());
125-
});
126-
return application.modelFactory().fromAny(taskContext.input());
127-
}
128-
129-
jsCtx.eval(scriptUnion.getInlineScript().getLanguage(), scriptUnion.getInlineScript().getCode());
130-
WorkflowModelFactory modelFactory = application.modelFactory();
131-
132-
return switch (taskConfiguration.getReturn()) {
133-
case ALL -> modelFactory.fromAny(new ProcessResult(0, stdout.toString(), stderr.toString()));
134-
case NONE -> modelFactory.fromNull();
135-
case CODE -> modelFactory.from(0);
136-
case STDOUT -> modelFactory.from(stdout.toString().trim());
137-
case STDERR -> modelFactory.from(stderr.toString().trim());
138-
};
120+
// configure process.env for js
121+
if (language.equalsIgnoreCase("js")) {
122+
configureProcessEnv(context, envs);
123+
}
124+
125+
if (!isAwait) {
126+
executorService.submit(
127+
() -> {
128+
context.eval(
129+
scriptUnion.getInlineScript().getLanguage(),
130+
scriptUnion.getInlineScript().getCode());
131+
});
132+
return application.modelFactory().fromAny(taskContext.input());
139133
}
134+
135+
context.eval(
136+
scriptUnion.getInlineScript().getLanguage(),
137+
scriptUnion.getInlineScript().getCode());
138+
WorkflowModelFactory modelFactory = application.modelFactory();
139+
140+
return switch (taskConfiguration.getReturn()) {
141+
case ALL ->
142+
modelFactory.fromAny(new ProcessResult(0, stdout.toString(), stderr.toString()));
143+
case NONE -> modelFactory.fromNull();
144+
case CODE -> modelFactory.from(0);
145+
case STDOUT -> modelFactory.from(stdout.toString().trim());
146+
case STDERR -> modelFactory.from(stderr.toString().trim());
147+
};
148+
}
140149
};
141-
}
150+
}
142151

143-
@Override
144-
public CompletableFuture<WorkflowModel> apply(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
145-
return CompletableFuture.completedFuture(this.fnExecutor.apply(
146-
workflowContext, taskContext
147-
));
148-
}
152+
private void configureProcessEnv(Context jsCtx, Map<String, String> envs) {
153+
Value bindings = jsCtx.getBindings("js");
154+
Value process = jsCtx.eval("js", "({ env: {} })");
149155

150-
@Override
151-
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
152-
return RunScript.class.equals(clazz);
156+
for (var entry : envs.entrySet()) {
157+
process.getMember("env").putMember(entry.getKey(), entry.getValue());
153158
}
159+
bindings.putMember("process", process);
160+
}
161+
162+
@Override
163+
public CompletableFuture<WorkflowModel> apply(
164+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
165+
return CompletableFuture.supplyAsync(() -> this.fnExecutor.apply(workflowContext, taskContext));
166+
}
167+
168+
@Override
169+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
170+
return RunScript.class.equals(clazz);
171+
}
154172
}

0 commit comments

Comments
 (0)