diff --git a/impl/core/pom.xml b/impl/core/pom.xml
index 0ee45fe9..6fb99f6d 100644
--- a/impl/core/pom.xml
+++ b/impl/core/pom.xml
@@ -24,5 +24,9 @@
de.huxhorn.sulky
de.huxhorn.sulky.ulid
+
+ com.cronutils
+ cron-utils
+
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java
new file mode 100644
index 00000000..2aa57694
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl;
+
+import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
+import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
+import io.serverlessworkflow.impl.scheduler.Cancellable;
+import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
+
+ private final WorkflowScheduler scheduler;
+ private final Map> afterMap =
+ new ConcurrentHashMap<>();
+ private final Map cancellableMap = new ConcurrentHashMap<>();
+
+ public SchedulerListener(WorkflowScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public void addAfter(WorkflowDefinition definition, WorkflowValueResolver after) {
+ afterMap.put(definition, after);
+ }
+
+ @Override
+ public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
+ WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
+ WorkflowValueResolver after = afterMap.get(workflowDefinition);
+ if (after != null) {
+ cancellableMap.put(
+ workflowDefinition,
+ scheduler.scheduleAfter(
+ workflowDefinition,
+ after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
+ }
+ }
+
+ public void removeAfter(WorkflowDefinition definition) {
+ if (afterMap.remove(definition) != null) {
+ Cancellable cancellable = cancellableMap.remove(definition);
+ if (cancellable != null) {
+ cancellable.cancel();
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ cancellableMap.values().forEach(c -> c.cancel());
+ cancellableMap.clear();
+ afterMap.clear();
+ }
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java
index 9865f72b..413bfbe8 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java
@@ -36,6 +36,7 @@
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
+import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
import java.util.ArrayList;
@@ -71,6 +72,7 @@ public class WorkflowApplication implements AutoCloseable {
private final Map> additionalObjects;
private final ConfigManager configManager;
private final SecretManager secretManager;
+ private final SchedulerListener schedulerListener;
private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
@@ -81,13 +83,14 @@ private WorkflowApplication(Builder builder) {
this.idFactory = builder.idFactory;
this.runtimeDescriptorFactory = builder.descriptorFactory;
this.executorFactory = builder.executorFactory;
- this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
+ this.listeners = builder.listeners;
this.definitions = new ConcurrentHashMap<>();
this.eventConsumer = builder.eventConsumer;
this.eventPublishers = builder.eventPublishers;
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
this.modelFactory = builder.modelFactory;
this.scheduler = builder.scheduler;
+ this.schedulerListener = builder.schedulerListener;
this.additionalObjects = builder.additionalObjects;
this.configManager = builder.configManager;
this.secretManager = builder.secretManager;
@@ -169,6 +172,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private Map> additionalObjects;
private SecretManager secretManager;
private ConfigManager configManager;
+ private SchedulerListener schedulerListener;
private Builder() {}
@@ -304,6 +308,8 @@ public WorkflowApplication build() {
if (scheduler == null) {
scheduler = new DefaultWorkflowScheduler();
}
+ schedulerListener = new SchedulerListener(scheduler);
+ listeners.add(schedulerListener);
if (additionalObjects == null) {
additionalObjects = Collections.emptyMap();
}
@@ -388,6 +394,10 @@ public SecretManager secretManager() {
return secretManager;
}
+ SchedulerListener schedulerListener() {
+ return schedulerListener;
+ }
+
public Optional additionalObject(
String name, WorkflowContext workflowContext, TaskContext taskContext) {
return Optional.ofNullable(additionalObjects.get(name))
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java
index 41c5c2ee..6fe8856a 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java
@@ -20,7 +20,6 @@
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
import io.serverlessworkflow.api.types.Input;
-import io.serverlessworkflow.api.types.ListenTo;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.Schedule;
import io.serverlessworkflow.api.types.Workflow;
@@ -28,16 +27,20 @@
import io.serverlessworkflow.impl.executors.TaskExecutor;
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.resources.ResourceLoader;
+import io.serverlessworkflow.impl.scheduler.Cancellable;
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
+import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
private final Workflow workflow;
+ private final WorkflowDefinitionId definitionId;
private Optional inputSchemaValidator = Optional.empty();
private Optional outputSchemaValidator = Optional.empty();
private Optional inputFilter = Optional.empty();
@@ -47,10 +50,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
private final ResourceLoader resourceLoader;
private final Map> executors = new HashMap<>();
private ScheduledEventConsumer scheculedConsumer;
+ private Cancellable everySchedule;
+ private Cancellable cronSchedule;
private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
this.workflow = workflow;
+ this.definitionId = WorkflowDefinitionId.of(workflow);
this.application = application;
this.resourceLoader = resourceLoader;
@@ -84,15 +90,28 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
application.resourceLoaderFactory().getResourceLoader(application, path));
Schedule schedule = workflow.getSchedule();
if (schedule != null) {
- ListenTo to = schedule.getOn();
- if (to != null) {
+ WorkflowScheduler scheduler = application.scheduler();
+ if (schedule.getOn() != null) {
definition.scheculedConsumer =
- application
- .scheduler()
- .eventConsumer(
- definition,
- application.modelFactory()::from,
- EventRegistrationBuilderInfo.from(application, to, x -> null));
+ scheduler.eventConsumer(
+ definition,
+ application.modelFactory()::from,
+ EventRegistrationBuilderInfo.from(application, schedule.getOn(), x -> null));
+ }
+ if (schedule.getAfter() != null) {
+ application
+ .schedulerListener()
+ .addAfter(definition, WorkflowUtils.fromTimeoutAfter(application, schedule.getAfter()));
+ }
+ if (schedule.getCron() != null) {
+ definition.cronSchedule = scheduler.scheduleCron(definition, schedule.getCron());
+ }
+ if (schedule.getEvery() != null) {
+ definition.everySchedule =
+ scheduler.scheduleEvery(
+ definition,
+ WorkflowUtils.fromTimeoutAfter(application, schedule.getEvery())
+ .apply(null, null, application.modelFactory().fromNull()));
}
}
return definition;
@@ -148,7 +167,28 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor> ta
@Override
public void close() {
- safeClose(scheculedConsumer);
safeClose(resourceLoader);
+ safeClose(scheculedConsumer);
+ application.schedulerListener().removeAfter(this);
+ if (everySchedule != null) {
+ everySchedule.cancel();
+ }
+ if (cronSchedule != null) {
+ cronSchedule.cancel();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(definitionId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ WorkflowDefinition other = (WorkflowDefinition) obj;
+ return Objects.equals(definitionId, other.definitionId);
}
}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java
new file mode 100644
index 00000000..fef5b973
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+public interface Cancellable {
+ void cancel();
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java
new file mode 100644
index 00000000..df254895
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public interface CronResolver {
+ Optional nextExecution();
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java
new file mode 100644
index 00000000..4c67f062
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+public interface CronResolverFactory {
+ CronResolver parseCron(String cron);
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java
new file mode 100644
index 00000000..fcb7086b
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+import com.cronutils.model.Cron;
+import com.cronutils.model.time.ExecutionTime;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+
+class CronUtilsResolver implements CronResolver {
+
+ private final ExecutionTime executionTime;
+
+ public CronUtilsResolver(Cron cron) {
+ this.executionTime = ExecutionTime.forCron(cron);
+ }
+
+ @Override
+ public Optional nextExecution() {
+ return executionTime.timeToNextExecution(ZonedDateTime.now());
+ }
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java
new file mode 100644
index 00000000..3facca0e
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+import com.cronutils.model.CronType;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.parser.CronParser;
+
+class CronUtilsResolverFactory implements CronResolverFactory {
+
+ private final CronParser cronParser;
+
+ public CronUtilsResolverFactory() {
+ this(CronType.UNIX);
+ }
+
+ public CronUtilsResolverFactory(CronType type) {
+ this.cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(type));
+ }
+
+ @Override
+ public CronResolver parseCron(String cron) {
+ return new CronUtilsResolver(cronParser.parse(cron));
+ }
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java
index 5e3338e3..338fbca7 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java
@@ -19,20 +19,39 @@
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowInstance;
import io.serverlessworkflow.impl.WorkflowModel;
-import io.serverlessworkflow.impl.WorkflowScheduler;
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
public class DefaultWorkflowScheduler implements WorkflowScheduler {
- private Map> instances =
+ private final Map> instances =
new ConcurrentHashMap<>();
+ private final ScheduledExecutorService service;
+ private final CronResolverFactory cronFactory;
+
+ public DefaultWorkflowScheduler() {
+ this(Executors.newSingleThreadScheduledExecutor(), new CronUtilsResolverFactory());
+ }
+
+ public DefaultWorkflowScheduler(
+ ScheduledExecutorService service, CronResolverFactory cronFactory) {
+ this.service = service;
+ this.cronFactory = cronFactory;
+ }
+
@Override
public Collection scheduledInstances(WorkflowDefinition definition) {
return Collections.unmodifiableCollection(theInstances(definition));
@@ -43,15 +62,93 @@ public ScheduledEventConsumer eventConsumer(
WorkflowDefinition definition,
Function converter,
EventRegistrationBuilderInfo builderInfo) {
- return new ScheduledEventConsumer(definition, converter, builderInfo) {
- @Override
- protected void addScheduledInstance(WorkflowInstance instance) {
- theInstances(definition).add(instance);
- }
- };
+ return new ScheduledEventConsumer(
+ definition, converter, builderInfo, new DefaultScheduledInstanceRunner(definition));
+ }
+
+ @Override
+ public Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay) {
+ return new ScheduledServiceCancellable(
+ service.schedule(
+ new DefaultScheduledInstanceRunner(definition),
+ delay.toMillis(),
+ TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval) {
+ long delay = interval.toMillis();
+ return new ScheduledServiceCancellable(
+ service.scheduleAtFixedRate(
+ new DefaultScheduledInstanceRunner(definition), delay, delay, TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public Cancellable scheduleCron(WorkflowDefinition definition, String cron) {
+ return new CronResolverCancellable(definition, cronFactory.parseCron(cron));
}
private Collection theInstances(WorkflowDefinition definition) {
return instances.computeIfAbsent(definition, def -> new ArrayList<>());
}
+
+ private class CronResolverCancellable implements Cancellable {
+ private final WorkflowDefinition definition;
+ private final CronResolver cronResolver;
+
+ private AtomicReference> nextCron = new AtomicReference<>();
+ private AtomicBoolean cancelled = new AtomicBoolean();
+
+ public CronResolverCancellable(WorkflowDefinition definition, CronResolver cronResolver) {
+ this.definition = definition;
+ this.cronResolver = cronResolver;
+ scheduleNext();
+ }
+
+ private void scheduleNext() {
+ cronResolver
+ .nextExecution()
+ .ifPresent(
+ d ->
+ nextCron.set(
+ service.schedule(
+ new CronResolverIntanceRunner(definition),
+ d.toMillis(),
+ TimeUnit.MILLISECONDS)));
+ }
+
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ ScheduledFuture> toBeCancel = nextCron.get();
+ if (toBeCancel != null) {
+ toBeCancel.cancel(true);
+ }
+ }
+
+ private class CronResolverIntanceRunner extends DefaultScheduledInstanceRunner {
+ protected CronResolverIntanceRunner(WorkflowDefinition definition) {
+ super(definition);
+ }
+
+ @Override
+ public void accept(WorkflowModel model) {
+ if (!cancelled.get()) {
+ scheduleNext();
+ super.accept(model);
+ }
+ }
+ }
+ }
+
+ private class DefaultScheduledInstanceRunner extends ScheduledInstanceRunnable {
+ protected DefaultScheduledInstanceRunner(WorkflowDefinition definition) {
+ super(definition);
+ }
+
+ @Override
+ protected void addScheduledInstance(WorkflowInstance instance) {
+ theInstances(definition).add(instance);
+ }
+ }
}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java
index 34746355..76e6ccb9 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java
@@ -17,7 +17,6 @@
import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.WorkflowDefinition;
-import io.serverlessworkflow.impl.WorkflowInstance;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelCollection;
import io.serverlessworkflow.impl.events.EventConsumer;
@@ -31,22 +30,25 @@
import java.util.Map;
import java.util.function.Function;
-public abstract class ScheduledEventConsumer implements AutoCloseable {
+public class ScheduledEventConsumer implements AutoCloseable {
private final Function converter;
private final WorkflowDefinition definition;
private final EventRegistrationBuilderInfo builderInfo;
private final EventConsumer eventConsumer;
+ private final ScheduledInstanceRunnable instanceRunner;
private Map> correlatedEvents;
private Collection registrations = new ArrayList<>();
protected ScheduledEventConsumer(
WorkflowDefinition definition,
Function converter,
- EventRegistrationBuilderInfo builderInfo) {
+ EventRegistrationBuilderInfo builderInfo,
+ ScheduledInstanceRunnable instanceRunner) {
this.definition = definition;
this.converter = converter;
this.builderInfo = builderInfo;
+ this.instanceRunner = instanceRunner;
this.eventConsumer = definition.application().eventConsumer();
if (builderInfo.registrations().isAnd()
&& builderInfo.registrations().registrations().size() > 1) {
@@ -100,19 +102,13 @@ private boolean satisfyCondition() {
protected void start(CloudEvent ce) {
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
model.add(converter.apply(ce));
- start(model);
+ instanceRunner.accept(model);
}
protected void start(Collection ces) {
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
ces.forEach(ce -> model.add(converter.apply(ce)));
- start(model);
- }
-
- private void start(WorkflowModel model) {
- WorkflowInstance instance = definition.instance(model);
- addScheduledInstance(instance);
- instance.start();
+ instanceRunner.accept(model);
}
public void close() {
@@ -121,6 +117,4 @@ public void close() {
}
registrations.forEach(eventConsumer::unregister);
}
-
- protected abstract void addScheduledInstance(WorkflowInstance instace);
}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java
new file mode 100644
index 00000000..cdb98e58
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowInstance;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.function.Consumer;
+
+public abstract class ScheduledInstanceRunnable implements Runnable, Consumer {
+
+ protected final WorkflowDefinition definition;
+
+ protected ScheduledInstanceRunnable(WorkflowDefinition definition) {
+ this.definition = definition;
+ }
+
+ @Override
+ public void run() {
+ accept(definition.application().modelFactory().fromNull());
+ }
+
+ @Override
+ public void accept(WorkflowModel model) {
+ WorkflowInstance instance = definition.instance(model);
+ addScheduledInstance(instance);
+ definition.application().executorService().execute(() -> instance.start());
+ }
+
+ protected abstract void addScheduledInstance(WorkflowInstance instance);
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java
new file mode 100644
index 00000000..2341f910
--- /dev/null
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.scheduler;
+
+import java.util.concurrent.ScheduledFuture;
+
+class ScheduledServiceCancellable implements Cancellable {
+
+ private final ScheduledFuture> cancellable;
+
+ public ScheduledServiceCancellable(ScheduledFuture> cancellable) {
+ this.cancellable = cancellable;
+ }
+
+ @Override
+ public void cancel() {
+ cancellable.cancel(true);
+ }
+}
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java
similarity index 54%
rename from impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java
rename to impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java
index 84f5b913..27059f1a 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java
@@ -13,11 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.serverlessworkflow.impl;
+package io.serverlessworkflow.impl.scheduler;
import io.cloudevents.CloudEvent;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowInstance;
+import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
-import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
+import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
@@ -28,4 +31,19 @@ ScheduledEventConsumer eventConsumer(
WorkflowDefinition definition,
Function converter,
EventRegistrationBuilderInfo info);
+
+ /**
+ * Periodically instantiate a workflow instance from the given definition at the given interval.
+ * It continue creating workflow instances till cancelled.
+ */
+ Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval);
+
+ /** Creates one workflow instance after the specified delay. */
+ Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay);
+
+ /**
+ * Creates one or more workflow instances according to the specified Cron expression. It continue
+ * creating workflow instances till the Cron expression indicates so or it is cancelled
+ */
+ Cancellable scheduleCron(WorkflowDefinition definition, String cron);
}
diff --git a/impl/pom.xml b/impl/pom.xml
index 00d900f9..e4698812 100644
--- a/impl/pom.xml
+++ b/impl/pom.xml
@@ -14,6 +14,7 @@
4.0.0
1.6.0
3.1.11
+ 9.2.1
@@ -124,6 +125,11 @@
h2-mvstore
${version.com.h2database}
+
+ com.cronutils
+ cron-utils
+ ${version.com.cronutils}
+
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java
new file mode 100644
index 00000000..43c19644
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test;
+
+import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+class SchedulerTest {
+
+ private static WorkflowApplication appl;
+
+ @BeforeAll
+ static void init() throws IOException {
+ appl = WorkflowApplication.builder().build();
+ }
+
+ @AfterAll
+ static void tearDown() throws IOException {
+ appl.close();
+ }
+
+ @Test
+ void testAfter() throws IOException, InterruptedException, ExecutionException {
+ try (WorkflowDefinition def =
+ appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/after-start.yaml"))) {
+ def.instance(Map.of()).start().join();
+ assertThat(appl.scheduler().scheduledInstances(def)).isEmpty();
+ await()
+ .pollDelay(Duration.ofMillis(50))
+ .atMost(Duration.ofMillis(200))
+ .until(() -> appl.scheduler().scheduledInstances(def).size() >= 1);
+ }
+ }
+
+ @Test
+ void testEvery() throws IOException, InterruptedException, ExecutionException {
+ try (WorkflowDefinition def =
+ appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/every-start.yaml"))) {
+ await()
+ .pollDelay(Duration.ofMillis(20))
+ .atMost(Duration.ofMillis(200))
+ .until(() -> appl.scheduler().scheduledInstances(def).size() >= 5);
+ }
+ }
+
+ @Test
+ @Disabled("too long test, since cron cannot be under a minute")
+ void testCron() throws IOException, InterruptedException, ExecutionException {
+ try (WorkflowDefinition def =
+ appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/cron-start.yaml"))) {
+ await()
+ .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10)))
+ .until(() -> appl.scheduler().scheduledInstances(def).size() == 1);
+ await()
+ .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10)))
+ .until(() -> appl.scheduler().scheduledInstances(def).size() == 2);
+ }
+ }
+}
diff --git a/impl/test/src/test/resources/workflows-samples/after-start.yaml b/impl/test/src/test/resources/workflows-samples/after-start.yaml
new file mode 100644
index 00000000..cdefccc4
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/after-start.yaml
@@ -0,0 +1,12 @@
+document:
+ dsl: '1.0.1'
+ namespace: test
+ name: after-driven-schedule
+ version: '0.1.0'
+schedule:
+ after:
+ milliseconds: 50
+do:
+ - recovered:
+ set:
+ recovered: true
diff --git a/impl/test/src/test/resources/workflows-samples/cron-start.yaml b/impl/test/src/test/resources/workflows-samples/cron-start.yaml
new file mode 100644
index 00000000..06dc2cf0
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/cron-start.yaml
@@ -0,0 +1,11 @@
+document:
+ dsl: '1.0.1'
+ namespace: test
+ name: cron-driven-schedule
+ version: '0.1.0'
+schedule:
+ cron: "* * * * *"
+do:
+ - recovered:
+ set:
+ recovered: true
diff --git a/impl/test/src/test/resources/workflows-samples/every-start.yaml b/impl/test/src/test/resources/workflows-samples/every-start.yaml
new file mode 100644
index 00000000..cf79681b
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/every-start.yaml
@@ -0,0 +1,12 @@
+document:
+ dsl: '1.0.1'
+ namespace: test
+ name: every-driven-schedule
+ version: '0.1.0'
+schedule:
+ every:
+ milliseconds: 10
+do:
+ - recovered:
+ set:
+ recovered: true