Skip to content

Commit 361f7e5

Browse files
2465 - extract duplicated code to template method (+ required field validations should be part of try block)
1 parent 1660872 commit 361f7e5

File tree

8 files changed

+319
-315
lines changed

8 files changed

+319
-315
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2025 ByteChef
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytechef.atlas.coordinator.task.dispatcher;
18+
19+
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
20+
import com.bytechef.atlas.execution.domain.TaskExecution;
21+
import com.bytechef.error.ExecutionError;
22+
import java.util.Arrays;
23+
import org.apache.commons.lang3.exception.ExceptionUtils;
24+
import org.springframework.context.ApplicationEventPublisher;
25+
26+
/**
27+
* @author Matija Petanjek
28+
*/
29+
public abstract class ErrorHandlingTaskDispatcher implements TaskDispatcher<TaskExecution> {
30+
31+
private final ApplicationEventPublisher eventPublisher;
32+
33+
public ErrorHandlingTaskDispatcher(ApplicationEventPublisher eventPublisher) {
34+
this.eventPublisher = eventPublisher;
35+
}
36+
37+
public void dispatch(TaskExecution taskExecution) {
38+
try {
39+
doDispatch(taskExecution);
40+
} catch (Exception exception) {
41+
taskExecution.setError(
42+
new ExecutionError(exception.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(exception))));
43+
44+
eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution));
45+
}
46+
}
47+
48+
public abstract void doDispatch(TaskExecution taskExecution);
49+
}

server/libs/modules/task-dispatchers/branch/src/main/java/com/bytechef/task/dispatcher/branch/BranchTaskDispatcher.java

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import com.bytechef.atlas.configuration.domain.Task;
2929
import com.bytechef.atlas.configuration.domain.WorkflowTask;
3030
import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent;
31-
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
31+
import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher;
3232
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher;
3333
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver;
3434
import com.bytechef.atlas.execution.domain.Context.Classname;
@@ -37,18 +37,15 @@
3737
import com.bytechef.atlas.execution.service.TaskExecutionService;
3838
import com.bytechef.atlas.file.storage.TaskFileStorage;
3939
import com.bytechef.commons.util.MapUtils;
40-
import com.bytechef.error.ExecutionError;
4140
import com.bytechef.evaluator.Evaluator;
4241
import com.fasterxml.jackson.core.type.TypeReference;
4342
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
4443
import java.time.Instant;
45-
import java.util.Arrays;
4644
import java.util.Collections;
4745
import java.util.List;
4846
import java.util.Map;
4947
import java.util.Objects;
5048
import org.apache.commons.lang3.Validate;
51-
import org.apache.commons.lang3.exception.ExceptionUtils;
5249
import org.springframework.context.ApplicationEventPublisher;
5350

5451
/**
@@ -57,7 +54,7 @@
5754
* @since Jun 3, 2017
5855
* @see BranchTaskCompletionHandler
5956
*/
60-
public class BranchTaskDispatcher implements TaskDispatcher<TaskExecution>, TaskDispatcherResolver {
57+
public class BranchTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver {
6158

6259
private final ContextService contextService;
6360
private final Evaluator evaluator;
@@ -72,6 +69,8 @@ public BranchTaskDispatcher(
7269
TaskDispatcher<? super Task> taskDispatcher, TaskExecutionService taskExecutionService,
7370
TaskFileStorage taskFileStorage) {
7471

72+
super(eventPublisher);
73+
7574
this.evaluator = evaluator;
7675
this.contextService = contextService;
7776
this.eventPublisher = eventPublisher;
@@ -81,69 +80,64 @@ public BranchTaskDispatcher(
8180
}
8281

8382
@Override
84-
public void dispatch(TaskExecution taskExecution) {
83+
public void doDispatch(TaskExecution taskExecution) {
8584
taskExecution.setStartDate(Instant.now());
8685
taskExecution.setStatus(TaskExecution.Status.STARTED);
8786

8887
taskExecution = taskExecutionService.update(taskExecution);
8988

9089
Map<String, ?> selectedCase = resolveCase(taskExecution);
9190

92-
try {
93-
if (selectedCase.containsKey(TASKS)) {
94-
List<WorkflowTask> subWorkflowTasks = MapUtils.getList(
95-
selectedCase, TASKS, WorkflowTask.class, Collections.emptyList());
91+
if (selectedCase.containsKey(TASKS)) {
92+
List<WorkflowTask> subWorkflowTasks = MapUtils.getList(
93+
selectedCase, TASKS, WorkflowTask.class, Collections.emptyList());
9694

97-
if (subWorkflowTasks.isEmpty()) {
98-
taskExecution.setStartDate(Instant.now());
99-
taskExecution.setEndDate(Instant.now());
100-
taskExecution.setExecutionTime(0);
95+
if (subWorkflowTasks.isEmpty()) {
96+
taskExecution.setStartDate(Instant.now());
97+
taskExecution.setEndDate(Instant.now());
98+
taskExecution.setExecutionTime(0);
10199

102-
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
103-
} else {
104-
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);
100+
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
101+
} else {
102+
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);
105103

106-
TaskExecution subTaskExecution = TaskExecution.builder()
107-
.jobId(taskExecution.getJobId())
108-
.parentId(taskExecution.getId())
109-
.priority(taskExecution.getPriority())
110-
.taskNumber(1)
111-
.workflowTask(subWorkflowTask)
112-
.build();
104+
TaskExecution subTaskExecution = TaskExecution.builder()
105+
.jobId(taskExecution.getJobId())
106+
.parentId(taskExecution.getId())
107+
.priority(taskExecution.getPriority())
108+
.taskNumber(1)
109+
.workflowTask(subWorkflowTask)
110+
.build();
113111

114-
Map<String, ?> context = taskFileStorage.readContextValue(
115-
contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION));
112+
Map<String, ?> context = taskFileStorage.readContextValue(
113+
contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION));
116114

117-
subTaskExecution.evaluate(context, evaluator);
115+
subTaskExecution.evaluate(context, evaluator);
118116

119-
subTaskExecution = taskExecutionService.create(subTaskExecution);
117+
subTaskExecution = taskExecutionService.create(subTaskExecution);
120118

121-
contextService.push(
122-
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION,
123-
taskFileStorage.storeContextValue(
124-
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context));
119+
contextService.push(
120+
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION,
121+
taskFileStorage.storeContextValue(
122+
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context));
125123

126-
taskDispatcher.dispatch(subTaskExecution);
127-
}
128-
} else {
129-
taskExecution.setStartDate(Instant.now());
130-
taskExecution.setEndDate(Instant.now());
131-
taskExecution.setExecutionTime(0);
132-
// TODO check, it seems wrong
133-
134-
if (selectedCase.get("value") != null) {
135-
taskExecution.setOutput(
136-
taskFileStorage.storeTaskExecutionOutput(
137-
Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value")));
138-
}
139-
140-
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
124+
taskDispatcher.dispatch(subTaskExecution);
125+
}
126+
} else {
127+
taskExecution.setStartDate(Instant.now());
128+
taskExecution.setEndDate(Instant.now());
129+
taskExecution.setExecutionTime(0);
130+
// TODO check, it seems wrong
131+
132+
if (selectedCase.get("value") != null) {
133+
taskExecution.setOutput(
134+
taskFileStorage.storeTaskExecutionOutput(
135+
Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value")));
141136
}
142-
} catch (Exception e) {
143-
taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e))));
144137

145-
eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution));
138+
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
146139
}
140+
147141
}
148142

149143
@Override

server/libs/modules/task-dispatchers/condition/src/main/java/com/bytechef/task/dispatcher/condition/ConditionTaskDispatcher.java

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.bytechef.atlas.configuration.domain.Task;
2424
import com.bytechef.atlas.configuration.domain.WorkflowTask;
2525
import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent;
26-
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
26+
import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher;
2727
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher;
2828
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver;
2929
import com.bytechef.atlas.execution.domain.Context;
@@ -32,25 +32,22 @@
3232
import com.bytechef.atlas.execution.service.TaskExecutionService;
3333
import com.bytechef.atlas.file.storage.TaskFileStorage;
3434
import com.bytechef.commons.util.MapUtils;
35-
import com.bytechef.error.ExecutionError;
3635
import com.bytechef.evaluator.Evaluator;
3736
import com.bytechef.task.dispatcher.condition.util.ConditionTaskUtils;
3837
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
3938
import java.time.Instant;
40-
import java.util.Arrays;
4139
import java.util.Collections;
4240
import java.util.List;
4341
import java.util.Map;
4442
import java.util.Objects;
4543
import org.apache.commons.lang3.Validate;
46-
import org.apache.commons.lang3.exception.ExceptionUtils;
4744
import org.springframework.context.ApplicationEventPublisher;
4845

4946
/**
5047
* @author Ivica Cardic
5148
* @author Matija Petanjek
5249
*/
53-
public class ConditionTaskDispatcher implements TaskDispatcher<TaskExecution>, TaskDispatcherResolver {
50+
public class ConditionTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver {
5451

5552
private final ContextService contextService;
5653
private final Evaluator evaluator;
@@ -65,6 +62,8 @@ public ConditionTaskDispatcher(
6562
TaskDispatcher<? super Task> taskDispatcher, TaskExecutionService taskExecutionService,
6663
TaskFileStorage taskFileStorage) {
6764

65+
super(eventPublisher);
66+
6867
this.contextService = contextService;
6968
this.evaluator = evaluator;
7069
this.eventPublisher = eventPublisher;
@@ -74,60 +73,55 @@ public ConditionTaskDispatcher(
7473
}
7574

7675
@Override
77-
public void dispatch(TaskExecution taskExecution) {
76+
public void doDispatch(TaskExecution taskExecution) {
7877
taskExecution.setStartDate(Instant.now());
7978
taskExecution.setStatus(TaskExecution.Status.STARTED);
8079

8180
taskExecution = taskExecutionService.update(taskExecution);
8281

8382
List<WorkflowTask> subWorkflowTasks;
8483

85-
try {
86-
if (ConditionTaskUtils.resolveCase(taskExecution)) {
87-
subWorkflowTasks = MapUtils.getList(
88-
taskExecution.getParameters(), CASE_TRUE, WorkflowTask.class, Collections.emptyList());
89-
} else {
90-
subWorkflowTasks = MapUtils.getList(
91-
taskExecution.getParameters(), CASE_FALSE, WorkflowTask.class, Collections.emptyList());
92-
}
93-
94-
if (!subWorkflowTasks.isEmpty()) {
95-
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);
96-
97-
TaskExecution subTaskExecution = TaskExecution.builder()
98-
.jobId(taskExecution.getJobId())
99-
.parentId(taskExecution.getId())
100-
.priority(taskExecution.getPriority())
101-
.taskNumber(1)
102-
.workflowTask(subWorkflowTask)
103-
.build();
104-
105-
Map<String, ?> context = taskFileStorage.readContextValue(
106-
contextService.peek(Validate.notNull(taskExecution.getId(), "id"),
107-
Context.Classname.TASK_EXECUTION));
108-
109-
subTaskExecution.evaluate(context, evaluator);
110-
111-
subTaskExecution = taskExecutionService.create(subTaskExecution);
112-
113-
contextService.push(
114-
Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION,
115-
taskFileStorage.storeContextValue(
116-
Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, context));
117-
118-
taskDispatcher.dispatch(subTaskExecution);
119-
} else {
120-
taskExecution.setStartDate(Instant.now());
121-
taskExecution.setEndDate(Instant.now());
122-
taskExecution.setExecutionTime(0);
123-
124-
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
125-
}
126-
} catch (Exception e) {
127-
taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e))));
128-
129-
eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution));
84+
if (ConditionTaskUtils.resolveCase(taskExecution)) {
85+
subWorkflowTasks = MapUtils.getList(
86+
taskExecution.getParameters(), CASE_TRUE, WorkflowTask.class, Collections.emptyList());
87+
} else {
88+
subWorkflowTasks = MapUtils.getList(
89+
taskExecution.getParameters(), CASE_FALSE, WorkflowTask.class, Collections.emptyList());
13090
}
91+
92+
if (!subWorkflowTasks.isEmpty()) {
93+
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);
94+
95+
TaskExecution subTaskExecution = TaskExecution.builder()
96+
.jobId(taskExecution.getJobId())
97+
.parentId(taskExecution.getId())
98+
.priority(taskExecution.getPriority())
99+
.taskNumber(1)
100+
.workflowTask(subWorkflowTask)
101+
.build();
102+
103+
Map<String, ?> context = taskFileStorage.readContextValue(
104+
contextService.peek(Validate.notNull(taskExecution.getId(), "id"),
105+
Context.Classname.TASK_EXECUTION));
106+
107+
subTaskExecution.evaluate(context, evaluator);
108+
109+
subTaskExecution = taskExecutionService.create(subTaskExecution);
110+
111+
contextService.push(
112+
Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION,
113+
taskFileStorage.storeContextValue(
114+
Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, context));
115+
116+
taskDispatcher.dispatch(subTaskExecution);
117+
} else {
118+
taskExecution.setStartDate(Instant.now());
119+
taskExecution.setEndDate(Instant.now());
120+
taskExecution.setExecutionTime(0);
121+
122+
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
123+
}
124+
131125
}
132126

133127
@Override

0 commit comments

Comments
 (0)