diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java index 7c125c72..b0af3191 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java @@ -15,20 +15,38 @@ */ package io.serverlessworkflow.impl.container.executors; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class CommandPropertySetter implements ContainerPropertySetter { -class CommandPropertySetter extends ContainerPropertySetter { + private Optional> command; - CommandPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + CommandPropertySetter(WorkflowDefinition definition, Container configuration) { + String commandName = configuration.getCommand(); + command = + isValid(commandName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), commandName)) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) { - createContainerCmd.withCmd("sh", "-c", configuration.getCommand()); - } + public void accept( + CreateContainerCmd containerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(c -> containerCmd.withCmd("sh", "-c", c)); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java index 18a4dc85..c4f143a3 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java @@ -17,34 +17,43 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.ArrayList; -import java.util.List; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.Map; -import java.util.function.Function; +import java.util.Optional; -class ContainerEnvironmentPropertySetter extends ContainerPropertySetter { +class ContainerEnvironmentPropertySetter implements ContainerPropertySetter { - ContainerEnvironmentPropertySetter( - CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + private final Optional>> envResolver; + + ContainerEnvironmentPropertySetter(WorkflowDefinition definition, Container configuration) { + + this.envResolver = + configuration.getEnvironment() != null + && configuration.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), + null, + configuration.getEnvironment().getAdditionalProperties())) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (!(configuration.getEnvironment() == null - || configuration.getEnvironment().getAdditionalProperties() == null)) { - List envs = new ArrayList<>(); - for (Map.Entry entry : - configuration.getEnvironment().getAdditionalProperties().entrySet()) { - String key = entry.getKey(); - if (entry.getValue() instanceof String value) { - String resolvedValue = resolver.apply(value); - envs.add(key + "=" + resolvedValue); - } else { - throw new IllegalArgumentException("Environment variable values must be strings"); - } - } - createContainerCmd.withEnv(envs.toArray(new String[0])); - } + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + envResolver + .map(env -> env.apply(workflowContext, taskContext, model)) + .ifPresent( + envs -> + command.withEnv( + envs.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toList())); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java index 44b62ebe..9524a92a 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java @@ -16,17 +16,14 @@ package io.serverlessworkflow.impl.container.executors; import com.github.dockerjava.api.command.CreateContainerCmd; -import io.serverlessworkflow.api.types.Container; -import java.util.function.Consumer; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; -abstract class ContainerPropertySetter implements Consumer> { - - protected final CreateContainerCmd createContainerCmd; - protected final Container configuration; - - ContainerPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - this.createContainerCmd = createContainerCmd; - this.configuration = configuration; - } +interface ContainerPropertySetter { + abstract void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model); } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index 8fb8d65f..5c14a3ac 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.container.executors; import static io.serverlessworkflow.api.types.ContainerLifetime.*; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerCmd; @@ -28,107 +29,117 @@ import com.github.dockerjava.core.NameParser; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; +import io.serverlessworkflow.api.types.TimeoutAfter; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; class ContainerRunner { private static final DefaultDockerClientConfig DEFAULT_CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); - private static final DockerClient dockerClient = - DockerClientImpl.getInstance( - DEFAULT_CONFIG, - new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + private static class DockerClientHolder { + private static final DockerClient dockerClient = + DockerClientImpl.getInstance( + DEFAULT_CONFIG, + new ApacheDockerHttpClient.Builder() + .dockerHost(DEFAULT_CONFIG.getDockerHost()) + .build()); + } - private final CreateContainerCmd createContainerCmd; - private final Container container; - private final List propertySetters; - private final WorkflowDefinition definition; + private final Collection propertySetters; + private final Optional> timeout; + private final ContainerCleanupPolicy policy; + private final String containerImage; - private ContainerRunner( - CreateContainerCmd createContainerCmd, WorkflowDefinition definition, Container container) { - this.createContainerCmd = createContainerCmd; - this.definition = definition; - this.container = container; - this.propertySetters = new ArrayList<>(); + private ContainerRunner(ContainerRunnerBuilder builder) { + this.propertySetters = builder.propertySetters; + this.timeout = Optional.ofNullable(builder.timeout); + this.policy = builder.policy; + this.containerImage = builder.containerImage; } CompletableFuture start( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { return CompletableFuture.supplyAsync( () -> startSync(workflowContext, taskContext, input), - definition.application().executorService()); + workflowContext.definition().application().executorService()); } private WorkflowModel startSync( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - try { - var resolver = new StringExpressionResolver(workflowContext, taskContext, input); - applyPropertySetters(resolver); - pullImageIfNeeded(container.getImage()); - - String id = createAndStartContainer(); - int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input); - if (exit == 0) { - return input; - } + Integer exit = executeContainer(workflowContext, taskContext, input); + if (exit == null || exit == 0) { + return input; + } else { throw mapExitCode(exit); + } + } + + private Integer executeContainer( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + try { + pullImageIfNeeded(containerImage); + CreateContainerCmd containerCommand = + DockerClientHolder.dockerClient.createContainerCmd(containerImage); + propertySetters.forEach(p -> p.accept(containerCommand, workflowContext, taskContext, input)); + return waitAccordingToLifetime( + createAndStartContainer(containerCommand), workflowContext, taskContext, input); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw failed("Container execution failed with exit code " + ie.getMessage()); - } catch (Exception e) { + } catch (IOException e) { throw failed("Container execution failed with exit code " + e.getMessage()); } } - private void applyPropertySetters(Function resolver) { - propertySetters.forEach(setter -> setter.accept(resolver)); - } - private void pullImageIfNeeded(String imageRef) throws InterruptedException { NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef); - NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef); - - String repository = hr.reposName; - String tag = (rt.tag == null || rt.tag.isBlank()) ? "latest" : rt.tag; - dockerClient.pullImageCmd(repository).withTag(tag).start().awaitCompletion(); + DockerClientHolder.dockerClient + .pullImageCmd(NameParser.resolveRepositoryName(imageRef).reposName) + .withTag(WorkflowUtils.isValid(rt.tag) ? rt.tag : "latest") + .start() + .awaitCompletion(); } - private String createAndStartContainer() { - CreateContainerResponse resp = createContainerCmd.exec(); + private String createAndStartContainer(CreateContainerCmd containerCommand) { + CreateContainerResponse resp = containerCommand.exec(); String id = resp.getId(); - if (id == null || id.isEmpty()) { + if (!isValid(id)) { throw new IllegalStateException("Container creation failed: empty ID"); } - dockerClient.startContainerCmd(id).exec(); + DockerClientHolder.dockerClient.startContainerCmd(id).exec(); return id; } - private int waitAccordingToLifetime( + private Integer waitAccordingToLifetime( String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) - throws Exception { - - var lifetime = container.getLifetime(); - var policy = lifetime != null ? lifetime.getCleanup() : null; - - try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + throws IOException { + try (var cb = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { if (policy == ContainerCleanupPolicy.EVENTUALLY) { - Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input); + Duration timeout = + this.timeout + .map(t -> t.apply(workflowContext, taskContext, input)) + .orElse(Duration.ZERO); try { Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); safeStop(id); - return exit != null ? exit : 0; + return exit; } catch (DockerClientException timeoutOrOther) { safeStop(id); } @@ -141,23 +152,9 @@ private int waitAccordingToLifetime( return 0; } - private Duration resolveAfter( - io.serverlessworkflow.api.types.ContainerLifetime lifetime, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - - if (lifetime == null || lifetime.getAfter() == null) { - return Duration.ZERO; - } - WorkflowValueResolver r = - WorkflowUtils.fromTimeoutAfter(definition.application(), lifetime.getAfter()); - return r.apply(workflowContext, taskContext, input); - } - private boolean isRunning(String id) { try { - var st = dockerClient.inspectContainerCmd(id).exec().getState(); + var st = DockerClientHolder.dockerClient.inspectContainerCmd(id).exec().getState(); return st != null && Boolean.TRUE.equals(st.getRunning()); } catch (Exception e) { return false; // must be already removed @@ -167,7 +164,10 @@ private boolean isRunning(String id) { private void safeStop(String id) { if (isRunning(id)) { safeStop(id, Duration.ofSeconds(10)); - try (var cb2 = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + try (var cb2 = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { cb2.awaitStatusCode(); safeRemove(id); } catch (Exception ignore) { @@ -180,7 +180,10 @@ private void safeStop(String id) { private void safeStop(String id, Duration timeout) { try { - dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec(); + DockerClientHolder.dockerClient + .stopContainerCmd(id) + .withTimeout((int) Math.max(1, timeout.toSeconds())) + .exec(); } catch (Exception ignore) { // we can ignore this } @@ -189,13 +192,13 @@ private void safeStop(String id, Duration timeout) { // must be removed because of withAutoRemove(true), but just in case private void safeRemove(String id) { try { - dockerClient.removeContainerCmd(id).withForce(true).exec(); + DockerClientHolder.dockerClient.removeContainerCmd(id).withForce(true).exec(); } catch (Exception ignore) { // we can ignore this } } - private static Exception mapExitCode(int exit) { + private static RuntimeException mapExitCode(int exit) { return switch (exit) { case 1 -> failed("General error (exit code 1)"); case 2 -> failed("Shell syntax error (exit code 2)"); @@ -218,8 +221,12 @@ static ContainerRunnerBuilder builder() { } public static class ContainerRunnerBuilder { - private Container container = null; - private WorkflowDefinition workflowDefinition; + private Container container; + private WorkflowDefinition definition; + private WorkflowValueResolver timeout; + private ContainerCleanupPolicy policy; + private String containerImage; + private Collection propertySetters = new ArrayList<>(); private ContainerRunnerBuilder() {} @@ -229,28 +236,31 @@ ContainerRunnerBuilder withContainer(Container container) { } public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { - this.workflowDefinition = definition; + this.definition = definition; return this; } ContainerRunner build() { - if (container.getImage() == null || container.getImage().isEmpty()) { + propertySetters.add(new NamePropertySetter(definition, container)); + propertySetters.add(new CommandPropertySetter(definition, container)); + propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); + propertySetters.add(new LifetimePropertySetter(container)); + propertySetters.add(new PortsPropertySetter(container)); + propertySetters.add(new VolumesPropertySetter(definition, container)); + + containerImage = container.getImage(); + if (containerImage == null || container.getImage().isBlank()) { throw new IllegalArgumentException("Container image must be provided"); } + ContainerLifetime lifetime = container.getLifetime(); + if (lifetime != null) { + policy = lifetime.getCleanup(); + TimeoutAfter afterTimeout = lifetime.getAfter(); + if (afterTimeout != null) + timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); + } - CreateContainerCmd createContainerCmd = dockerClient.createContainerCmd(container.getImage()); - - ContainerRunner runner = - new ContainerRunner(createContainerCmd, workflowDefinition, container); - - runner.propertySetters.add(new CommandPropertySetter(createContainerCmd, container)); - runner.propertySetters.add( - new ContainerEnvironmentPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new NamePropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new PortsPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new VolumesPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new LifetimePropertySetter(createContainerCmd, container)); - return runner; + return new ContainerRunner(this); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java index 52454778..3606ae93 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java @@ -19,26 +19,29 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.ContainerLifetime; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; -class LifetimePropertySetter extends ContainerPropertySetter { +class LifetimePropertySetter implements ContainerPropertySetter { - LifetimePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + private final ContainerCleanupPolicy cleanupPolicy; + + LifetimePropertySetter(Container configuration) { + this.cleanupPolicy = + configuration.getLifetime() != null ? configuration.getLifetime().getCleanup() : null; } @Override - public void accept(Function resolver) { - // case of cleanup=eventually processed at ContainerRunner - if (configuration.getLifetime() != null) { - ContainerLifetime lifetime = configuration.getLifetime(); - ContainerCleanupPolicy cleanupPolicy = lifetime.getCleanup(); - if (cleanupPolicy.equals(ContainerCleanupPolicy.ALWAYS)) { - createContainerCmd.getHostConfig().withAutoRemove(true); - } else if (cleanupPolicy.equals(ContainerCleanupPolicy.NEVER)) { - createContainerCmd.getHostConfig().withAutoRemove(false); - } + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + if (ContainerCleanupPolicy.ALWAYS.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(true); + } else if (ContainerCleanupPolicy.NEVER.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(false); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java index d8c820c5..70bc8d83 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java @@ -15,21 +15,38 @@ */ package io.serverlessworkflow.impl.container.executors; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class NamePropertySetter implements ContainerPropertySetter { -class NamePropertySetter extends ContainerPropertySetter { + private final Optional> containerName; - NamePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + NamePropertySetter(WorkflowDefinition definition, Container container) { + String containerName = container.getName(); + this.containerName = + isValid(containerName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), containerName)) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (configuration.getName() != null && !configuration.getName().isEmpty()) { - String resolvedName = resolver.apply(configuration.getName()); - createContainerCmd.withName(resolvedName); - } + public void accept( + CreateContainerCmd createContainerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + containerName + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(createContainerCmd::withName); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java index 882abfa6..b176b110 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java @@ -19,34 +19,51 @@ import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.Ports; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.function.Function; -class PortsPropertySetter extends ContainerPropertySetter { +class PortsPropertySetter implements ContainerPropertySetter { - PortsPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); - } + private Ports portBindings = new Ports(); + private List exposed = new ArrayList<>(); - @Override - public void accept(Function resolver) { + PortsPropertySetter(Container configuration) { if (configuration.getPorts() != null && configuration.getPorts().getAdditionalProperties() != null) { - Ports portBindings = new Ports(); - List exposed = new ArrayList<>(); - for (Map.Entry entry : configuration.getPorts().getAdditionalProperties().entrySet()) { - int hostPort = Integer.parseInt(entry.getKey()); - int containerPort = Integer.parseInt(entry.getValue().toString()); - ExposedPort exposedPort = ExposedPort.tcp(containerPort); - portBindings.bind(exposedPort, Ports.Binding.bindPort(hostPort)); + ExposedPort exposedPort = ExposedPort.tcp(Integer.parseInt(entry.getKey())); exposed.add(exposedPort); + portBindings.bind(exposedPort, Ports.Binding.bindPort(from(entry.getValue()))); } - createContainerCmd.withExposedPorts(exposed.toArray(new ExposedPort[0])); - createContainerCmd.getHostConfig().withPortBindings(portBindings); + } + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command.withExposedPorts(exposed); + command.getHostConfig().withPortBindings(portBindings); + } + + private static Integer from(Object obj) { + if (obj instanceof Integer number) { + return number; + } else if (obj instanceof Number number) { + return number.intValue(); + } else if (obj instanceof String str) { + return Integer.parseInt(str); + } else if (obj != null) { + return Integer.parseInt(obj.toString()); + } else { + throw new IllegalArgumentException("Null value for port key"); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java deleted file mode 100644 index e5848cee..00000000 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.container.executors; - -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.expressions.ExpressionUtils; -import java.util.function.Function; - -class StringExpressionResolver implements Function { - - private final WorkflowContext workflowContext; - private final TaskContext taskContext; - private final WorkflowModel input; - - StringExpressionResolver( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - this.workflowContext = workflowContext; - this.taskContext = taskContext; - this.input = input; - } - - public String apply(String expression) { - if (ExpressionUtils.isExpr(expression)) { - return WorkflowUtils.buildStringResolver( - workflowContext.definition().application(), - expression, - taskContext.input().asJavaObject()) - .apply(workflowContext, taskContext, input); - } - return expression; - } -} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java index bb7215ca..63830f34 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java @@ -19,34 +19,56 @@ import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Volume; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; import java.util.Map; -import java.util.function.Function; +import java.util.Objects; -class VolumesPropertySetter extends ContainerPropertySetter { +class VolumesPropertySetter implements ContainerPropertySetter { - VolumesPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); - } + private static record HostContainer( + WorkflowValueResolver host, WorkflowValueResolver container) {} - @Override - public void accept(Function resolver) { + private final Collection binds = new ArrayList<>(); + + VolumesPropertySetter(WorkflowDefinition definition, Container configuration) { if (configuration.getVolumes() != null && configuration.getVolumes().getAdditionalProperties() != null) { - List binds = new ArrayList<>(); for (Map.Entry entry : configuration.getVolumes().getAdditionalProperties().entrySet()) { - String hostPath = entry.getKey(); - if (entry.getValue() instanceof String containerPath) { - String resolvedHostPath = resolver.apply(hostPath); - String resolvedContainerPath = resolver.apply(containerPath); - binds.add(new Bind(resolvedHostPath, new Volume(resolvedContainerPath))); - } else { - throw new IllegalArgumentException("Volume container paths must be strings"); - } + binds.add( + new HostContainer( + WorkflowUtils.buildStringFilter(definition.application(), entry.getKey()), + WorkflowUtils.buildStringFilter( + definition.application(), + Objects.requireNonNull( + entry.getValue(), "Volume value must be a not null string") + .toString()))); } - createContainerCmd.getHostConfig().withBinds(binds); } } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .getHostConfig() + .withBinds( + binds.stream() + .map( + r -> + new Bind( + r.host().apply(workflowContext, taskContext, model), + new Volume(r.container.apply(workflowContext, taskContext, model)))) + .toList()); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 7b6b3403..6fef5a85 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -59,6 +59,10 @@ public static Optional getSchemaValidator( return Optional.empty(); } + public static boolean isValid(String str) { + return str != null && !str.isBlank(); + } + public static Optional buildWorkflowFilter( WorkflowApplication app, InputFrom from) { return from != null diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java index a30c3fab..ed5d803e 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java @@ -38,29 +38,57 @@ import java.time.Duration; import java.util.Arrays; import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@DisabledIf("checkDocker") public class ContainerTest { - private static final DefaultDockerClientConfig DEFAULT_CONFIG = - DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + private static DockerClient dockerClient; + private static Logger logger = LoggerFactory.getLogger(ContainerTest.class); - private static final DockerClient dockerClient = - DockerClientImpl.getInstance( - DEFAULT_CONFIG, - new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + { + DefaultDockerClientConfig defaultConfig = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + dockerClient = + DockerClientImpl.getInstance( + defaultConfig, + new ApacheDockerHttpClient.Builder().dockerHost(defaultConfig.getDockerHost()).build()); + } + + @SuppressWarnings("unused") + private static boolean checkDocker() { + try { + dockerClient.pingCmd().exec(); + return false; + } catch (Exception ex) { + logger.warn("Docker is not running, disabling container test"); + return true; + } + } + + private static WorkflowApplication app; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().build(); + } + + @AfterAll + static void cleanup() throws IOException { + app.close(); + } @Test public void testContainer() throws IOException, InterruptedException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-test-command.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world"; String containerId = findContainerIdByName(containerName); @@ -91,12 +119,8 @@ public void testContainerEnv() throws IOException, InterruptedException { Map input = Map.of("someValue", "Tested"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = app.workflowDefinition(workflow).instance(input).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(input).start().join().asMap().orElseThrow(); String containerName = "hello-world-envs"; ByteArrayOutputStream output = new ByteArrayOutputStream(); @@ -126,13 +150,8 @@ public void testContainerTimeout() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-timeout.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-timeout"; String containerId = findContainerIdByName(containerName); @@ -146,13 +165,8 @@ public void testContainerCleanup() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-cleanup.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-cleanup"; String containerId = findContainerIdByName(containerName); @@ -165,14 +179,8 @@ public void testContainerCleanupDefault() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-cleanup-default.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } - + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-cleanup-default"; String containerId = findContainerIdByName(containerName); assertFalse(isContainerGone(containerId)); @@ -188,17 +196,12 @@ void testPortBindings() throws Exception { new Thread( () -> { - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - app.workflowDefinition(workflow) - .instance(Map.of()) - .start() - .get() - .asMap() - .orElseThrow(); - } catch (Exception e) { - // we can ignore exceptions here, as the workflow will end when the container is - // removed - } + app.workflowDefinition(workflow) + .instance(Map.of()) + .start() + .join() + .asMap() + .orElseThrow(); }) .start(); diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml index 208b1faa..2a6cea6d 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-cleanup-default version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml index b7501a75..968cc34c 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: cointaner-cleanup version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml index 9876480b..71988b84 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-env version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml index bc330610..0639e326 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-ports version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml index 068137fc..874c9fbf 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-test-command version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml index b06c0bb8..b8a3b5e0 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-timeout version: '0.1.0' do: - runContainer: