Skip to content

Commit b043e89

Browse files
authored
[Fix #899] Add RunTask infrastructure (#900)
Fix #899 Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 255b031 commit b043e89

File tree

5 files changed

+167
-0
lines changed

5 files changed

+167
-0
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder;
2828
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2929
import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder;
30+
import io.serverlessworkflow.impl.executors.RunTaskExecutor.RunTaskExecutorBuilder;
3031
import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder;
3132
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
3233
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
@@ -76,6 +77,8 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
7677
return new ListenExecutorBuilder(position, task.getListenTask(), definition);
7778
} else if (task.getEmitTask() != null) {
7879
return new EmitExecutorBuilder(position, task.getEmitTask(), definition);
80+
} else if (task.getRunTask() != null) {
81+
return new RunTaskExecutorBuilder(position, task.getRunTask(), definition);
7982
}
8083
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
8184
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunTask;
19+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowContext;
22+
import io.serverlessworkflow.impl.WorkflowDefinition;
23+
import io.serverlessworkflow.impl.WorkflowModel;
24+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25+
import java.util.ServiceLoader;
26+
import java.util.ServiceLoader.Provider;
27+
import java.util.concurrent.CompletableFuture;
28+
29+
public class RunTaskExecutor<T extends RunTaskConfiguration> extends RegularTaskExecutor<RunTask> {
30+
31+
private final RunnableTask<T> runnable;
32+
33+
private static final ServiceLoader<RunnableTask> runnables =
34+
ServiceLoader.load(RunnableTask.class);
35+
36+
public static class RunTaskExecutorBuilder extends RegularTaskExecutorBuilder<RunTask> {
37+
private RunnableTask runnable;
38+
39+
protected RunTaskExecutorBuilder(
40+
WorkflowMutablePosition position, RunTask task, WorkflowDefinition definition) {
41+
super(position, task, definition);
42+
RunTaskConfiguration config = task.getRun().get();
43+
this.runnable =
44+
runnables.stream()
45+
.map(Provider::get)
46+
.filter(r -> r.accept(config.getClass()))
47+
.findFirst()
48+
.orElseThrow(
49+
() ->
50+
new UnsupportedOperationException(
51+
"No runnable found for operation " + config.getClass()));
52+
runnable.init(config, definition);
53+
}
54+
55+
@Override
56+
public RunTaskExecutor buildInstance() {
57+
return new RunTaskExecutor(this);
58+
}
59+
}
60+
61+
protected RunTaskExecutor(RunTaskExecutorBuilder builder) {
62+
super(builder);
63+
this.runnable = builder.runnable;
64+
}
65+
66+
@Override
67+
protected CompletableFuture<WorkflowModel> internalExecute(
68+
WorkflowContext workflow, TaskContext taskContext) {
69+
return runnable.apply(workflow, taskContext, taskContext.input());
70+
}
71+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
19+
import io.serverlessworkflow.api.types.RunWorkflow;
20+
import io.serverlessworkflow.api.types.SubflowConfiguration;
21+
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
24+
import io.serverlessworkflow.impl.WorkflowDefinitionId;
25+
import io.serverlessworkflow.impl.WorkflowModel;
26+
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
28+
29+
public class RunWorkflowExecutor implements RunnableTask<RunWorkflow> {
30+
31+
private WorkflowDefinitionId workflowDefinitionId;
32+
private Map<String, Object> additionalParameters;
33+
34+
public void init(RunWorkflow taskConfiguration, WorkflowDefinition definition) {
35+
SubflowConfiguration workflowConfig = taskConfiguration.getWorkflow();
36+
this.workflowDefinitionId =
37+
new WorkflowDefinitionId(
38+
workflowConfig.getNamespace(), workflowConfig.getName(), workflowConfig.getVersion());
39+
this.additionalParameters = workflowConfig.getInput().getAdditionalProperties();
40+
}
41+
42+
@Override
43+
public CompletableFuture<WorkflowModel> apply(
44+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
45+
WorkflowDefinition definition =
46+
workflowContext.definition().application().workflowDefinitions().get(workflowDefinitionId);
47+
if (definition != null) {
48+
// TODO add additional parameters
49+
return definition.instance(input).start();
50+
} else {
51+
throw new IllegalArgumentException(
52+
"Workflow definition for " + workflowDefinitionId + " has not been found");
53+
}
54+
}
55+
56+
@Override
57+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
58+
return RunWorkflow.class.equals(clazz);
59+
}
60+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
19+
import io.serverlessworkflow.impl.TaskContext;
20+
import io.serverlessworkflow.impl.WorkflowContext;
21+
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowModel;
23+
import java.util.concurrent.CompletableFuture;
24+
25+
public interface RunnableTask<T extends RunTaskConfiguration> {
26+
default void init(T taskConfiguration, WorkflowDefinition definition) {}
27+
28+
CompletableFuture<WorkflowModel> apply(
29+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);
30+
31+
boolean accept(Class<? extends RunTaskConfiguration> clazz);
32+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.serverlessworkflow.impl.executors.RunWorkflowExecutor

0 commit comments

Comments
 (0)