diff --git a/impl/core/pom.xml b/impl/core/pom.xml index 0ee45fe9..6fb99f6d 100644 --- a/impl/core/pom.xml +++ b/impl/core/pom.xml @@ -24,5 +24,9 @@ de.huxhorn.sulky de.huxhorn.sulky.ulid + + com.cronutils + cron-utils + diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java new file mode 100644 index 00000000..2aa57694 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.scheduler.Cancellable; +import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class SchedulerListener implements WorkflowExecutionListener, AutoCloseable { + + private final WorkflowScheduler scheduler; + private final Map> afterMap = + new ConcurrentHashMap<>(); + private final Map cancellableMap = new ConcurrentHashMap<>(); + + public SchedulerListener(WorkflowScheduler scheduler) { + this.scheduler = scheduler; + } + + public void addAfter(WorkflowDefinition definition, WorkflowValueResolver after) { + afterMap.put(definition, after); + } + + @Override + public void onWorkflowCompleted(WorkflowCompletedEvent ev) { + WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition(); + WorkflowValueResolver after = afterMap.get(workflowDefinition); + if (after != null) { + cancellableMap.put( + workflowDefinition, + scheduler.scheduleAfter( + workflowDefinition, + after.apply((WorkflowContext) ev.workflowContext(), null, ev.output()))); + } + } + + public void removeAfter(WorkflowDefinition definition) { + if (afterMap.remove(definition) != null) { + Cancellable cancellable = cancellableMap.remove(definition); + if (cancellable != null) { + cancellable.cancel(); + } + } + } + + @Override + public void close() { + cancellableMap.values().forEach(c -> c.cancel()); + cancellableMap.clear(); + afterMap.clear(); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 9865f72b..413bfbe8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -36,6 +36,7 @@ import io.serverlessworkflow.impl.resources.ExternalResourceHandler; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler; +import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.util.ArrayList; @@ -71,6 +72,7 @@ public class WorkflowApplication implements AutoCloseable { private final Map> additionalObjects; private final ConfigManager configManager; private final SecretManager secretManager; + private final SchedulerListener schedulerListener; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -81,13 +83,14 @@ private WorkflowApplication(Builder builder) { this.idFactory = builder.idFactory; this.runtimeDescriptorFactory = builder.descriptorFactory; this.executorFactory = builder.executorFactory; - this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet(); + this.listeners = builder.listeners; this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; this.eventPublishers = builder.eventPublishers; this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; this.modelFactory = builder.modelFactory; this.scheduler = builder.scheduler; + this.schedulerListener = builder.schedulerListener; this.additionalObjects = builder.additionalObjects; this.configManager = builder.configManager; this.secretManager = builder.secretManager; @@ -169,6 +172,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private Map> additionalObjects; private SecretManager secretManager; private ConfigManager configManager; + private SchedulerListener schedulerListener; private Builder() {} @@ -304,6 +308,8 @@ public WorkflowApplication build() { if (scheduler == null) { scheduler = new DefaultWorkflowScheduler(); } + schedulerListener = new SchedulerListener(scheduler); + listeners.add(schedulerListener); if (additionalObjects == null) { additionalObjects = Collections.emptyMap(); } @@ -388,6 +394,10 @@ public SecretManager secretManager() { return secretManager; } + SchedulerListener schedulerListener() { + return schedulerListener; + } + public Optional additionalObject( String name, WorkflowContext workflowContext, TaskContext taskContext) { return Optional.ofNullable(additionalObjects.get(name)) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 41c5c2ee..6fe8856a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -20,7 +20,6 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; import io.serverlessworkflow.api.types.Input; -import io.serverlessworkflow.api.types.ListenTo; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.Schedule; import io.serverlessworkflow.api.types.Workflow; @@ -28,16 +27,20 @@ import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.resources.ResourceLoader; +import io.serverlessworkflow.impl.scheduler.Cancellable; import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; +import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData { private final Workflow workflow; + private final WorkflowDefinitionId definitionId; private Optional inputSchemaValidator = Optional.empty(); private Optional outputSchemaValidator = Optional.empty(); private Optional inputFilter = Optional.empty(); @@ -47,10 +50,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private final ResourceLoader resourceLoader; private final Map> executors = new HashMap<>(); private ScheduledEventConsumer scheculedConsumer; + private Cancellable everySchedule; + private Cancellable cronSchedule; private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { this.workflow = workflow; + this.definitionId = WorkflowDefinitionId.of(workflow); this.application = application; this.resourceLoader = resourceLoader; @@ -84,15 +90,28 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, application.resourceLoaderFactory().getResourceLoader(application, path)); Schedule schedule = workflow.getSchedule(); if (schedule != null) { - ListenTo to = schedule.getOn(); - if (to != null) { + WorkflowScheduler scheduler = application.scheduler(); + if (schedule.getOn() != null) { definition.scheculedConsumer = - application - .scheduler() - .eventConsumer( - definition, - application.modelFactory()::from, - EventRegistrationBuilderInfo.from(application, to, x -> null)); + scheduler.eventConsumer( + definition, + application.modelFactory()::from, + EventRegistrationBuilderInfo.from(application, schedule.getOn(), x -> null)); + } + if (schedule.getAfter() != null) { + application + .schedulerListener() + .addAfter(definition, WorkflowUtils.fromTimeoutAfter(application, schedule.getAfter())); + } + if (schedule.getCron() != null) { + definition.cronSchedule = scheduler.scheduleCron(definition, schedule.getCron()); + } + if (schedule.getEvery() != null) { + definition.everySchedule = + scheduler.scheduleEvery( + definition, + WorkflowUtils.fromTimeoutAfter(application, schedule.getEvery()) + .apply(null, null, application.modelFactory().fromNull())); } } return definition; @@ -148,7 +167,28 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta @Override public void close() { - safeClose(scheculedConsumer); safeClose(resourceLoader); + safeClose(scheculedConsumer); + application.schedulerListener().removeAfter(this); + if (everySchedule != null) { + everySchedule.cancel(); + } + if (cronSchedule != null) { + cronSchedule.cancel(); + } + } + + @Override + public int hashCode() { + return Objects.hash(definitionId); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + WorkflowDefinition other = (WorkflowDefinition) obj; + return Objects.equals(definitionId, other.definitionId); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java new file mode 100644 index 00000000..fef5b973 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/Cancellable.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +public interface Cancellable { + void cancel(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java new file mode 100644 index 00000000..df254895 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolver.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import java.time.Duration; +import java.util.Optional; + +public interface CronResolver { + Optional nextExecution(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java new file mode 100644 index 00000000..4c67f062 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronResolverFactory.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +public interface CronResolverFactory { + CronResolver parseCron(String cron); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java new file mode 100644 index 00000000..fcb7086b --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolver.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import com.cronutils.model.Cron; +import com.cronutils.model.time.ExecutionTime; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Optional; + +class CronUtilsResolver implements CronResolver { + + private final ExecutionTime executionTime; + + public CronUtilsResolver(Cron cron) { + this.executionTime = ExecutionTime.forCron(cron); + } + + @Override + public Optional nextExecution() { + return executionTime.timeToNextExecution(ZonedDateTime.now()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java new file mode 100644 index 00000000..3facca0e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/CronUtilsResolverFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; + +class CronUtilsResolverFactory implements CronResolverFactory { + + private final CronParser cronParser; + + public CronUtilsResolverFactory() { + this(CronType.UNIX); + } + + public CronUtilsResolverFactory(CronType type) { + this.cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(type)); + } + + @Override + public CronResolver parseCron(String cron) { + return new CronUtilsResolver(cronParser.parse(cron)); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java index 5e3338e3..338fbca7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java @@ -19,20 +19,39 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowScheduler; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; public class DefaultWorkflowScheduler implements WorkflowScheduler { - private Map> instances = + private final Map> instances = new ConcurrentHashMap<>(); + private final ScheduledExecutorService service; + private final CronResolverFactory cronFactory; + + public DefaultWorkflowScheduler() { + this(Executors.newSingleThreadScheduledExecutor(), new CronUtilsResolverFactory()); + } + + public DefaultWorkflowScheduler( + ScheduledExecutorService service, CronResolverFactory cronFactory) { + this.service = service; + this.cronFactory = cronFactory; + } + @Override public Collection scheduledInstances(WorkflowDefinition definition) { return Collections.unmodifiableCollection(theInstances(definition)); @@ -43,15 +62,93 @@ public ScheduledEventConsumer eventConsumer( WorkflowDefinition definition, Function converter, EventRegistrationBuilderInfo builderInfo) { - return new ScheduledEventConsumer(definition, converter, builderInfo) { - @Override - protected void addScheduledInstance(WorkflowInstance instance) { - theInstances(definition).add(instance); - } - }; + return new ScheduledEventConsumer( + definition, converter, builderInfo, new DefaultScheduledInstanceRunner(definition)); + } + + @Override + public Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay) { + return new ScheduledServiceCancellable( + service.schedule( + new DefaultScheduledInstanceRunner(definition), + delay.toMillis(), + TimeUnit.MILLISECONDS)); + } + + @Override + public Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval) { + long delay = interval.toMillis(); + return new ScheduledServiceCancellable( + service.scheduleAtFixedRate( + new DefaultScheduledInstanceRunner(definition), delay, delay, TimeUnit.MILLISECONDS)); + } + + @Override + public Cancellable scheduleCron(WorkflowDefinition definition, String cron) { + return new CronResolverCancellable(definition, cronFactory.parseCron(cron)); } private Collection theInstances(WorkflowDefinition definition) { return instances.computeIfAbsent(definition, def -> new ArrayList<>()); } + + private class CronResolverCancellable implements Cancellable { + private final WorkflowDefinition definition; + private final CronResolver cronResolver; + + private AtomicReference> nextCron = new AtomicReference<>(); + private AtomicBoolean cancelled = new AtomicBoolean(); + + public CronResolverCancellable(WorkflowDefinition definition, CronResolver cronResolver) { + this.definition = definition; + this.cronResolver = cronResolver; + scheduleNext(); + } + + private void scheduleNext() { + cronResolver + .nextExecution() + .ifPresent( + d -> + nextCron.set( + service.schedule( + new CronResolverIntanceRunner(definition), + d.toMillis(), + TimeUnit.MILLISECONDS))); + } + + @Override + public void cancel() { + cancelled.set(true); + ScheduledFuture toBeCancel = nextCron.get(); + if (toBeCancel != null) { + toBeCancel.cancel(true); + } + } + + private class CronResolverIntanceRunner extends DefaultScheduledInstanceRunner { + protected CronResolverIntanceRunner(WorkflowDefinition definition) { + super(definition); + } + + @Override + public void accept(WorkflowModel model) { + if (!cancelled.get()) { + scheduleNext(); + super.accept(model); + } + } + } + } + + private class DefaultScheduledInstanceRunner extends ScheduledInstanceRunnable { + protected DefaultScheduledInstanceRunner(WorkflowDefinition definition) { + super(definition); + } + + @Override + protected void addScheduledInstance(WorkflowInstance instance) { + theInstances(definition).add(instance); + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java index 34746355..76e6ccb9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -17,7 +17,6 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.events.EventConsumer; @@ -31,22 +30,25 @@ import java.util.Map; import java.util.function.Function; -public abstract class ScheduledEventConsumer implements AutoCloseable { +public class ScheduledEventConsumer implements AutoCloseable { private final Function converter; private final WorkflowDefinition definition; private final EventRegistrationBuilderInfo builderInfo; private final EventConsumer eventConsumer; + private final ScheduledInstanceRunnable instanceRunner; private Map> correlatedEvents; private Collection registrations = new ArrayList<>(); protected ScheduledEventConsumer( WorkflowDefinition definition, Function converter, - EventRegistrationBuilderInfo builderInfo) { + EventRegistrationBuilderInfo builderInfo, + ScheduledInstanceRunnable instanceRunner) { this.definition = definition; this.converter = converter; this.builderInfo = builderInfo; + this.instanceRunner = instanceRunner; this.eventConsumer = definition.application().eventConsumer(); if (builderInfo.registrations().isAnd() && builderInfo.registrations().registrations().size() > 1) { @@ -100,19 +102,13 @@ private boolean satisfyCondition() { protected void start(CloudEvent ce) { WorkflowModelCollection model = definition.application().modelFactory().createCollection(); model.add(converter.apply(ce)); - start(model); + instanceRunner.accept(model); } protected void start(Collection ces) { WorkflowModelCollection model = definition.application().modelFactory().createCollection(); ces.forEach(ce -> model.add(converter.apply(ce))); - start(model); - } - - private void start(WorkflowModel model) { - WorkflowInstance instance = definition.instance(model); - addScheduledInstance(instance); - instance.start(); + instanceRunner.accept(model); } public void close() { @@ -121,6 +117,4 @@ public void close() { } registrations.forEach(eventConsumer::unregister); } - - protected abstract void addScheduledInstance(WorkflowInstance instace); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java new file mode 100644 index 00000000..cdb98e58 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.function.Consumer; + +public abstract class ScheduledInstanceRunnable implements Runnable, Consumer { + + protected final WorkflowDefinition definition; + + protected ScheduledInstanceRunnable(WorkflowDefinition definition) { + this.definition = definition; + } + + @Override + public void run() { + accept(definition.application().modelFactory().fromNull()); + } + + @Override + public void accept(WorkflowModel model) { + WorkflowInstance instance = definition.instance(model); + addScheduledInstance(instance); + definition.application().executorService().execute(() -> instance.start()); + } + + protected abstract void addScheduledInstance(WorkflowInstance instance); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java new file mode 100644 index 00000000..2341f910 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import java.util.concurrent.ScheduledFuture; + +class ScheduledServiceCancellable implements Cancellable { + + private final ScheduledFuture cancellable; + + public ScheduledServiceCancellable(ScheduledFuture cancellable) { + this.cancellable = cancellable; + } + + @Override + public void cancel() { + cancellable.cancel(true); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java similarity index 54% rename from impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java index 84f5b913..27059f1a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java @@ -13,11 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl; +package io.serverlessworkflow.impl.scheduler; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; -import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; +import java.time.Duration; import java.util.Collection; import java.util.function.Function; @@ -28,4 +31,19 @@ ScheduledEventConsumer eventConsumer( WorkflowDefinition definition, Function converter, EventRegistrationBuilderInfo info); + + /** + * Periodically instantiate a workflow instance from the given definition at the given interval. + * It continue creating workflow instances till cancelled. + */ + Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval); + + /** Creates one workflow instance after the specified delay. */ + Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay); + + /** + * Creates one or more workflow instances according to the specified Cron expression. It continue + * creating workflow instances till the Cron expression indicates so or it is cancelled + */ + Cancellable scheduleCron(WorkflowDefinition definition, String cron); } diff --git a/impl/pom.xml b/impl/pom.xml index 00d900f9..e4698812 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -14,6 +14,7 @@ 4.0.0 1.6.0 3.1.11 + 9.2.1 @@ -124,6 +125,11 @@ h2-mvstore ${version.com.h2database} + + com.cronutils + cron-utils + ${version.com.cronutils} + diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java new file mode 100644 index 00000000..43c19644 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class SchedulerTest { + + private static WorkflowApplication appl; + + @BeforeAll + static void init() throws IOException { + appl = WorkflowApplication.builder().build(); + } + + @AfterAll + static void tearDown() throws IOException { + appl.close(); + } + + @Test + void testAfter() throws IOException, InterruptedException, ExecutionException { + try (WorkflowDefinition def = + appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/after-start.yaml"))) { + def.instance(Map.of()).start().join(); + assertThat(appl.scheduler().scheduledInstances(def)).isEmpty(); + await() + .pollDelay(Duration.ofMillis(50)) + .atMost(Duration.ofMillis(200)) + .until(() -> appl.scheduler().scheduledInstances(def).size() >= 1); + } + } + + @Test + void testEvery() throws IOException, InterruptedException, ExecutionException { + try (WorkflowDefinition def = + appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/every-start.yaml"))) { + await() + .pollDelay(Duration.ofMillis(20)) + .atMost(Duration.ofMillis(200)) + .until(() -> appl.scheduler().scheduledInstances(def).size() >= 5); + } + } + + @Test + @Disabled("too long test, since cron cannot be under a minute") + void testCron() throws IOException, InterruptedException, ExecutionException { + try (WorkflowDefinition def = + appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/cron-start.yaml"))) { + await() + .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10))) + .until(() -> appl.scheduler().scheduledInstances(def).size() == 1); + await() + .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10))) + .until(() -> appl.scheduler().scheduledInstances(def).size() == 2); + } + } +} diff --git a/impl/test/src/test/resources/workflows-samples/after-start.yaml b/impl/test/src/test/resources/workflows-samples/after-start.yaml new file mode 100644 index 00000000..cdefccc4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/after-start.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.1' + namespace: test + name: after-driven-schedule + version: '0.1.0' +schedule: + after: + milliseconds: 50 +do: + - recovered: + set: + recovered: true diff --git a/impl/test/src/test/resources/workflows-samples/cron-start.yaml b/impl/test/src/test/resources/workflows-samples/cron-start.yaml new file mode 100644 index 00000000..06dc2cf0 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/cron-start.yaml @@ -0,0 +1,11 @@ +document: + dsl: '1.0.1' + namespace: test + name: cron-driven-schedule + version: '0.1.0' +schedule: + cron: "* * * * *" +do: + - recovered: + set: + recovered: true diff --git a/impl/test/src/test/resources/workflows-samples/every-start.yaml b/impl/test/src/test/resources/workflows-samples/every-start.yaml new file mode 100644 index 00000000..cf79681b --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/every-start.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.1' + namespace: test + name: every-driven-schedule + version: '0.1.0' +schedule: + every: + milliseconds: 10 +do: + - recovered: + set: + recovered: true