Skip to content

Commit f9c5d18

Browse files
committed
refactoring + tests
Signed-off-by: Dmitrii Tikhomirov <chani.liet@gmail.com>
1 parent a29faa6 commit f9c5d18

18 files changed

+363
-86
lines changed

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.github.dockerjava.api.command.CreateContainerCmd;
1919
import io.serverlessworkflow.api.types.Container;
20+
import java.util.function.Function;
2021

2122
class CommandPropertySetter extends ContainerPropertySetter {
2223

@@ -25,7 +26,7 @@ class CommandPropertySetter extends ContainerPropertySetter {
2526
}
2627

2728
@Override
28-
public void accept(StringExpressionResolver resolver) {
29+
public void accept(Function<String, String> resolver) {
2930
if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) {
3031
createContainerCmd.withCmd("sh", "-c", configuration.getCommand());
3132
}

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.function.Function;
2324

2425
class ContainerEnvironmentPropertySetter extends ContainerPropertySetter {
2526

@@ -29,15 +30,15 @@ class ContainerEnvironmentPropertySetter extends ContainerPropertySetter {
2930
}
3031

3132
@Override
32-
public void accept(StringExpressionResolver resolver) {
33+
public void accept(Function<String, String> resolver) {
3334
if (!(configuration.getEnvironment() == null
3435
|| configuration.getEnvironment().getAdditionalProperties() == null)) {
3536
List<String> envs = new ArrayList<>();
3637
for (Map.Entry<String, Object> entry :
3738
configuration.getEnvironment().getAdditionalProperties().entrySet()) {
3839
String key = entry.getKey();
3940
if (entry.getValue() instanceof String value) {
40-
String resolvedValue = resolver.resolve(value);
41+
String resolvedValue = resolver.apply(value);
4142
envs.add(key + "=" + resolvedValue);
4243
} else {
4344
throw new IllegalArgumentException("Environment variable values must be strings");

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import com.github.dockerjava.api.command.CreateContainerCmd;
1919
import io.serverlessworkflow.api.types.Container;
2020
import java.util.function.Consumer;
21+
import java.util.function.Function;
2122

22-
abstract class ContainerPropertySetter implements Consumer<StringExpressionResolver> {
23+
abstract class ContainerPropertySetter implements Consumer<Function<String, String>> {
2324

2425
protected final CreateContainerCmd createContainerCmd;
2526
protected final Container configuration;

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020
import com.github.dockerjava.api.DockerClient;
2121
import com.github.dockerjava.api.command.CreateContainerCmd;
2222
import com.github.dockerjava.api.command.CreateContainerResponse;
23-
import com.github.dockerjava.api.command.PullImageResultCallback;
2423
import com.github.dockerjava.api.command.WaitContainerResultCallback;
24+
import com.github.dockerjava.api.exception.DockerClientException;
25+
import com.github.dockerjava.api.exception.NotFoundException;
2526
import com.github.dockerjava.core.DefaultDockerClientConfig;
2627
import com.github.dockerjava.core.DockerClientImpl;
2728
import com.github.dockerjava.core.NameParser;
@@ -38,6 +39,7 @@
3839
import java.util.List;
3940
import java.util.concurrent.CompletableFuture;
4041
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Function;
4143

4244
class ContainerRunner {
4345

@@ -62,47 +64,45 @@ private ContainerRunner(
6264
this.propertySetters = new ArrayList<>();
6365
}
6466

65-
/**
66-
* Blocking container execution according to the lifetime policy. Returns an already completed
67-
* CompletableFuture: - completedFuture(input) if exitCode == 0 - exceptionally completed if the
68-
* exit code is non-zero or an error occurs. The method blocks the calling thread until the
69-
* container finishes or the timeout expires.
70-
*/
71-
CompletableFuture<WorkflowModel> startSync(
67+
CompletableFuture<WorkflowModel> start(
7268
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
69+
return CompletableFuture.supplyAsync(
70+
() -> startSync(workflowContext, taskContext, input),
71+
definition.application().executorService());
72+
}
7373

74+
private WorkflowModel startSync(
75+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
7476
try {
7577
var resolver = new StringExpressionResolver(workflowContext, taskContext, input);
7678
applyPropertySetters(resolver);
7779
pullImageIfNeeded(container.getImage());
7880

7981
String id = createAndStartContainer();
8082
int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input);
81-
82-
return mapExitCode(exit, input);
83+
if (exit == 0) {
84+
return input;
85+
}
86+
throw mapExitCode(exit);
8387
} catch (InterruptedException ie) {
8488
Thread.currentThread().interrupt();
85-
return failed("Interrupted while waiting for container");
89+
throw failed("Container execution failed with exit code " + ie.getMessage());
8690
} catch (Exception e) {
87-
return failed("Container run failed: " + e.getMessage());
91+
throw failed("Container execution failed with exit code " + e.getMessage());
8892
}
8993
}
9094

91-
private void applyPropertySetters(StringExpressionResolver resolver) {
92-
for (var setter : propertySetters) setter.accept(resolver);
95+
private void applyPropertySetters(Function<String, String> resolver) {
96+
propertySetters.forEach(setter -> setter.accept(resolver));
9397
}
9498

9599
private void pullImageIfNeeded(String imageRef) throws InterruptedException {
96100
NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef);
97101
NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef);
98102

99103
String repository = hr.reposName;
100-
String tag = rt.tag != null && rt.tag.isEmpty() ? rt.tag : "latest";
101-
dockerClient
102-
.pullImageCmd(repository)
103-
.withTag(tag)
104-
.exec(new PullImageResultCallback())
105-
.awaitCompletion();
104+
String tag = (rt.tag == null || rt.tag.isBlank()) ? "latest" : rt.tag;
105+
dockerClient.pullImageCmd(repository).withTag(tag).start().awaitCompletion();
106106
}
107107

108108
private String createAndStartContainer() {
@@ -123,25 +123,22 @@ private int waitAccordingToLifetime(
123123
var policy = lifetime != null ? lifetime.getCleanup() : null;
124124

125125
try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) {
126-
127126
if (policy == ContainerCleanupPolicy.EVENTUALLY) {
128127
Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input);
129-
int exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
130-
131-
if (isRunning(id)) {
132-
safeStop(id, Duration.ofSeconds(10));
128+
try {
129+
Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
130+
safeStop(id);
131+
return exit != null ? exit : 0;
132+
} catch (DockerClientException timeoutOrOther) {
133+
safeStop(id);
133134
}
134-
safeRemove(id);
135-
return exit;
136-
137135
} else {
138-
int exit = cb.awaitStatusCode();
139-
if (policy == ContainerCleanupPolicy.ALWAYS) {
140-
safeRemove(id);
141-
}
142-
return exit;
136+
return cb.awaitStatusCode();
143137
}
138+
} catch (NotFoundException e) {
139+
// container already removed
144140
}
141+
return 0;
145142
}
146143

147144
private Duration resolveAfter(
@@ -167,6 +164,20 @@ private boolean isRunning(String id) {
167164
}
168165
}
169166

167+
private void safeStop(String id) {
168+
if (isRunning(id)) {
169+
safeStop(id, Duration.ofSeconds(10));
170+
try (var cb2 = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) {
171+
cb2.awaitStatusCode();
172+
safeRemove(id);
173+
} catch (Exception ignore) {
174+
// we can ignore this
175+
}
176+
} else {
177+
safeRemove(id);
178+
}
179+
}
180+
170181
private void safeStop(String id, Duration timeout) {
171182
try {
172183
dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec();
@@ -184,9 +195,8 @@ private void safeRemove(String id) {
184195
}
185196
}
186197

187-
private static <T> CompletableFuture<T> mapExitCode(int exit, T ok) {
198+
private static Exception mapExitCode(int exit) {
188199
return switch (exit) {
189-
case 0 -> CompletableFuture.completedFuture(ok);
190200
case 1 -> failed("General error (exit code 1)");
191201
case 2 -> failed("Shell syntax error (exit code 2)");
192202
case 126 -> failed("Command found but not executable (exit code 126)");
@@ -199,10 +209,8 @@ private static <T> CompletableFuture<T> mapExitCode(int exit, T ok) {
199209
};
200210
}
201211

202-
private static <T> CompletableFuture<T> failed(String message) {
203-
CompletableFuture<T> f = new CompletableFuture<>();
204-
f.completeExceptionally(new RuntimeException(message));
205-
return f;
212+
private static RuntimeException failed(String message) {
213+
return new RuntimeException(message);
206214
}
207215

208216
static ContainerRunnerBuilder builder() {

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.github.dockerjava.api.command.CreateContainerCmd;
2121
import io.serverlessworkflow.api.types.Container;
2222
import io.serverlessworkflow.api.types.ContainerLifetime;
23+
import java.util.function.Function;
2324

2425
class LifetimePropertySetter extends ContainerPropertySetter {
2526

@@ -28,7 +29,7 @@ class LifetimePropertySetter extends ContainerPropertySetter {
2829
}
2930

3031
@Override
31-
public void accept(StringExpressionResolver resolver) {
32+
public void accept(Function<String, String> resolver) {
3233
// case of cleanup=eventually processed at ContainerRunner
3334
if (configuration.getLifetime() != null) {
3435
ContainerLifetime lifetime = configuration.getLifetime();

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.github.dockerjava.api.command.CreateContainerCmd;
1919
import io.serverlessworkflow.api.types.Container;
20+
import java.util.function.Function;
2021

2122
class NamePropertySetter extends ContainerPropertySetter {
2223

@@ -25,9 +26,9 @@ class NamePropertySetter extends ContainerPropertySetter {
2526
}
2627

2728
@Override
28-
public void accept(StringExpressionResolver resolver) {
29+
public void accept(Function<String, String> resolver) {
2930
if (configuration.getName() != null && !configuration.getName().isEmpty()) {
30-
String resolvedName = resolver.resolve(configuration.getName());
31+
String resolvedName = resolver.apply(configuration.getName());
3132
createContainerCmd.withName(resolvedName);
3233
}
3334
}

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.function.Function;
2526

2627
class PortsPropertySetter extends ContainerPropertySetter {
2728

@@ -30,7 +31,7 @@ class PortsPropertySetter extends ContainerPropertySetter {
3031
}
3132

3233
@Override
33-
public void accept(StringExpressionResolver resolver) {
34+
public void accept(Function<String, String> resolver) {
3435
if (configuration.getPorts() != null
3536
&& configuration.getPorts().getAdditionalProperties() != null) {
3637
Ports portBindings = new Ports();

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void init(RunContainer taskConfiguration, WorkflowDefinition definition)
4141
@Override
4242
public CompletableFuture<WorkflowModel> apply(
4343
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
44-
return containerRunner.startSync(workflowContext, taskContext, input);
44+
return containerRunner.start(workflowContext, taskContext, input);
4545
}
4646

4747
@Override

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020
import io.serverlessworkflow.impl.WorkflowModel;
2121
import io.serverlessworkflow.impl.WorkflowUtils;
2222
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
23+
import java.util.function.Function;
2324

24-
class StringExpressionResolver {
25+
class StringExpressionResolver implements Function<String, String> {
2526

2627
private final WorkflowContext workflowContext;
2728
private final TaskContext taskContext;
@@ -34,9 +35,9 @@ class StringExpressionResolver {
3435
this.input = input;
3536
}
3637

37-
String resolve(String expression) {
38+
public String apply(String expression) {
3839
if (ExpressionUtils.isExpr(expression)) {
39-
WorkflowUtils.buildStringResolver(
40+
return WorkflowUtils.buildStringResolver(
4041
workflowContext.definition().application(),
4142
expression,
4243
taskContext.input().asJavaObject())

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.function.Function;
2526

2627
class VolumesPropertySetter extends ContainerPropertySetter {
2728

@@ -30,16 +31,16 @@ class VolumesPropertySetter extends ContainerPropertySetter {
3031
}
3132

3233
@Override
33-
public void accept(StringExpressionResolver resolver) {
34+
public void accept(Function<String, String> resolver) {
3435
if (configuration.getVolumes() != null
3536
&& configuration.getVolumes().getAdditionalProperties() != null) {
3637
List<Bind> binds = new ArrayList<>();
3738
for (Map.Entry<String, Object> entry :
3839
configuration.getVolumes().getAdditionalProperties().entrySet()) {
3940
String hostPath = entry.getKey();
4041
if (entry.getValue() instanceof String containerPath) {
41-
String resolvedHostPath = resolver.resolve(hostPath);
42-
String resolvedContainerPath = resolver.resolve(containerPath);
42+
String resolvedHostPath = resolver.apply(hostPath);
43+
String resolvedContainerPath = resolver.apply(containerPath);
4344
binds.add(new Bind(resolvedHostPath, new Volume(resolvedContainerPath)));
4445
} else {
4546
throw new IllegalArgumentException("Volume container paths must be strings");

0 commit comments

Comments
 (0)