Skip to content

Commit e4b5acf

Browse files
committed
[Fix #932] Workflow scheduler
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 9338a05 commit e4b5acf

19 files changed

+604
-34
lines changed

impl/core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,9 @@
2424
<groupId>de.huxhorn.sulky</groupId>
2525
<artifactId>de.huxhorn.sulky.ulid</artifactId>
2626
</dependency>
27+
<dependency>
28+
<groupId>com.cronutils</groupId>
29+
<artifactId>cron-utils</artifactId>
30+
</dependency>
2731
</dependencies>
2832
</project>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
19+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
20+
import io.serverlessworkflow.impl.scheduler.Cancellable;
21+
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
22+
import java.time.Duration;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
26+
class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
27+
28+
private final WorkflowScheduler scheduler;
29+
private final Map<WorkflowDefinition, WorkflowValueResolver<Duration>> afterMap =
30+
new ConcurrentHashMap<>();
31+
private final Map<WorkflowDefinition, Cancellable> cancellableMap = new ConcurrentHashMap<>();
32+
33+
public SchedulerListener(WorkflowScheduler scheduler) {
34+
this.scheduler = scheduler;
35+
}
36+
37+
public void addAfter(WorkflowDefinition definition, WorkflowValueResolver<Duration> after) {
38+
afterMap.put(definition, after);
39+
}
40+
41+
@Override
42+
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
43+
WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
44+
WorkflowValueResolver<Duration> after = afterMap.get(workflowDefinition);
45+
if (after != null) {
46+
cancellableMap.put(
47+
workflowDefinition,
48+
scheduler.scheduleAfter(
49+
workflowDefinition,
50+
after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
51+
}
52+
}
53+
54+
public void removeAfter(WorkflowDefinition definition) {
55+
if (afterMap.remove(definition) != null) {
56+
Cancellable cancellable = cancellableMap.remove(definition);
57+
if (cancellable != null) {
58+
cancellable.cancel();
59+
}
60+
}
61+
}
62+
63+
@Override
64+
public void close() {
65+
cancellableMap.values().forEach(c -> c.cancel());
66+
cancellableMap.clear();
67+
afterMap.clear();
68+
}
69+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
3737
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
3838
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
39+
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
3940
import io.serverlessworkflow.impl.schema.SchemaValidator;
4041
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
4142
import java.util.ArrayList;
@@ -71,6 +72,7 @@ public class WorkflowApplication implements AutoCloseable {
7172
private final Map<String, WorkflowAdditionalObject<?>> additionalObjects;
7273
private final ConfigManager configManager;
7374
private final SecretManager secretManager;
75+
private final SchedulerListener schedulerListener;
7476

7577
private WorkflowApplication(Builder builder) {
7678
this.taskFactory = builder.taskFactory;
@@ -81,13 +83,14 @@ private WorkflowApplication(Builder builder) {
8183
this.idFactory = builder.idFactory;
8284
this.runtimeDescriptorFactory = builder.descriptorFactory;
8385
this.executorFactory = builder.executorFactory;
84-
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
86+
this.listeners = builder.listeners;
8587
this.definitions = new ConcurrentHashMap<>();
8688
this.eventConsumer = builder.eventConsumer;
8789
this.eventPublishers = builder.eventPublishers;
8890
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
8991
this.modelFactory = builder.modelFactory;
9092
this.scheduler = builder.scheduler;
93+
this.schedulerListener = builder.schedulerListener;
9194
this.additionalObjects = builder.additionalObjects;
9295
this.configManager = builder.configManager;
9396
this.secretManager = builder.secretManager;
@@ -169,6 +172,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
169172
private Map<String, WorkflowAdditionalObject<?>> additionalObjects;
170173
private SecretManager secretManager;
171174
private ConfigManager configManager;
175+
private SchedulerListener schedulerListener;
172176

173177
private Builder() {}
174178

@@ -304,6 +308,8 @@ public WorkflowApplication build() {
304308
if (scheduler == null) {
305309
scheduler = new DefaultWorkflowScheduler();
306310
}
311+
schedulerListener = new SchedulerListener(scheduler);
312+
listeners.add(schedulerListener);
307313
if (additionalObjects == null) {
308314
additionalObjects = Collections.emptyMap();
309315
}
@@ -388,6 +394,10 @@ public SecretManager secretManager() {
388394
return secretManager;
389395
}
390396

397+
SchedulerListener schedulerListener() {
398+
return schedulerListener;
399+
}
400+
391401
public <T> Optional<T> additionalObject(
392402
String name, WorkflowContext workflowContext, TaskContext taskContext) {
393403
return Optional.ofNullable(additionalObjects.get(name))

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,27 @@
2020
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
2121

2222
import io.serverlessworkflow.api.types.Input;
23-
import io.serverlessworkflow.api.types.ListenTo;
2423
import io.serverlessworkflow.api.types.Output;
2524
import io.serverlessworkflow.api.types.Schedule;
2625
import io.serverlessworkflow.api.types.Workflow;
2726
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2827
import io.serverlessworkflow.impl.executors.TaskExecutor;
2928
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
3029
import io.serverlessworkflow.impl.resources.ResourceLoader;
30+
import io.serverlessworkflow.impl.scheduler.Cancellable;
3131
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
32+
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
3233
import io.serverlessworkflow.impl.schema.SchemaValidator;
3334
import java.nio.file.Path;
3435
import java.util.HashMap;
3536
import java.util.Map;
37+
import java.util.Objects;
3638
import java.util.Optional;
3739

3840
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
3941

4042
private final Workflow workflow;
43+
private final WorkflowDefinitionId definitionId;
4144
private Optional<SchemaValidator> inputSchemaValidator = Optional.empty();
4245
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4346
private Optional<WorkflowFilter> inputFilter = Optional.empty();
@@ -47,10 +50,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4750
private final ResourceLoader resourceLoader;
4851
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
4952
private ScheduledEventConsumer scheculedConsumer;
53+
private Cancellable everySchedule;
54+
private Cancellable cronSchedule;
5055

5156
private WorkflowDefinition(
5257
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
5358
this.workflow = workflow;
59+
this.definitionId = WorkflowDefinitionId.of(workflow);
5460
this.application = application;
5561
this.resourceLoader = resourceLoader;
5662

@@ -84,15 +90,28 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8490
application.resourceLoaderFactory().getResourceLoader(application, path));
8591
Schedule schedule = workflow.getSchedule();
8692
if (schedule != null) {
87-
ListenTo to = schedule.getOn();
88-
if (to != null) {
93+
WorkflowScheduler scheduler = application.scheduler();
94+
if (schedule.getOn() != null) {
8995
definition.scheculedConsumer =
90-
application
91-
.scheduler()
92-
.eventConsumer(
93-
definition,
94-
application.modelFactory()::from,
95-
EventRegistrationBuilderInfo.from(application, to, x -> null));
96+
scheduler.eventConsumer(
97+
definition,
98+
application.modelFactory()::from,
99+
EventRegistrationBuilderInfo.from(application, schedule.getOn(), x -> null));
100+
}
101+
if (schedule.getAfter() != null) {
102+
application
103+
.schedulerListener()
104+
.addAfter(definition, WorkflowUtils.fromTimeoutAfter(application, schedule.getAfter()));
105+
}
106+
if (schedule.getCron() != null) {
107+
definition.cronSchedule = scheduler.scheduleCron(definition, schedule.getCron());
108+
}
109+
if (schedule.getEvery() != null) {
110+
definition.everySchedule =
111+
scheduler.scheduleEvery(
112+
definition,
113+
WorkflowUtils.fromTimeoutAfter(application, schedule.getEvery())
114+
.apply(null, null, application.modelFactory().fromNull()));
96115
}
97116
}
98117
return definition;
@@ -148,7 +167,28 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
148167

149168
@Override
150169
public void close() {
151-
safeClose(scheculedConsumer);
152170
safeClose(resourceLoader);
171+
safeClose(scheculedConsumer);
172+
application.schedulerListener().removeAfter(this);
173+
if (everySchedule != null) {
174+
everySchedule.cancel();
175+
}
176+
if (cronSchedule != null) {
177+
cronSchedule.cancel();
178+
}
179+
}
180+
181+
@Override
182+
public int hashCode() {
183+
return Objects.hash(definitionId);
184+
}
185+
186+
@Override
187+
public boolean equals(Object obj) {
188+
if (this == obj) return true;
189+
if (obj == null) return false;
190+
if (getClass() != obj.getClass()) return false;
191+
WorkflowDefinition other = (WorkflowDefinition) obj;
192+
return Objects.equals(definitionId, other.definitionId);
153193
}
154194
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
public interface Cancellable {
19+
void cancel();
20+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import java.time.Duration;
19+
import java.util.Optional;
20+
21+
public interface CronResolver {
22+
Optional<Duration> nextExecution();
23+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
public interface CronResolverFactory {
19+
CronResolver parseClon(String cron);
20+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import com.cronutils.model.Cron;
19+
import com.cronutils.model.time.ExecutionTime;
20+
import java.time.Duration;
21+
import java.time.ZonedDateTime;
22+
import java.util.Optional;
23+
24+
class CronUtilsResolver implements CronResolver {
25+
26+
private final ExecutionTime executionTime;
27+
28+
public CronUtilsResolver(Cron cron) {
29+
this.executionTime = ExecutionTime.forCron(cron);
30+
}
31+
32+
@Override
33+
public Optional<Duration> nextExecution() {
34+
return executionTime.timeToNextExecution(ZonedDateTime.now());
35+
}
36+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.scheduler;
17+
18+
import com.cronutils.model.CronType;
19+
import com.cronutils.model.definition.CronDefinitionBuilder;
20+
import com.cronutils.parser.CronParser;
21+
22+
class CronUtilsResolverFactory implements CronResolverFactory {
23+
24+
private final CronParser cronParser;
25+
26+
public CronUtilsResolverFactory() {
27+
this(CronType.UNIX);
28+
}
29+
30+
public CronUtilsResolverFactory(CronType type) {
31+
this.cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(type));
32+
}
33+
34+
@Override
35+
public CronResolver parseClon(String cron) {
36+
return new CronUtilsResolver(cronParser.parse(cron));
37+
}
38+
}

0 commit comments

Comments
 (0)