Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallTaskJava;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations;
import io.serverlessworkflow.fluent.spec.TaskBaseBuilder;
Expand Down Expand Up @@ -61,6 +62,16 @@ public <T, V> FuncCallTaskBuilder function(
return this;
}

public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function) {
return function(function, null);
}

public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function, Class<T> argClass) {
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
super.setTask(this.callTaskJava.getCallJava());
return this;
}

/** Accept a side-effect Consumer; engine should pass input through unchanged. */
public <T> FuncCallTaskBuilder consumer(Consumer<T> consumer) {
this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.serverlessworkflow.fluent.func.dsl;

import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder;
import java.util.function.Consumer;
Expand All @@ -26,6 +27,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
private final String name;
private final Function<T, R> fn;
private final JavaContextFunction<T, R> ctxFn;
private final JavaFilterFunction<T, R> filterFn;
private final Class<T> argClass;

/** Function<T,R> variant (unnamed). */
Expand All @@ -38,6 +40,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
this.name = name;
this.fn = fn;
this.ctxFn = null;
this.filterFn = null;
this.argClass = argClass;
}

Expand All @@ -51,6 +54,21 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
this.name = name;
this.fn = null;
this.ctxFn = ctxFn;
this.filterFn = null;
this.argClass = argClass;
}

/** JavaFilterFunction<T,R> variant (unnamed). */
FuncCallStep(JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
this(null, filterFn, argClass);
}

/** JavaFilterFunction<T,R> variant (named). */
FuncCallStep(String name, JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
this.name = name;
this.fn = null;
this.ctxFn = null;
this.filterFn = filterFn;
this.argClass = argClass;
}

Expand All @@ -60,6 +78,8 @@ protected void configure(FuncTaskItemListBuilder list, Consumer<FuncCallTaskBuil
cb -> {
if (ctxFn != null) {
cb.function(ctxFn, argClass);
} else if (filterFn != null) {
cb.function(filterFn, argClass);
} else {
cb.function(fn, argClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.cloudevents.CloudEventData;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
Expand All @@ -26,6 +27,7 @@
import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer;
import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer;
import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps;
import io.serverlessworkflow.impl.WorkflowContextData;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -286,7 +288,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
}

/**
* Build a call step for functions that need {@code WorkflowContextData} as the first parameter.
* Build a call step for functions that need {@link WorkflowContextData} as the first parameter.
* The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context.
*
* <p>Signature expected: {@code (ctx, payload) -> result}
Expand All @@ -297,7 +299,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withContext(CtxBiFunction<T, R> fn, Class<T> in) {
public static <T, R> FuncCallStep<T, R> withContext(JavaContextFunction<T, R> fn, Class<T> in) {
return withContext(null, fn, in);
}

Expand All @@ -319,7 +321,7 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
}

/**
* Named variant of {@link #withContext(CtxBiFunction, Class)}.
* Named variant of {@link #withContext(JavaContextFunction, Class)}.
*
* @param name task name
* @param fn context-aware bi-function
Expand All @@ -329,9 +331,40 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> withContext(
String name, CtxBiFunction<T, R> fn, Class<T> in) {
JavaContextFunction<T, R> jcf = (payload, wctx) -> fn.apply(wctx, payload);
return new FuncCallStep<>(name, jcf, in);
String name, JavaContextFunction<T, R> fn, Class<T> in) {
return new FuncCallStep<>(name, fn, in);
}

/**
* Build a call step for functions that need {@link WorkflowContextData} and {@link
* io.serverlessworkflow.impl.TaskContextData} as the first and second parameter. The DSL wraps it
* as a {@link JavaFilterFunction} and injects the runtime context.
*
* <p>Signature expected: {@code (wctx, tctx, payload) -> result}
*
* @param fn context-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withFilter(JavaFilterFunction<T, R> fn, Class<T> in) {
return withFilter(null, fn, in);
}

/**
* Named variant of {@link #withFilter(JavaFilterFunction, Class)}.
*
* @param name task name
* @param fn context-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> withFilter(
String name, JavaFilterFunction<T, R> fn, Class<T> in) {
return new FuncCallStep<>(name, fn, in);
}

/**
Expand All @@ -350,6 +383,31 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
return new FuncCallStep<>(name, jcf, in);
}

/**
* Build a call step for functions that expect a composition with the workflow instance id and the
* task name as the first parameter. The instance ID is extracted from the runtime context, the
* task name from the definition.
*
* <p>Signature expected: {@code (uniqueId, payload) -> result}
*
* @param fn unique-id-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withUniqueId(
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
JavaFilterFunction<T, R> jff =
(payload, wctx, tctx) ->
fn.apply(String.format("%s-%s", wctx.instanceData().id(), tctx.taskName()), payload);
return new FuncCallStep<>(name, jff, in);
}

public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(null, fn, in);
}

/**
* Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input.
*
Expand Down Expand Up @@ -387,12 +445,12 @@ public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Clas
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Class<T> in) {
return withInstanceId(fn, in);
public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(fn, in);
}

/**
* Named agent-style sugar. See {@link #agent(InstanceIdBiFunction, Class)}.
* Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
*
* @param name task name
* @param fn (instanceId, payload) -> result
Expand All @@ -402,8 +460,8 @@ public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Cla
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> agent(
String name, InstanceIdBiFunction<T, R> fn, Class<T> in) {
return withInstanceId(name, fn, in);
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(name, fn, in);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.func.dsl;

/**
* Functions that expect a unique ID injection in runtime, typically an idempotent generated unique
* id based on the workflow instance id and task name.
*
* @param <T> The task payload input
* @param <R> The task result output
*/
@FunctionalInterface
public interface UniqueIdBiFunction<T, R> {
R apply(String uniqueId, T object);
}
Loading