1111// See the License for the specific language governing permissions and
1212// limitations under the License.
1313
14+ using Neuroglia . Data . Infrastructure . ResourceOriented ;
1415using Neuroglia . Data . Infrastructure . Services ;
1516
1617namespace Synapse . Operator . Services ;
@@ -23,8 +24,9 @@ namespace Synapse.Operator.Services;
2324/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
2425/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
2526/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
27+ /// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
2628/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
27- public class WorkflowInstanceController ( IServiceProvider serviceProvider , ILoggerFactory loggerFactory , IOptions < ResourceControllerOptions < WorkflowInstance > > controllerOptions , IResourceRepository repository , IOperatorController operatorController , IRepository < Document , string > documents )
29+ public class WorkflowInstanceController ( IServiceProvider serviceProvider , ILoggerFactory loggerFactory , IOptions < ResourceControllerOptions < WorkflowInstance > > controllerOptions , IResourceRepository repository , IOperatorController operatorController , IWorkflowController workflowController , IRepository < Document , string > documents )
2830 : ResourceController < WorkflowInstance > ( loggerFactory , controllerOptions , repository )
2931{
3032
@@ -38,6 +40,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
3840 /// </summary>
3941 protected IResourceMonitor < Resources . Operator > Operator => operatorController . Operator ;
4042
43+ /// <summary>
44+ /// Gets a dictionary containing all monitored <see cref="Workflow"/>s
45+ /// </summary>
46+ protected IReadOnlyDictionary < string , Workflow > Workflows => workflowController . Workflows ;
47+
4148 /// <summary>
4249 /// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
4350 /// </summary>
@@ -87,7 +94,8 @@ protected virtual async Task<WorkflowInstanceHandler> CreateWorkflowInstanceHand
8794 protected virtual async Task < bool > TryClaimAsync ( WorkflowInstance resource , CancellationToken cancellationToken )
8895 {
8996 ArgumentNullException . ThrowIfNull ( resource ) ;
90- if ( resource . Metadata . Labels != null && resource . Metadata . Labels . TryGetValue ( SynapseDefaults . Resources . Labels . Operator , out var operatorQualifiedName ) ) return operatorQualifiedName == this . Operator . Resource . GetQualifiedName ( ) ;
97+ var isClaimable = this . IsWorkflowInstanceClaimable ( resource ) ;
98+ if ( isClaimable . HasValue ) return isClaimable . Value ;
9199 try
92100 {
93101 var originalResource = resource . Clone ( ) ;
@@ -112,7 +120,8 @@ protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, Canc
112120 protected virtual async Task < bool > TryReleaseAsync ( WorkflowInstance resource , CancellationToken cancellationToken )
113121 {
114122 ArgumentNullException . ThrowIfNull ( resource ) ;
115- if ( resource . Metadata . Labels != null && resource . Metadata . Labels . TryGetValue ( SynapseDefaults . Resources . Labels . Operator , out var operatorQualifiedName ) ) return operatorQualifiedName == this . Operator . Resource . GetQualifiedName ( ) ;
123+ var isClaimable = this . IsWorkflowInstanceClaimable ( resource ) ;
124+ if ( isClaimable . HasValue ) return isClaimable . Value ;
116125 try
117126 {
118127 var originalResource = resource . Clone ( ) ;
@@ -205,6 +214,20 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
205214 /// <returns>A new awaitable <see cref="Task"/></returns>
206215 protected virtual Task OnResourceSelectorChangedAsync ( IDictionary < string , string > ? selector ) => this . ReconcileAsync ( this . CancellationTokenSource . Token ) ;
207216
217+ /// <summary>
218+ /// Determines whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/>
219+ /// </summary>
220+ /// <param name="workflowInstance">The <see cref="WorkflowInstance"/> to check</param>
221+ /// <returns>A boolean indicating whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/></returns>
222+ protected virtual bool ? IsWorkflowInstanceClaimable ( WorkflowInstance workflowInstance )
223+ {
224+ ArgumentNullException . ThrowIfNull ( workflowInstance ) ;
225+ if ( workflowInstance . Metadata . Labels != null && workflowInstance . Metadata . Labels . TryGetValue ( SynapseDefaults . Resources . Labels . Operator , out var operatorQualifiedName ) ) return operatorQualifiedName == this . Operator . Resource . GetQualifiedName ( ) ;
226+ if ( this . Workflows . TryGetValue ( this . GetResourceCacheKey ( workflowInstance . Spec . Definition . Name , workflowInstance . Spec . Definition . Namespace ) , out var workflow ) && workflow != null
227+ && workflow . Metadata . Labels != null && workflow . Metadata . Labels . TryGetValue ( SynapseDefaults . Resources . Labels . Operator , out operatorQualifiedName ) ) return operatorQualifiedName == this . Operator . Resource . GetQualifiedName ( ) ;
228+ return null ;
229+ }
230+
208231 /// <inheritdoc/>
209232 protected override async ValueTask DisposeAsync ( bool disposing )
210233 {
0 commit comments