@@ -65,6 +65,7 @@ use crate::{
6565use anyhow:: { anyhow, bail, Context } ;
6666use app_data:: AppData ;
6767use futures:: { future:: BoxFuture , FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
68+ use serde:: Serialize ;
6869use std:: {
6970 any:: { Any , TypeId } ,
7071 cell:: RefCell ,
@@ -147,7 +148,7 @@ struct WorkflowData {
147148 activation_chan : UnboundedSender < WorkflowActivation > ,
148149}
149150
150- struct WorkflowFutureHandle < F : Future < Output = Result < WorkflowResult < ( ) > , JoinError > > > {
151+ struct WorkflowFutureHandle < F : Future < Output = Result < WorkflowResult < Payload > , JoinError > > > {
151152 join_handle : F ,
152153 run_id : String ,
153154}
@@ -193,10 +194,10 @@ impl Worker {
193194
194195 /// Register a Workflow function to invoke when the Worker is asked to run a workflow of
195196 /// `workflow_type`
196- pub fn register_wf < F : Into < WorkflowFunction > > (
197+ pub fn register_wf (
197198 & mut self ,
198199 workflow_type : impl Into < String > ,
199- wf_function : F ,
200+ wf_function : impl Into < WorkflowFunction > ,
200201 ) {
201202 self . workflow_half
202203 . workflow_fns
@@ -383,7 +384,9 @@ impl WorkflowHalf {
383384 activation : WorkflowActivation ,
384385 completions_tx : & UnboundedSender < WorkflowActivationCompletion > ,
385386 ) -> Result <
386- Option < WorkflowFutureHandle < impl Future < Output = Result < WorkflowResult < ( ) > , JoinError > > > > ,
387+ Option <
388+ WorkflowFutureHandle < impl Future < Output = Result < WorkflowResult < Payload > , JoinError > > > ,
389+ > ,
387390 anyhow:: Error ,
388391 > {
389392 let mut res = None ;
@@ -694,17 +697,21 @@ struct CommandSubscribeChildWorkflowCompletion {
694697 unblocker : oneshot:: Sender < UnblockEvent > ,
695698}
696699
697- type WfFunc = dyn Fn ( WfContext ) -> BoxFuture < ' static , WorkflowResult < ( ) > > + Send + Sync + ' static ;
700+ type WfFunc = dyn Fn ( WfContext ) -> BoxFuture < ' static , Result < WfExitValue < Payload > , anyhow:: Error > >
701+ + Send
702+ + Sync
703+ + ' static ;
698704
699705/// The user's async function / workflow code
700706pub struct WorkflowFunction {
701707 wf_func : Box < WfFunc > ,
702708}
703709
704- impl < F , Fut > From < F > for WorkflowFunction
710+ impl < F , Fut , O > From < F > for WorkflowFunction
705711where
706712 F : Fn ( WfContext ) -> Fut + Send + Sync + ' static ,
707- Fut : Future < Output = WorkflowResult < ( ) > > + Send + ' static ,
713+ Fut : Future < Output = Result < WfExitValue < O > , anyhow:: Error > > + Send + ' static ,
714+ O : Serialize + Debug ,
708715{
709716 fn from ( wf_func : F ) -> Self {
710717 Self :: new ( wf_func)
@@ -713,13 +720,27 @@ where
713720
714721impl WorkflowFunction {
715722 /// Build a workflow function from a closure or function pointer which accepts a [WfContext]
716- pub fn new < F , Fut > ( wf_func : F ) -> Self
723+ pub fn new < F , Fut , O > ( f : F ) -> Self
717724 where
718725 F : Fn ( WfContext ) -> Fut + Send + Sync + ' static ,
719- Fut : Future < Output = WorkflowResult < ( ) > > + Send + ' static ,
726+ Fut : Future < Output = Result < WfExitValue < O > , anyhow:: Error > > + Send + ' static ,
727+ O : Serialize + Debug ,
720728 {
721729 Self {
722- wf_func : Box :: new ( move |ctx : WfContext | wf_func ( ctx) . boxed ( ) ) ,
730+ wf_func : Box :: new ( move |ctx : WfContext | {
731+ ( f) ( ctx)
732+ . map ( |r| {
733+ r. and_then ( |r| {
734+ Ok ( match r {
735+ WfExitValue :: ContinueAsNew ( b) => WfExitValue :: ContinueAsNew ( b) ,
736+ WfExitValue :: Cancelled => WfExitValue :: Cancelled ,
737+ WfExitValue :: Evicted => WfExitValue :: Evicted ,
738+ WfExitValue :: Normal ( o) => WfExitValue :: Normal ( o. as_json_payload ( ) ?) ,
739+ } )
740+ } )
741+ } )
742+ . boxed ( )
743+ } ) ,
723744 }
724745 }
725746}
0 commit comments