diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs index 0510f16..3a4a4a2 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityProviderFactory.cs @@ -22,6 +22,8 @@ class SqlDurabilityProviderFactory : IDurabilityProviderFactory readonly ILoggerFactory loggerFactory; readonly IConnectionInfoResolver connectionInfoResolver; + readonly bool useSeparateQueueForEntityWorkItems = false; + SqlDurabilityOptions? defaultOptions; SqlDurabilityProvider? defaultProvider; @@ -37,11 +39,22 @@ class SqlDurabilityProviderFactory : IDurabilityProviderFactory public SqlDurabilityProviderFactory( IOptions extensionOptions, ILoggerFactory loggerFactory, - IConnectionInfoResolver connectionInfoResolver) + IConnectionInfoResolver connectionInfoResolver, +#pragma warning disable CS0612 // Type or member is obsolete + IPlatformInformation platformInfo) +#pragma warning restore CS0612 // Type or member is obsolete { this.extensionOptions = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions)); this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); this.connectionInfoResolver = connectionInfoResolver ?? throw new ArgumentNullException(nameof(connectionInfoResolver)); + + WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType(); + if (runtimeType == WorkerRuntimeType.DotNetIsolated || + runtimeType == WorkerRuntimeType.Java || + runtimeType == WorkerRuntimeType.Custom) + { + this.useSeparateQueueForEntityWorkItems = true; + } } // Called by the Durable trigger binding infrastructure @@ -85,9 +98,11 @@ public DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute SqlOrchestrationService GetOrchestrationService(SqlDurabilityOptions clientOptions) { - return new (clientOptions.GetOrchestrationServiceSettings( + SqlOrchestrationServiceSettings sqlOrchestrationServiceSettings = clientOptions.GetOrchestrationServiceSettings( this.extensionOptions, - this.connectionInfoResolver)); + this.connectionInfoResolver); + sqlOrchestrationServiceSettings.UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems; + return new(sqlOrchestrationServiceSettings); } static string GetDurabilityProviderKey(DurableClientAttribute attribute) diff --git a/src/DurableTask.SqlServer/EntitySqlBackendQueries.cs b/src/DurableTask.SqlServer/EntitySqlBackendQueries.cs new file mode 100644 index 0000000..719baeb --- /dev/null +++ b/src/DurableTask.SqlServer/EntitySqlBackendQueries.cs @@ -0,0 +1,227 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.SqlServer +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using System.Linq; + using DurableTask.Core; + using DurableTask.Core.Entities; + using System.Collections.Generic; + using System.Diagnostics; + + class EntitySqlBackendQueries : EntityBackendQueries + { + readonly SqlOrchestrationService orchestrationService; + + static TimeSpan timeLimitForCleanEntityStorageLoop = TimeSpan.FromSeconds(5); + + public EntitySqlBackendQueries( + SqlOrchestrationService orchestrationService) + { + this.orchestrationService = orchestrationService; + } + + public async override Task GetEntityAsync( + EntityId id, + bool includeState = false, + bool includeStateless = false, + CancellationToken cancellation = default) + { + OrchestrationState? state = (await this.orchestrationService.GetOrchestrationStateAsync(id.ToString(), allExecutions: false)).FirstOrDefault(); + return this.GetEntityMetadata(state, includeStateless, includeState); + } + + public async override Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + { + int pageNumber = 0; + if (!string.IsNullOrEmpty(filter.ContinuationToken) && !int.TryParse(filter.ContinuationToken, out pageNumber)) + { + throw new ArgumentException($"Invalid continuation token {filter.ContinuationToken}"); + } + + int retrievedResults = 0; + IEnumerable allResults = Array.Empty(); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + do + { + SqlOrchestrationQuery entityInstancesQuery = new SqlOrchestrationQuery() + { + PageSize = filter.PageSize.GetValueOrDefault(100), + PageNumber = pageNumber, + InstanceIdPrefix = filter.InstanceIdStartsWith, + CreatedTimeFrom = filter.LastModifiedFrom.GetValueOrDefault(DateTime.MinValue), + CreatedTimeTo = filter.LastModifiedTo.GetValueOrDefault(DateTime.MaxValue), + FetchInput = filter.IncludeState, + }; + IReadOnlyCollection results = await this.orchestrationService.GetManyOrchestrationsAsync(entityInstancesQuery, cancellation); + allResults = allResults.Concat(results); + pageNumber++; + + retrievedResults = results.Count; + if (retrievedResults == 0) + { + pageNumber = -1; + } + } while (retrievedResults > 0 && stopwatch.ElapsedMilliseconds <= 100); + + IEnumerable entities = allResults.Select(result => this.GetEntityMetadata(result, filter.IncludeTransient, filter.IncludeState)) + .OfType(); + + return new EntityQueryResult() + { + Results = entities, + ContinuationToken = pageNumber < 0 ? null : pageNumber.ToString() + }; + } + + public async override Task CleanEntityStorageAsync(CleanEntityStorageRequest request = default, CancellationToken cancellation = default) + { + DateTime now = DateTime.UtcNow; + int emptyEntitiesRemoved = 0; + int orphanedLocksReleased = 0; + int pageNumber = 0; + if (!string.IsNullOrEmpty(request.ContinuationToken) && !int.TryParse(request.ContinuationToken, out pageNumber)) + { + throw new ArgumentException($"Invalid continuation token {request.ContinuationToken}"); + } + + int retrievedResults = 0; + IEnumerable allResults = Array.Empty(); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + do + { + SqlOrchestrationQuery entityInstancesQuery = new SqlOrchestrationQuery() + { + PageSize = 100, + PageNumber = pageNumber, + InstanceIdPrefix = "@", + CreatedTimeFrom = DateTime.MinValue, + CreatedTimeTo = DateTime.MaxValue, + FetchInput = true, + }; + + IReadOnlyCollection page = await this.orchestrationService.GetManyOrchestrationsAsync(entityInstancesQuery, cancellation); + + pageNumber++; + retrievedResults = page.Count; + if (retrievedResults == 0) + { + pageNumber = -1; + break; + } + + var tasks = new List(); + IEnumerable emptyEntityIds = new List(); + + foreach (OrchestrationState state in page) + { + EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); + if (status != null) + { + if (request.ReleaseOrphanedLocks && status.LockedBy != null) + { + tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); + } + + if (request.RemoveEmptyEntities) + { + bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.BacklogQueueSize == 0; + bool safeToRemoveWithoutBreakingMessageSorterLogic = + now - state.LastUpdatedTime > this.orchestrationService.EntityBackendProperties.EntityMessageReorderWindow; + if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic) + { + emptyEntityIds.Append(state.OrchestrationInstance.InstanceId); + orphanedLocksReleased++; + } + } + } + } + + async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner) + { + OrchestrationState? ownerState + = (await this.orchestrationService.GetOrchestrationStateAsync(lockOwner, allExecutions: false)).FirstOrDefault(); + + bool OrchestrationIsRunning(OrchestrationStatus? status) + => status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended); + + if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus)) + { + // the owner is not a running orchestration. Send a lock release. + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner); + await this.orchestrationService.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); + Interlocked.Increment(ref orphanedLocksReleased); + } + } + + await this.orchestrationService.PurgeOrchestrationHistoryAsync(emptyEntityIds); + + } while (retrievedResults > 0 && stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop); + + return new CleanEntityStorageResult() + { + EmptyEntitiesRemoved = emptyEntitiesRemoved, + OrphanedLocksReleased = orphanedLocksReleased, + ContinuationToken = pageNumber < 0 ? null : pageNumber.ToString() + }; + } + + EntityMetadata? GetEntityMetadata(OrchestrationState? state, bool includeTransient, bool includeState) + { + if (state == null) + { + return null; + } + + if (!includeState) + { + if (!includeTransient) + { + // it is possible that this entity was logically deleted even though its orchestration was not purged yet. + // we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status + if (!EntityStatus.TestEntityExists(state.Status)) + { + return null; + } + } + + EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); + + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + BacklogQueueSize = status?.BacklogQueueSize ?? 0, + LockedBy = status?.LockedBy, + SerializedState = null, // we were instructed to not include the state + }; + } + else + { + // return the result to the user + if (!includeTransient && state.Input == null) + { + return null; + } + else + { + EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status); + + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + BacklogQueueSize = status?.BacklogQueueSize ?? 0, + LockedBy = status?.LockedBy, + SerializedState = state.Input, + }; + } + } + } + } +} diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 9ea7895..254b912 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -627,7 +627,9 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextOrchestration @BatchSize int, @LockedBy varchar(100), - @LockExpiration datetime2 + @LockExpiration datetime2, + -- Orchestration type: NULL = any, 0 = orchestration, 1 = entity + @OrchestrationType BIT = NULL AS BEGIN DECLARE @now datetime2 = SYSUTCDATETIME() @@ -662,7 +664,11 @@ BEGIN WHERE I.TaskHub = @TaskHub AND (I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND - (E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now) + (E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now) AND + (@OrchestrationType IS NULL OR + (@OrchestrationType = 0 AND I.[InstanceID] NOT LIKE '@%@%') OR + (@OrchestrationType = 1 AND I.[InstanceID] LIKE '@%@%') + ) -- Result #1: The list of new events to fetch. -- IMPORTANT: DO NOT CHANGE THE ORDER OF RETURNED COLUMNS! diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 3dfe161..73ba7a2 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -14,6 +14,7 @@ namespace DurableTask.SqlServer using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.Common; + using DurableTask.Core.Entities; using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; @@ -53,6 +54,19 @@ public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings) public override int MaxConcurrentTaskActivityWorkItems => this.settings.MaxConcurrentActivities; + public override EntityBackendProperties EntityBackendProperties + => new EntityBackendProperties() + { + EntityMessageReorderWindow = TimeSpan.FromMinutes(this.settings.EntityMessageReorderWindowInMinutes), + MaxEntityOperationBatchSize = this.settings.MaxEntityOperationBatchSize, + MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems, + SupportsImplicitEntityDeletion = false, // not supported by this backend + MaximumSignalDelayTime = TimeSpan.MaxValue, + UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems, + }; + + public override EntityBackendQueries? EntityBackendQueries => new EntitySqlBackendQueries(this); + static SqlOrchestrationServiceSettings? ValidateSettings(SqlOrchestrationServiceSettings? settings) { if (settings != null) @@ -112,9 +126,40 @@ public override Task DeleteAsync(bool deleteInstanceStore) return this.dbManager.DeleteSchemaAsync(); } + public override async Task LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + { +#pragma warning disable CS8603 // Possible null reference return. Need to update base signature in IEntityOrchestrationService + return await this.LockNextWorkItemAsync( + receiveTimeout, + cancellationToken, + OrchestrationFilterType.OrchestrationsOnly); +# pragma warning restore CS8603 + } + + public override async Task LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + { +#pragma warning disable CS8603 // Possible null reference return. Need to update base signature in IEntityOrchestrationService + return await this.LockNextWorkItemAsync( + receiveTimeout, + cancellationToken, + OrchestrationFilterType.EntitiesOnly); +# pragma warning restore CS8603 + } + public override async Task LockNextTaskOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) + { + return await this.LockNextWorkItemAsync( + receiveTimeout, + cancellationToken, + OrchestrationFilterType.OrchestrationsOnly); + } + + async Task LockNextWorkItemAsync( + TimeSpan receiveTimeout, + CancellationToken cancellationToken, + OrchestrationFilterType orchestrationFilterType = OrchestrationFilterType.All) { bool isWaiting = false; Stopwatch stopwatch = Stopwatch.StartNew(); @@ -129,8 +174,16 @@ public override Task DeleteAsync(bool deleteInstanceStore) command.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize; command.Parameters.Add("@LockedBy", SqlDbType.VarChar, 100).Value = this.lockedByValue; command.Parameters.Add("@LockExpiration", SqlDbType.DateTime2).Value = lockExpiration; + if (orchestrationFilterType == OrchestrationFilterType.OrchestrationsOnly) + { + command.Parameters.Add("@OrchestrationType", SqlDbType.Int).Value = 0; + } + else if (orchestrationFilterType == OrchestrationFilterType.EntitiesOnly) + { + command.Parameters.Add("@OrchestrationType", SqlDbType.Int).Value = 1; + } - DbDataReader reader; + DbDataReader reader; try { @@ -724,7 +777,7 @@ public override async Task PurgeInstanceStateAsync(PurgeInstanceFil FetchInput = false, FetchOutput = false, }; - + if (purgeInstanceFilter.CreatedTimeTo != null) { purgeQuery.CreatedTimeTo = purgeInstanceFilter.CreatedTimeTo.Value; @@ -734,9 +787,9 @@ public override async Task PurgeInstanceStateAsync(PurgeInstanceFil { purgeQuery.StatusFilter = new HashSet(purgeInstanceFilter.RuntimeStatus); } - + IReadOnlyCollection results = await this.GetManyOrchestrationsAsync(purgeQuery, CancellationToken.None); - + IEnumerable instanceIds = results.Select(r => r.OrchestrationInstance.InstanceId); int purgedInstanceCount = await this.PurgeOrchestrationHistoryAsync(instanceIds); return new PurgeResult(purgedInstanceCount); @@ -779,7 +832,7 @@ public override async Task GetOrchestrationWithQueryAs } IReadOnlyCollection results = await this.GetManyOrchestrationsAsync(sqlOrchestrationQuery, cancellationToken); - string? continuationToken = + string? continuationToken = results.Count == sqlOrchestrationQuery.PageSize ? (sqlOrchestrationQuery.PageNumber + 1).ToString() : null; return new OrchestrationQueryResult(results, continuationToken); } @@ -919,5 +972,12 @@ public ExtendedActivityWorkItem(TaskScheduledEvent scheduledEvent) public TaskScheduledEvent ScheduledEvent { get; } } + + enum OrchestrationFilterType + { + All, + EntitiesOnly, + OrchestrationsOnly + } } } diff --git a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs index 048a723..b8b7702 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs @@ -93,6 +93,13 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN [JsonProperty("maxActiveOrchestrations")] public int MaxActiveOrchestrations { get; set; } = Environment.ProcessorCount; + /// + /// Gets or sets the maximum number of entity operation batches that can be processed concurrently on a single node. + /// The default value is 100. + /// + [JsonProperty("maxConcurrentTaskEntityWorkItems")] + public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100; + /// /// Gets or sets the minimum interval to poll for orchestrations. /// Polling interval increases when no orchestrations or activities are found. @@ -141,6 +148,35 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN [JsonProperty("deltaBackoffActivityPollingInterval")] public TimeSpan DeltaBackoffActivityPollingInterval { get; set; } = TimeSpan.FromMilliseconds(50); + /// + /// Gets or sets the limit on the number of entity operations that should be processed as a single batch. + /// A null value indicates that no particular limit should be enforced. + /// + /// + /// Limiting the batch size can help to avoid timeouts in execution environments that impose time limitations on work items. + /// If set to 1, batching is disabled, and each operation executes as a separate work item. + /// + /// + /// A positive integer, or null. + /// + [JsonProperty("maxEntityOperationBatchSize")] + public int? MaxEntityOperationBatchSize { get; set; } = null; + + /// + /// Gets or sets the time window within which entity messages get deduplicated and reordered. + /// If set to zero, there is no sorting or deduplication, and all messages are just passed through. + /// + [JsonProperty("entityMessageReorderWindowInMinutes")] + public int EntityMessageReorderWindowInMinutes { get; set; } = 30; + + /// + /// Whether to use separate work item queues for entities and orchestrators. + /// This defaults to false, to avoid issues when using this provider from code that does not support separate dispatch. + /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. + /// + [JsonProperty("useSeparateQueueForEntityWorkItems")] + public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + /// /// Gets or sets a flag indicating whether the database should be automatically created if it does not exist. /// diff --git a/src/DurableTask.SqlServer/Utils/OrchestrationServiceBase.cs b/src/DurableTask.SqlServer/Utils/OrchestrationServiceBase.cs index 3ec3efe..48bbba2 100644 --- a/src/DurableTask.SqlServer/Utils/OrchestrationServiceBase.cs +++ b/src/DurableTask.SqlServer/Utils/OrchestrationServiceBase.cs @@ -9,10 +9,12 @@ namespace DurableTask.SqlServer.Utils using System.Threading; using System.Threading.Tasks; using DurableTask.Core; + using DurableTask.Core.Entities; using DurableTask.Core.Query; public abstract class OrchestrationServiceBase : IOrchestrationService, + IEntityOrchestrationService, IOrchestrationServiceClient, IOrchestrationServiceQueryClient, IOrchestrationServicePurgeClient @@ -43,6 +45,12 @@ public virtual int MaxConcurrentTaskActivityWorkItems public virtual BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => BehaviorOnContinueAsNew.Carryover; + public virtual EntityBackendProperties? EntityBackendProperties + => null; + + public virtual EntityBackendQueries? EntityBackendQueries + => null; + public virtual Task CreateAsync() => this.CreateAsync(recreateInstanceStore: false); @@ -163,5 +171,9 @@ public abstract Task PurgeOrchestrationHistoryAsync( public abstract Task GetOrchestrationWithQueryAsync( OrchestrationQuery query, CancellationToken cancellationToken); + + public abstract Task LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken); + + public abstract Task LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken); } } diff --git a/src/common.props b/src/common.props index a6e6463..7f63f5c 100644 --- a/src/common.props +++ b/src/common.props @@ -16,8 +16,8 @@ 1 - 5 - 2 + 6 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).$(MinorVersion).0.0 diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 045ccf7..00bf82c 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -504,8 +504,8 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName database.ConnectionString, schemaName); Assert.Equal(1, currentSchemaVersion.Major); - Assert.Equal(5, currentSchemaVersion.Minor); - Assert.Equal(2, currentSchemaVersion.Patch); + Assert.Equal(6, currentSchemaVersion.Minor); + Assert.Equal(0, currentSchemaVersion.Patch); } sealed class TestDatabase : IDisposable