@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555 finally
5656 {
5757 WorkflowActivity . Enrich ( result ) ;
58- await _persistenceStore . PersistWorkflow ( workflow , result . Subscriptions , cancellationToken ) ;
58+ await _persistenceStore . PersistWorkflow ( workflow , result . Subscriptions ) ;
5959 await QueueProvider . QueueWork ( itemId , QueueType . Index ) ;
6060 _greylist . Remove ( $ "wf:{ itemId } ") ;
6161 }
@@ -68,10 +68,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6868 {
6969 foreach ( var sub in result . Subscriptions )
7070 {
71- await TryProcessSubscription ( sub , _persistenceStore , cancellationToken ) ;
71+ await TryProcessSubscription ( sub , _persistenceStore ) ;
7272 }
7373
74- await _persistenceStore . PersistErrors ( result . Errors , cancellationToken ) ;
74+ await _persistenceStore . PersistErrors ( result . Errors ) ;
7575
7676 if ( ( workflow . Status == WorkflowStatus . Runnable ) && workflow . NextExecution . HasValue )
7777 {
@@ -98,24 +98,28 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
9898
9999 }
100100
101- private async Task TryProcessSubscription ( EventSubscription subscription , IPersistenceProvider persistenceStore , CancellationToken cancellationToken )
101+ private async Task TryProcessSubscription ( EventSubscription subscription , IPersistenceProvider persistenceStore )
102102 {
103+ //TODO: move to own class
104+ Logger . LogDebug ( "Subscribing to event {0} {1} for workflow {2} step {3}" , subscription . EventName , subscription . EventKey , subscription . WorkflowId , subscription . StepId ) ;
105+
106+ await persistenceStore . CreateEventSubscription ( subscription , cancellationToken ) ;
103107 if ( subscription . EventName != Event . EventTypeActivity )
104108 {
105- var events = await persistenceStore . GetEvents ( subscription . EventName , subscription . EventKey , subscription . SubscribeAsOf , cancellationToken ) ;
109+ var events = await persistenceStore . GetEvents ( subscription . EventName , subscription . EventKey , subscription . SubscribeAsOf ) ;
106110
107111 foreach ( var evt in events )
108112 {
109113 var eventKey = $ "evt:{ evt } ";
110114 bool acquiredLock = false ;
111115 try
112116 {
113- acquiredLock = await _lockProvider . AcquireLock ( eventKey , cancellationToken ) ;
117+ acquiredLock = await _lockProvider . AcquireLock ( eventKey , CancellationToken . None ) ;
114118 int attempt = 0 ;
115119 while ( ! acquiredLock && attempt < 10 )
116120 {
117- await Task . Delay ( Options . IdleTime , cancellationToken ) ;
118- acquiredLock = await _lockProvider . AcquireLock ( eventKey , cancellationToken ) ;
121+ await Task . Delay ( Options . IdleTime ) ;
122+ acquiredLock = await _lockProvider . AcquireLock ( eventKey , CancellationToken . None ) ;
119123
120124 attempt ++ ;
121125 }
@@ -127,7 +131,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi
127131 else
128132 {
129133 _greylist . Remove ( eventKey ) ;
130- await persistenceStore . MarkEventUnprocessed ( evt , cancellationToken ) ;
134+ await persistenceStore . MarkEventUnprocessed ( evt ) ;
131135 await QueueProvider . QueueWork ( evt , QueueType . Event ) ;
132136 }
133137 }
0 commit comments