From 58e20751e9896de4b0f258ef38167c2037a25c97 Mon Sep 17 00:00:00 2001 From: Anatoliy Kolodkin Date: Fri, 21 Nov 2025 15:04:13 -0600 Subject: [PATCH] feat(pubsub): add support for subscription names to enable multiple subscriptions per topic Introduces subscription name support across topic metadata, attributes, and routing logic to allow multiple distinct subscriptions to the same topic within a single application. Enhances topic attributes and subscription models with an optional subscription name to differentiate subscription handlers. Updates routing and subscription grouping to incorporate subscription names for precise subscription management. Adds example handlers and integration tests demonstrating multiple subscriptions to the same topic distinguished by subscription names. This improvement enables advanced pubsub scenarios where separate subscription handlers can independently process messages from the same topic without conflict. --- .../Controllers/SampleController.cs | 26 +++++++++++++++++++ .../ControllerSample/CustomTopicAttribute.cs | 3 +++ .../DaprEndpointRouteBuilderExtensions.cs | 14 +++++----- src/Dapr.AspNetCore/ITopicMetadata.cs | 7 +++++ src/Dapr.AspNetCore/Subscription.cs | 8 +++++- src/Dapr.AspNetCore/TopicAttribute.cs | 23 ++++++++++++---- .../CustomTopicAttribute.cs | 2 ++ .../DaprController.cs | 13 ++++++++++ .../SubscribeEndpointTest.cs | 5 +++- 9 files changed, 88 insertions(+), 13 deletions(-) diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs index 195c5ea50..628e75782 100644 --- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs +++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs @@ -294,4 +294,30 @@ public ActionResult ExampleCustomTopicMetadata(Transaction transaction) { return Ok(); } + + /// + /// Example demonstrating multiple subscriptions to the same topic using subscription names. + /// This handler processes deposits for accounting purposes. + /// + [Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-accounting-subscription")] + [HttpPost("multisub/deposit/accounting")] + public ActionResult MultiSubDepositAccounting(Transaction transaction) + { + logger.LogInformation("Accounting handler: Processing deposit {Id} for amount {Amount}", + transaction.Id, transaction.Amount); + return Ok(new { handler = "accounting", transactionId = transaction.Id }); + } + + /// + /// Example demonstrating multiple subscriptions to the same topic using subscription names. + /// This handler processes deposits for notification purposes. + /// + [Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-notification-subscription")] + [HttpPost("multisub/deposit/notifications")] + public ActionResult MultiSubDepositNotifications(Transaction transaction) + { + logger.LogInformation("Notification handler: Processing deposit {Id} for amount {Amount}", + transaction.Id, transaction.Amount); + return Ok(new { handler = "notifications", transactionId = transaction.Id }); + } } \ No newline at end of file diff --git a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs index eb96ba894..ca366dfef 100644 --- a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs +++ b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs @@ -41,4 +41,7 @@ public CustomTopicAttribute(string pubsubName, string name) /// public int Priority { get; } + + /// + public string SubscriptionName { get; } } \ No newline at end of file diff --git a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs index 9749a87d4..341249766 100644 --- a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs +++ b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs @@ -74,9 +74,9 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute var originalTopicMetadata = e.Metadata.GetOrderedMetadata(); var bulkSubscribeMetadata = e.Metadata.GetOrderedMetadata(); - var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, - string Match, int Priority, Dictionary OriginalTopicMetadata, - string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe)>(); + var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, + string Match, int Priority, Dictionary OriginalTopicMetadata, + string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe, string SubscriptionName)>(); for (int i = 0; i < topicMetadata.Count(); i++) { @@ -109,13 +109,14 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute .ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()), (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator, e.RoutePattern, - bulkSubscribe)); + bulkSubscribe, + topicMetadata[i].SubscriptionName)); } return subs; }) .Distinct() - .GroupBy(e => new { e.PubsubName, e.Name }) + .GroupBy(e => new { e.PubsubName, e.Name, e.SubscriptionName }) .Select(e => e.OrderBy(e => e.Priority)) .Select(e => { @@ -155,7 +156,8 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute Topic = first.Name, PubsubName = first.PubsubName, Metadata = metadata.Count > 0 ? metadata : null, - BulkSubscribe = first.bulkSubscribe + BulkSubscribe = first.bulkSubscribe, + Name = first.SubscriptionName }; if (first.DeadLetterTopic != null) diff --git a/src/Dapr.AspNetCore/ITopicMetadata.cs b/src/Dapr.AspNetCore/ITopicMetadata.cs index 81e0459c5..277977b45 100644 --- a/src/Dapr.AspNetCore/ITopicMetadata.cs +++ b/src/Dapr.AspNetCore/ITopicMetadata.cs @@ -37,4 +37,11 @@ public interface ITopicMetadata /// The priority in which this rule should be evaluated (lower to higher). /// int Priority { get; } + + /// + /// Gets the subscription name. This is optional and allows multiple subscriptions + /// to the same topic within a single application. If not specified, the subscription + /// is identified by the combination of PubsubName and topic Name. + /// + string SubscriptionName { get; } } \ No newline at end of file diff --git a/src/Dapr.AspNetCore/Subscription.cs b/src/Dapr.AspNetCore/Subscription.cs index 7b0e127b4..9c978bbde 100644 --- a/src/Dapr.AspNetCore/Subscription.cs +++ b/src/Dapr.AspNetCore/Subscription.cs @@ -44,7 +44,7 @@ internal class Subscription /// Gets or sets the metadata. /// public Metadata Metadata { get; set; } - + /// /// Gets or sets the deadletter topic. /// @@ -54,6 +54,12 @@ internal class Subscription /// Gets or sets the bulk subscribe options. /// public DaprTopicBulkSubscribe BulkSubscribe { get; set; } + + /// + /// Gets or sets the subscription name. This is optional and allows multiple subscriptions + /// to the same topic within a single application. + /// + public string Name { get; set; } } /// diff --git a/src/Dapr.AspNetCore/TopicAttribute.cs b/src/Dapr.AspNetCore/TopicAttribute.cs index 1d1f7a1ee..6544845bb 100644 --- a/src/Dapr.AspNetCore/TopicAttribute.cs +++ b/src/Dapr.AspNetCore/TopicAttribute.cs @@ -28,7 +28,8 @@ public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwn /// The topic name. /// The topic owned metadata ids. /// Separator to use for metadata. - public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null) + /// The subscription name (optional). Allows multiple subscriptions to the same topic. + public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null) { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); @@ -37,6 +38,7 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = this.PubsubName = pubsubName; this.OwnedMetadatas = ownedMetadatas; this.MetadataSeparator = metadataSeparator; + this.SubscriptionName = subscriptionName; } /// @@ -47,7 +49,8 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = /// The enable/disable raw pay load flag. /// The topic owned metadata ids. /// Separator to use for metadata. - public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null) + /// The subscription name (optional). Allows multiple subscriptions to the same topic. + public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null) { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); @@ -57,6 +60,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str this.EnableRawPayload = enableRawPayload; this.OwnedMetadatas = ownedMetadatas; this.MetadataSeparator = metadataSeparator; + this.SubscriptionName = subscriptionName; } /// @@ -68,7 +72,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str /// The priority of the rule (low-to-high values). /// The topic owned metadata ids. /// Separator to use for metadata. - public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null) + /// The subscription name (optional). Allows multiple subscriptions to the same topic. + public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null) { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); @@ -79,6 +84,7 @@ public TopicAttribute(string pubsubName, string name, string match, int priority this.Priority = priority; this.OwnedMetadatas = ownedMetadatas; this.MetadataSeparator = metadataSeparator; + this.SubscriptionName = subscriptionName; } /// @@ -91,7 +97,8 @@ public TopicAttribute(string pubsubName, string name, string match, int priority /// The priority of the rule (low-to-high values). /// The topic owned metadata ids. /// Separator to use for metadata. - public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null) + /// The subscription name (optional). Allows multiple subscriptions to the same topic. + public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null) { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); @@ -103,6 +110,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str this.Priority = priority; this.OwnedMetadatas = ownedMetadatas; this.MetadataSeparator = metadataSeparator; + this.SubscriptionName = subscriptionName; } /// @@ -114,7 +122,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str /// The enable/disable raw pay load flag. /// The topic owned metadata ids. /// Separator to use for metadata. - public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null) + /// The subscription name (optional). Allows multiple subscriptions to the same topic. + public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null) { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); @@ -125,6 +134,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo this.EnableRawPayload = enableRawPayload; this.OwnedMetadatas = ownedMetadatas; this.MetadataSeparator = metadataSeparator; + this.SubscriptionName = subscriptionName; } /// @@ -150,4 +160,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo /// public string DeadLetterTopic { get; set; } + + /// + public string SubscriptionName { get; set; } } \ No newline at end of file diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs index 1888ca36c..978313ab6 100644 --- a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs +++ b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs @@ -30,4 +30,6 @@ public CustomTopicAttribute(string pubsubName, string name) public new string Match { get; } public int Priority { get; } + + public string SubscriptionName { get; } } \ No newline at end of file diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs index 31f31b017..5c5a9e08a 100644 --- a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs +++ b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs @@ -170,4 +170,17 @@ public ActionResult RequiresApiToken(UserInfo user) { return user; } + + // Test subscription names - multiple subscriptions to same topic + [Topic("pubsub", "H", subscriptionName: "subscription-h-1")] + [HttpPost("/H-Handler1")] + public void TopicHHandler1() + { + } + + [Topic("pubsub", "H", subscriptionName: "subscription-h-2")] + [HttpPost("/H-Handler2")] + public void TopicHHandler2() + { + } } \ No newline at end of file diff --git a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs index e0fdaba6d..bcebfc071 100644 --- a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs +++ b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs @@ -40,7 +40,7 @@ public async Task SubscribeEndpoint_ReportsTopics() var json = await JsonSerializer.DeserializeAsync(stream); json.ValueKind.ShouldBe(JsonValueKind.Array); - json.GetArrayLength().ShouldBe(18); + json.GetArrayLength().ShouldBe(20); // Updated from 18 to 20 to account for 2 new subscription name tests var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata, string DeadLetterTopic, string bulkSubscribeMetadata)>(); @@ -131,6 +131,9 @@ public async Task SubscribeEndpoint_ReportsTopics() "{\"enabled\":true,\"maxMessagesCount\":500,\"maxAwaitDurationMs\":2000}")); subscriptions.ShouldContain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty, String.Empty)); subscriptions.ShouldContain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty, String.Empty)); + // Test subscription names - multiple subscriptions to same topic + subscriptions.ShouldContain(("pubsub", "H", "H-Handler1", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty)); + subscriptions.ShouldContain(("pubsub", "H", "H-Handler2", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty)); // Test priority route sorting var eTopic = subscriptions.FindAll(e => e.Topic == "E"); eTopic.Count.ShouldBe(3);