diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java index 180804d0..58115e4e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java @@ -18,6 +18,7 @@ import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; @@ -61,6 +62,16 @@ public FuncCallTaskBuilder function( return this; } + public FuncCallTaskBuilder function(JavaFilterFunction function) { + return function(function, null); + } + + public FuncCallTaskBuilder function(JavaFilterFunction function, Class argClass) { + this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass)); + super.setTask(this.callTaskJava.getCallJava()); + return this; + } + /** Accept a side-effect Consumer; engine should pass input through unchanged. */ public FuncCallTaskBuilder consumer(Consumer consumer) { this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer)); diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java index c784cebf..2e828e3b 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.fluent.func.dsl; import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import java.util.function.Consumer; @@ -26,6 +27,7 @@ public final class FuncCallStep extends Step, FuncCallT private final String name; private final Function fn; private final JavaContextFunction ctxFn; + private final JavaFilterFunction filterFn; private final Class argClass; /** Function variant (unnamed). */ @@ -38,6 +40,7 @@ public final class FuncCallStep extends Step, FuncCallT this.name = name; this.fn = fn; this.ctxFn = null; + this.filterFn = null; this.argClass = argClass; } @@ -51,6 +54,21 @@ public final class FuncCallStep extends Step, FuncCallT this.name = name; this.fn = null; this.ctxFn = ctxFn; + this.filterFn = null; + this.argClass = argClass; + } + + /** JavaFilterFunction variant (unnamed). */ + FuncCallStep(JavaFilterFunction filterFn, Class argClass) { + this(null, filterFn, argClass); + } + + /** JavaFilterFunction variant (named). */ + FuncCallStep(String name, JavaFilterFunction filterFn, Class argClass) { + this.name = name; + this.fn = null; + this.ctxFn = null; + this.filterFn = filterFn; this.argClass = argClass; } @@ -60,6 +78,8 @@ protected void configure(FuncTaskItemListBuilder list, Consumer { if (ctxFn != null) { cb.function(ctxFn, argClass); + } else if (filterFn != null) { + cb.function(filterFn, argClass); } else { cb.function(fn, argClass); } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index e6a0d917..cacf2bc6 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -18,6 +18,7 @@ import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; @@ -26,6 +27,7 @@ import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps; +import io.serverlessworkflow.impl.WorkflowContextData; import java.util.Collection; import java.util.List; import java.util.Map; @@ -286,7 +288,7 @@ public static FuncCallStep function(Function fn, Class cla } /** - * Build a call step for functions that need {@code WorkflowContextData} as the first parameter. + * Build a call step for functions that need {@link WorkflowContextData} as the first parameter. * The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context. * *

Signature expected: {@code (ctx, payload) -> result} @@ -297,7 +299,7 @@ public static FuncCallStep function(Function fn, Class cla * @param result type * @return a call step */ - public static FuncCallStep withContext(CtxBiFunction fn, Class in) { + public static FuncCallStep withContext(JavaContextFunction fn, Class in) { return withContext(null, fn, in); } @@ -319,7 +321,7 @@ public static FuncCallStep withInstanceId( } /** - * Named variant of {@link #withContext(CtxBiFunction, Class)}. + * Named variant of {@link #withContext(JavaContextFunction, Class)}. * * @param name task name * @param fn context-aware bi-function @@ -329,9 +331,40 @@ public static FuncCallStep withInstanceId( * @return a named call step */ public static FuncCallStep withContext( - String name, CtxBiFunction fn, Class in) { - JavaContextFunction jcf = (payload, wctx) -> fn.apply(wctx, payload); - return new FuncCallStep<>(name, jcf, in); + String name, JavaContextFunction fn, Class in) { + return new FuncCallStep<>(name, fn, in); + } + + /** + * Build a call step for functions that need {@link WorkflowContextData} and {@link + * io.serverlessworkflow.impl.TaskContextData} as the first and second parameter. The DSL wraps it + * as a {@link JavaFilterFunction} and injects the runtime context. + * + *

Signature expected: {@code (wctx, tctx, payload) -> result} + * + * @param fn context-aware bi-function + * @param in payload input class + * @param input type + * @param result type + * @return a call step + */ + public static FuncCallStep withFilter(JavaFilterFunction fn, Class in) { + return withFilter(null, fn, in); + } + + /** + * Named variant of {@link #withFilter(JavaFilterFunction, Class)}. + * + * @param name task name + * @param fn context-aware bi-function + * @param in payload input class + * @param input type + * @param result type + * @return a named call step + */ + public static FuncCallStep withFilter( + String name, JavaFilterFunction fn, Class in) { + return new FuncCallStep<>(name, fn, in); } /** @@ -350,6 +383,31 @@ public static FuncCallStep withInstanceId( return new FuncCallStep<>(name, jcf, in); } + /** + * Build a call step for functions that expect a composition with the workflow instance id and the + * task name as the first parameter. The instance ID is extracted from the runtime context, the + * task name from the definition. + * + *

Signature expected: {@code (uniqueId, payload) -> result} + * + * @param fn unique-id-aware bi-function + * @param in payload input class + * @param input type + * @param result type + * @return a call step + */ + public static FuncCallStep withUniqueId( + String name, UniqueIdBiFunction fn, Class in) { + JavaFilterFunction jff = + (payload, wctx, tctx) -> + fn.apply(String.format("%s-%s", wctx.instanceData().id(), tctx.taskName()), payload); + return new FuncCallStep<>(name, jff, in); + } + + public static FuncCallStep withUniqueId(UniqueIdBiFunction fn, Class in) { + return withUniqueId(null, fn, in); + } + /** * Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input. * @@ -387,12 +445,12 @@ public static ConsumeStep consume(String name, Consumer consumer, Clas * @param result type * @return a call step */ - public static FuncCallStep agent(InstanceIdBiFunction fn, Class in) { - return withInstanceId(fn, in); + public static FuncCallStep agent(UniqueIdBiFunction fn, Class in) { + return withUniqueId(fn, in); } /** - * Named agent-style sugar. See {@link #agent(InstanceIdBiFunction, Class)}. + * Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}. * * @param name task name * @param fn (instanceId, payload) -> result @@ -402,8 +460,8 @@ public static FuncCallStep agent(InstanceIdBiFunction fn, Cla * @return a named call step */ public static FuncCallStep agent( - String name, InstanceIdBiFunction fn, Class in) { - return withInstanceId(name, fn, in); + String name, UniqueIdBiFunction fn, Class in) { + return withUniqueId(name, fn, in); } /** diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java new file mode 100644 index 00000000..5d630d40 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +/** + * Functions that expect a unique ID injection in runtime, typically an idempotent generated unique + * id based on the workflow instance id and task name. + * + * @param The task payload input + * @param The task result output + */ +@FunctionalInterface +public interface UniqueIdBiFunction { + R apply(String uniqueId, T object); +} diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java new file mode 100644 index 00000000..34cdd9ed --- /dev/null +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.agent; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withUniqueId; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; + +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; +import io.serverlessworkflow.fluent.func.dsl.UniqueIdBiFunction; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +/** + * Verifies that withUniqueId/agent wrap the user's function so that, at runtime, the first argument + * is a "unique id" composed as instanceId + "-" + taskName. + */ +class FuncDSLUniqueIdTest { + + @SuppressWarnings("unchecked") + private static JavaFilterFunction extractJavaFilterFunction(CallJava callJava) { + if (callJava instanceof CallJava.CallJavaFilterFunction f) { + return (JavaFilterFunction) f.function(); + } + fail("CallTask is not a CallJavaFilterFunction; DSL contract may have changed."); + return null; // unreachable + } + + @Test + @DisplayName("withUniqueId(name, fn, in) composes uniqueId = instanceId-taskName and passes it") + void withUniqueId_named_composes_and_passes_unique_id() throws Exception { + AtomicReference receivedUniqueId = new AtomicReference<>(); + AtomicReference receivedPayload = new AtomicReference<>(); + + // (uniqueId, payload) -> result; we capture inputs for assertion + UniqueIdBiFunction fn = + (uniqueId, payload) -> { + receivedUniqueId.set(uniqueId); + receivedPayload.set(payload); + return payload.toUpperCase(); + }; + + Workflow wf = + FuncWorkflowBuilder.workflow("wf-unique-named") + .tasks( + // important: NAME is provided → should appear in the uniqueId + withUniqueId("notify", fn, String.class)) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size(), "one task expected"); + Task t = items.get(0).getTask(); + assertNotNull(t.getCallTask(), "CallTask expected"); + + String taskName = items.get(0).getName(); + assertEquals("notify", taskName, "task name should be set on the step"); + + CallJava cj = (CallJava) t.getCallTask().get(); + var jff = extractJavaFilterFunction(cj); + assertNotNull(jff, "JavaFilterFunction must be present for withUniqueId"); + + // Invoke the wrapped function "as runtime" with fake contexts + var wctx = new FakeWorkflowContextData("inst-123"); + var tctx = new FakeTaskContextData(taskName); + + // The JavaFilterFunction signature in your impl is (payload, wctx, tctx) -> result + Object result = jff.apply("hello", wctx, tctx); + + assertEquals("inst-123-notify", receivedUniqueId.get(), "uniqueId must be instanceId-taskName"); + assertEquals( + "hello", receivedPayload.get(), "payload should be forwarded to the user function"); + assertEquals("HELLO", result, "wrapped function result should be returned"); + } + + @Test + @DisplayName("agent(fn, in) is sugar for withUniqueId(fn, in) and passes instanceId-taskName") + void agent_unnamed_composes_and_passes_unique_id() throws Exception { + AtomicReference receivedUniqueId = new AtomicReference<>(); + AtomicReference receivedPayload = new AtomicReference<>(); + + UniqueIdBiFunction fn = + (uniqueId, payload) -> { + receivedUniqueId.set(uniqueId); + receivedPayload.set(payload); + return payload + 1; + }; + + Workflow wf = + FuncWorkflowBuilder.workflow("wf-agent") + .tasks( + // No explicit name here; builder should still set a task name, + // which participates in the uniqueId (instanceId-taskName) + agent(fn, Integer.class)) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size(), "one task expected"); + Task t = items.get(0).getTask(); + assertNotNull(t.getCallTask(), "CallTask expected"); + String taskName = items.get(0).getName(); + assertNotNull(taskName, "task name should be assigned even if not explicitly provided"); + + CallJava cj = (CallJava) t.getCallTask().get(); + var jff = extractJavaFilterFunction(cj); + assertNotNull(jff, "JavaFilterFunction must be present for agent/withUniqueId"); + + WorkflowContextData wctx = new FakeWorkflowContextData("wf-999"); + TaskContextData tctx = new FakeTaskContextData(taskName); + + Object result = jff.apply(41, wctx, tctx); + + assertEquals( + "wf-999-" + taskName, + receivedUniqueId.get(), + "agent should compose uniqueId as instanceId-taskName"); + assertEquals(41, receivedPayload.get(), "payload should be forwarded to the user function"); + assertEquals(42, result, "wrapped function result should be returned"); + } + + /** + * Minimal test doubles to satisfy the JavaFilterFunction call path. We only implement the members + * used by the DSL composition: - wctx.instanceData().id() - tctx.taskName() + */ + static final class FakeWorkflowContextData implements WorkflowContextData { + private final String id; + + FakeWorkflowContextData(String id) { + this.id = id; + } + + @Override + public WorkflowInstanceData instanceData() { + // Provide just the id() accessor + return new WorkflowInstanceData() { + @Override + public String id() { + return id; + } + + @Override + public Instant startedAt() { + return null; + } + + @Override + public Instant completedAt() { + return null; + } + + @Override + public WorkflowModel input() { + return null; + } + + @Override + public WorkflowStatus status() { + return null; + } + + @Override + public WorkflowModel output() { + return null; + } + + @Override + public WorkflowModel context() { + return null; + } + + @Override + public T outputAs(Class clazz) { + return null; + } + }; + } + + @Override + public WorkflowModel context() { + return null; + } + + @Override + public WorkflowDefinitionData definition() { + return null; + } + } + + record FakeTaskContextData(String taskName) implements TaskContextData { + + @Override + public WorkflowModel input() { + return null; + } + + @Override + public WorkflowModel rawInput() { + return null; + } + + @Override + public TaskBase task() { + return null; + } + + @Override + public WorkflowModel rawOutput() { + return null; + } + + @Override + public WorkflowModel output() { + return null; + } + + @Override + public WorkflowPosition position() { + return null; + } + + @Override + public Instant startedAt() { + return null; + } + + @Override + public Instant completedAt() { + return null; + } + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java index 971e3a9c..a9699b6b 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java @@ -54,6 +54,10 @@ public static CallJava function(JavaContextFunction function, Class return new CallJavaContextFunction<>(function, Optional.ofNullable(inputClass)); } + public static CallJava function(JavaFilterFunction function, Class inputClass) { + return new CallJavaFilterFunction<>(function, Optional.ofNullable(inputClass)); + } + public static class CallJavaConsumer extends CallJava { private static final long serialVersionUID = 1L; private final Consumer consumer; @@ -95,8 +99,8 @@ public Optional> inputClass() { public static class CallJavaContextFunction extends CallJava { private static final long serialVersionUID = 1L; - private JavaContextFunction function; - private Optional> inputClass; + private final JavaContextFunction function; + private final Optional> inputClass; public CallJavaContextFunction( JavaContextFunction function, Optional> inputClass) { @@ -113,6 +117,26 @@ public Optional> inputClass() { } } + public static class CallJavaFilterFunction extends CallJava { + private static final long serialVersionUID = 1L; + private final JavaFilterFunction function; + private final Optional> inputClass; + + public CallJavaFilterFunction( + JavaFilterFunction function, Optional> inputClass) { + this.function = function; + this.inputClass = inputClass; + } + + public JavaFilterFunction function() { + return function; + } + + public Optional> inputClass() { + return inputClass; + } + } + public static class CallJavaLoopFunction extends CallJava { private static final long serialVersionUID = 1L;