@@ -23,110 +23,110 @@ import (
2323)
2424
2525// NewTaskRunner creates a TaskRunner instance based on the task type.
26- func NewTaskRunner (taskName string , task model.Task , taskSupport TaskSupport ) (TaskRunner , error ) {
26+ func NewTaskRunner (taskName string , task model.Task , workflowDef * model. Workflow ) (TaskRunner , error ) {
2727 switch t := task .(type ) {
2828 case * model.SetTask :
29- return NewSetTaskRunner (taskName , t , taskSupport )
29+ return NewSetTaskRunner (taskName , t )
3030 case * model.RaiseTask :
31- return NewRaiseTaskRunner (taskName , t , taskSupport )
31+ return NewRaiseTaskRunner (taskName , t , workflowDef )
3232 case * model.DoTask :
33- return NewDoTaskRunner (t .Do , taskSupport )
33+ return NewDoTaskRunner (t .Do )
3434 case * model.ForTask :
35- return NewForTaskRunner (taskName , t , taskSupport )
35+ return NewForTaskRunner (taskName , t )
36+ case * model.CallHTTP :
37+ return NewCallHttpRunner (taskName , t )
3638 default :
3739 return nil , fmt .Errorf ("unsupported task type '%T' for task '%s'" , t , taskName )
3840 }
3941}
4042
41- func NewDoTaskRunner (taskList * model.TaskList , taskSupport TaskSupport ) (* DoTaskRunner , error ) {
43+ func NewDoTaskRunner (taskList * model.TaskList ) (* DoTaskRunner , error ) {
4244 return & DoTaskRunner {
43- TaskList : taskList ,
44- TaskSupport : taskSupport ,
45+ TaskList : taskList ,
4546 }, nil
4647}
4748
4849type DoTaskRunner struct {
49- TaskList * model.TaskList
50- TaskSupport TaskSupport
50+ TaskList * model.TaskList
5151}
5252
53- func (d * DoTaskRunner ) Run (input interface {}) (output interface {}, err error ) {
53+ func (d * DoTaskRunner ) Run (input interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
5454 if d .TaskList == nil {
5555 return input , nil
5656 }
57- return d .runTasks (input , d . TaskList )
57+ return d .runTasks (input , taskSupport )
5858}
5959
6060func (d * DoTaskRunner ) GetTaskName () string {
6161 return ""
6262}
6363
6464// runTasks runs all defined tasks sequentially.
65- func (d * DoTaskRunner ) runTasks (input interface {}, tasks * model. TaskList ) (output interface {}, err error ) {
65+ func (d * DoTaskRunner ) runTasks (input interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
6666 output = input
67- if tasks == nil {
67+ if d . TaskList == nil {
6868 return output , nil
6969 }
7070
7171 idx := 0
72- currentTask := (* tasks )[idx ]
72+ currentTask := (* d . TaskList )[idx ]
7373
7474 for currentTask != nil {
75- if err = d . TaskSupport .SetTaskDef (currentTask ); err != nil {
75+ if err = taskSupport .SetTaskDef (currentTask ); err != nil {
7676 return nil , err
7777 }
78- if err = d . TaskSupport .SetTaskReferenceFromName (currentTask .Key ); err != nil {
78+ if err = taskSupport .SetTaskReferenceFromName (currentTask .Key ); err != nil {
7979 return nil , err
8080 }
8181
82- if shouldRun , err := d .shouldRunTask (input , currentTask ); err != nil {
82+ if shouldRun , err := d .shouldRunTask (input , taskSupport , currentTask ); err != nil {
8383 return output , err
8484 } else if ! shouldRun {
85- idx , currentTask = tasks .Next (idx )
85+ idx , currentTask = d . TaskList .Next (idx )
8686 continue
8787 }
8888
89- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .PendingStatus )
89+ taskSupport .SetTaskStatus (currentTask .Key , ctx .PendingStatus )
9090
9191 // Check if this task is a SwitchTask and handle it
9292 if switchTask , ok := currentTask .Task .(* model.SwitchTask ); ok {
93- flowDirective , err := d .evaluateSwitchTask (input , currentTask .Key , switchTask )
93+ flowDirective , err := d .evaluateSwitchTask (input , taskSupport , currentTask .Key , switchTask )
9494 if err != nil {
95- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
95+ taskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
9696 return output , err
9797 }
98- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
98+ taskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
9999
100100 // Process FlowDirective: update idx/currentTask accordingly
101- idx , currentTask = tasks .KeyAndIndex (flowDirective .Value )
101+ idx , currentTask = d . TaskList .KeyAndIndex (flowDirective .Value )
102102 if currentTask == nil {
103103 return nil , fmt .Errorf ("flow directive target '%s' not found" , flowDirective .Value )
104104 }
105105 continue
106106 }
107107
108- runner , err := NewTaskRunner (currentTask .Key , currentTask .Task , d . TaskSupport )
108+ runner , err := NewTaskRunner (currentTask .Key , currentTask .Task , taskSupport . GetWorkflowDef () )
109109 if err != nil {
110110 return output , err
111111 }
112112
113- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .RunningStatus )
114- if output , err = d .runTask (input , runner , currentTask .Task .GetBase ()); err != nil {
115- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
113+ taskSupport .SetTaskStatus (currentTask .Key , ctx .RunningStatus )
114+ if output , err = d .runTask (input , taskSupport , runner , currentTask .Task .GetBase ()); err != nil {
115+ taskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
116116 return output , err
117117 }
118118
119- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
119+ taskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
120120 input = deepCloneValue (output )
121- idx , currentTask = tasks .Next (idx )
121+ idx , currentTask = d . TaskList .Next (idx )
122122 }
123123
124124 return output , nil
125125}
126126
127- func (d * DoTaskRunner ) shouldRunTask (input interface {}, task * model.TaskItem ) (bool , error ) {
127+ func (d * DoTaskRunner ) shouldRunTask (input interface {}, taskSupport TaskSupport , task * model.TaskItem ) (bool , error ) {
128128 if task .GetBase ().If != nil {
129- output , err := traverseAndEvaluateBool (task .GetBase ().If .String (), input , d . TaskSupport .GetContext ())
129+ output , err := traverseAndEvaluateBool (task .GetBase ().If .String (), input , taskSupport .GetContext ())
130130 if err != nil {
131131 return false , model .NewErrExpression (err , task .Key )
132132 }
@@ -135,15 +135,15 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b
135135 return true , nil
136136}
137137
138- func (d * DoTaskRunner ) evaluateSwitchTask (input interface {}, taskKey string , switchTask * model.SwitchTask ) (* model.FlowDirective , error ) {
138+ func (d * DoTaskRunner ) evaluateSwitchTask (input interface {}, taskSupport TaskSupport , taskKey string , switchTask * model.SwitchTask ) (* model.FlowDirective , error ) {
139139 var defaultThen * model.FlowDirective
140140 for _ , switchItem := range switchTask .Switch {
141141 for _ , switchCase := range switchItem {
142142 if switchCase .When == nil {
143143 defaultThen = switchCase .Then
144144 continue
145145 }
146- result , err := traverseAndEvaluateBool (model .NormalizeExpr (switchCase .When .String ()), input , d . TaskSupport .GetContext ())
146+ result , err := traverseAndEvaluateBool (model .NormalizeExpr (switchCase .When .String ()), input , taskSupport .GetContext ())
147147 if err != nil {
148148 return nil , model .NewErrExpression (err , taskKey )
149149 }
@@ -162,39 +162,39 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, swi
162162}
163163
164164// runTask executes an individual task.
165- func (d * DoTaskRunner ) runTask (input interface {}, runner TaskRunner , task * model.TaskBase ) (output interface {}, err error ) {
165+ func (d * DoTaskRunner ) runTask (input interface {}, taskSupport TaskSupport , runner TaskRunner , task * model.TaskBase ) (output interface {}, err error ) {
166166 taskName := runner .GetTaskName ()
167167
168- d . TaskSupport .SetTaskStartedAt (time .Now ())
169- d . TaskSupport .SetTaskRawInput (input )
170- d . TaskSupport .SetTaskName (taskName )
168+ taskSupport .SetTaskStartedAt (time .Now ())
169+ taskSupport .SetTaskRawInput (input )
170+ taskSupport .SetTaskName (taskName )
171171
172172 if task .Input != nil {
173- if input , err = d .processTaskInput (task , input , taskName ); err != nil {
173+ if input , err = d .processTaskInput (task , input , taskSupport , taskName ); err != nil {
174174 return nil , err
175175 }
176176 }
177177
178- output , err = runner .Run (input )
178+ output , err = runner .Run (input , taskSupport )
179179 if err != nil {
180180 return nil , err
181181 }
182182
183- d . TaskSupport .SetTaskRawOutput (output )
183+ taskSupport .SetTaskRawOutput (output )
184184
185- if output , err = d .processTaskOutput (task , output , taskName ); err != nil {
185+ if output , err = d .processTaskOutput (task , output , taskSupport , taskName ); err != nil {
186186 return nil , err
187187 }
188188
189- if err = d .processTaskExport (task , output , taskName ); err != nil {
189+ if err = d .processTaskExport (task , output , taskSupport , taskName ); err != nil {
190190 return nil , err
191191 }
192192
193193 return output , nil
194194}
195195
196196// processTaskInput processes task input validation and transformation.
197- func (d * DoTaskRunner ) processTaskInput (task * model.TaskBase , taskInput interface {}, taskName string ) (output interface {}, err error ) {
197+ func (d * DoTaskRunner ) processTaskInput (task * model.TaskBase , taskInput interface {}, taskSupport TaskSupport , taskName string ) (output interface {}, err error ) {
198198 if task .Input == nil {
199199 return taskInput , nil
200200 }
@@ -203,20 +203,20 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac
203203 return nil , err
204204 }
205205
206- if output , err = traverseAndEvaluate (task .Input .From , taskInput , taskName , d . TaskSupport .GetContext ()); err != nil {
206+ if output , err = traverseAndEvaluate (task .Input .From , taskInput , taskName , taskSupport .GetContext ()); err != nil {
207207 return nil , err
208208 }
209209
210210 return output , nil
211211}
212212
213213// processTaskOutput processes task output validation and transformation.
214- func (d * DoTaskRunner ) processTaskOutput (task * model.TaskBase , taskOutput interface {}, taskName string ) (output interface {}, err error ) {
214+ func (d * DoTaskRunner ) processTaskOutput (task * model.TaskBase , taskOutput interface {}, taskSupport TaskSupport , taskName string ) (output interface {}, err error ) {
215215 if task .Output == nil {
216216 return taskOutput , nil
217217 }
218218
219- if output , err = traverseAndEvaluate (task .Output .As , taskOutput , taskName , d . TaskSupport .GetContext ()); err != nil {
219+ if output , err = traverseAndEvaluate (task .Output .As , taskOutput , taskName , taskSupport .GetContext ()); err != nil {
220220 return nil , err
221221 }
222222
@@ -227,12 +227,12 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf
227227 return output , nil
228228}
229229
230- func (d * DoTaskRunner ) processTaskExport (task * model.TaskBase , taskOutput interface {}, taskName string ) (err error ) {
230+ func (d * DoTaskRunner ) processTaskExport (task * model.TaskBase , taskOutput interface {}, taskSupport TaskSupport , taskName string ) (err error ) {
231231 if task .Export == nil {
232232 return nil
233233 }
234234
235- output , err := traverseAndEvaluate (task .Export .As , taskOutput , taskName , d . TaskSupport .GetContext ())
235+ output , err := traverseAndEvaluate (task .Export .As , taskOutput , taskName , taskSupport .GetContext ())
236236 if err != nil {
237237 return err
238238 }
@@ -241,7 +241,7 @@ func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interf
241241 return nil
242242 }
243243
244- d . TaskSupport .SetWorkflowInstanceCtx (output )
244+ taskSupport .SetWorkflowInstanceCtx (output )
245245
246246 return nil
247247}
0 commit comments