Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions impl/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@
<groupId>de.huxhorn.sulky</groupId>
<artifactId>de.huxhorn.sulky.ulid</artifactId>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.scheduler.Cancellable;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {

private final WorkflowScheduler scheduler;
private final Map<WorkflowDefinition, WorkflowValueResolver<Duration>> afterMap =
new ConcurrentHashMap<>();
private final Map<WorkflowDefinition, Cancellable> cancellableMap = new ConcurrentHashMap<>();

public SchedulerListener(WorkflowScheduler scheduler) {
this.scheduler = scheduler;
}

public void addAfter(WorkflowDefinition definition, WorkflowValueResolver<Duration> after) {
afterMap.put(definition, after);
}

@Override
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
WorkflowValueResolver<Duration> after = afterMap.get(workflowDefinition);
if (after != null) {
cancellableMap.put(
workflowDefinition,
scheduler.scheduleAfter(
workflowDefinition,
after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
}
}

public void removeAfter(WorkflowDefinition definition) {
if (afterMap.remove(definition) != null) {
Cancellable cancellable = cancellableMap.remove(definition);
if (cancellable != null) {
cancellable.cancel();
}
}
}

@Override
public void close() {
cancellableMap.values().forEach(c -> c.cancel());
cancellableMap.clear();
afterMap.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
import java.util.ArrayList;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class WorkflowApplication implements AutoCloseable {
private final Map<String, WorkflowAdditionalObject<?>> additionalObjects;
private final ConfigManager configManager;
private final SecretManager secretManager;
private final SchedulerListener schedulerListener;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand All @@ -81,13 +83,14 @@ private WorkflowApplication(Builder builder) {
this.idFactory = builder.idFactory;
this.runtimeDescriptorFactory = builder.descriptorFactory;
this.executorFactory = builder.executorFactory;
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
this.listeners = builder.listeners;
this.definitions = new ConcurrentHashMap<>();
this.eventConsumer = builder.eventConsumer;
this.eventPublishers = builder.eventPublishers;
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
this.modelFactory = builder.modelFactory;
this.scheduler = builder.scheduler;
this.schedulerListener = builder.schedulerListener;
this.additionalObjects = builder.additionalObjects;
this.configManager = builder.configManager;
this.secretManager = builder.secretManager;
Expand Down Expand Up @@ -169,6 +172,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private Map<String, WorkflowAdditionalObject<?>> additionalObjects;
private SecretManager secretManager;
private ConfigManager configManager;
private SchedulerListener schedulerListener;

private Builder() {}

Expand Down Expand Up @@ -304,6 +308,8 @@ public WorkflowApplication build() {
if (scheduler == null) {
scheduler = new DefaultWorkflowScheduler();
}
schedulerListener = new SchedulerListener(scheduler);
listeners.add(schedulerListener);
if (additionalObjects == null) {
additionalObjects = Collections.emptyMap();
}
Expand Down Expand Up @@ -388,6 +394,10 @@ public SecretManager secretManager() {
return secretManager;
}

SchedulerListener schedulerListener() {
return schedulerListener;
}

public <T> Optional<T> additionalObject(
String name, WorkflowContext workflowContext, TaskContext taskContext) {
return Optional.ofNullable(additionalObjects.get(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.ListenTo;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.Schedule;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
import io.serverlessworkflow.impl.executors.TaskExecutor;
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import io.serverlessworkflow.impl.scheduler.Cancellable;
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {

private final Workflow workflow;
private final WorkflowDefinitionId definitionId;
private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
private Optional<WorkflowFilter> inputFilter = Optional.empty();
Expand All @@ -47,10 +50,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
private final ResourceLoader resourceLoader;
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
private ScheduledEventConsumer scheculedConsumer;
private Cancellable everySchedule;
private Cancellable cronSchedule;

private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
this.workflow = workflow;
this.definitionId = WorkflowDefinitionId.of(workflow);
this.application = application;
this.resourceLoader = resourceLoader;

Expand Down Expand Up @@ -84,15 +90,28 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
application.resourceLoaderFactory().getResourceLoader(application, path));
Schedule schedule = workflow.getSchedule();
if (schedule != null) {
ListenTo to = schedule.getOn();
if (to != null) {
WorkflowScheduler scheduler = application.scheduler();
if (schedule.getOn() != null) {
definition.scheculedConsumer =
application
.scheduler()
.eventConsumer(
definition,
application.modelFactory()::from,
EventRegistrationBuilderInfo.from(application, to, x -> null));
scheduler.eventConsumer(
definition,
application.modelFactory()::from,
EventRegistrationBuilderInfo.from(application, schedule.getOn(), x -> null));
}
if (schedule.getAfter() != null) {
application
.schedulerListener()
.addAfter(definition, WorkflowUtils.fromTimeoutAfter(application, schedule.getAfter()));
}
if (schedule.getCron() != null) {
definition.cronSchedule = scheduler.scheduleCron(definition, schedule.getCron());
}
if (schedule.getEvery() != null) {
definition.everySchedule =
scheduler.scheduleEvery(
definition,
WorkflowUtils.fromTimeoutAfter(application, schedule.getEvery())
.apply(null, null, application.modelFactory().fromNull()));
}
}
return definition;
Expand Down Expand Up @@ -148,7 +167,28 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta

@Override
public void close() {
safeClose(scheculedConsumer);
safeClose(resourceLoader);
safeClose(scheculedConsumer);
application.schedulerListener().removeAfter(this);
if (everySchedule != null) {
everySchedule.cancel();
}
if (cronSchedule != null) {
cronSchedule.cancel();
}
}

@Override
public int hashCode() {
return Objects.hash(definitionId);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
WorkflowDefinition other = (WorkflowDefinition) obj;
return Objects.equals(definitionId, other.definitionId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

public interface Cancellable {
void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import java.time.Duration;
import java.util.Optional;

public interface CronResolver {
Optional<Duration> nextExecution();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

public interface CronResolverFactory {
CronResolver parseCron(String cron);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import com.cronutils.model.Cron;
import com.cronutils.model.time.ExecutionTime;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Optional;

class CronUtilsResolver implements CronResolver {

private final ExecutionTime executionTime;

public CronUtilsResolver(Cron cron) {
this.executionTime = ExecutionTime.forCron(cron);
}

@Override
public Optional<Duration> nextExecution() {
return executionTime.timeToNextExecution(ZonedDateTime.now());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;

class CronUtilsResolverFactory implements CronResolverFactory {

private final CronParser cronParser;

public CronUtilsResolverFactory() {
this(CronType.UNIX);
}

public CronUtilsResolverFactory(CronType type) {
this.cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(type));
}

@Override
public CronResolver parseCron(String cron) {
return new CronUtilsResolver(cronParser.parse(cron));
}
}
Loading