Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,30 @@ public ActionResult<Account> ExampleCustomTopicMetadata(Transaction transaction)
{
return Ok();
}

/// <summary>
/// Example demonstrating multiple subscriptions to the same topic using subscription names.
/// This handler processes deposits for accounting purposes.
/// </summary>
[Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-accounting-subscription")]
[HttpPost("multisub/deposit/accounting")]
public ActionResult<Account> MultiSubDepositAccounting(Transaction transaction)
{
logger.LogInformation("Accounting handler: Processing deposit {Id} for amount {Amount}",
transaction.Id, transaction.Amount);
return Ok(new { handler = "accounting", transactionId = transaction.Id });
}

/// <summary>
/// Example demonstrating multiple subscriptions to the same topic using subscription names.
/// This handler processes deposits for notification purposes.
/// </summary>
[Topic("pubsub", "multisub-deposit", subscriptionName: "deposit-notification-subscription")]
[HttpPost("multisub/deposit/notifications")]
public ActionResult<Account> MultiSubDepositNotifications(Transaction transaction)
{
logger.LogInformation("Notification handler: Processing deposit {Id} for amount {Amount}",
transaction.Id, transaction.Amount);
return Ok(new { handler = "notifications", transactionId = transaction.Id });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ public CustomTopicAttribute(string pubsubName, string name)

/// <inheritdoc/>
public int Priority { get; }

/// <inheritdoc/>
public string SubscriptionName { get; }
}
14 changes: 8 additions & 6 deletions src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();
var bulkSubscribeMetadata = e.Metadata.GetOrderedMetadata<IBulkSubscribeMetadata>();

var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload,
string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata,
string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe)>();
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload,
string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata,
string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe, string SubscriptionName)>();

for (int i = 0; i < topicMetadata.Count(); i++)
{
Expand Down Expand Up @@ -109,13 +109,14 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
.ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
(topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
e.RoutePattern,
bulkSubscribe));
bulkSubscribe,
topicMetadata[i].SubscriptionName));
}

return subs;
})
.Distinct()
.GroupBy(e => new { e.PubsubName, e.Name })
.GroupBy(e => new { e.PubsubName, e.Name, e.SubscriptionName })
.Select(e => e.OrderBy(e => e.Priority))
.Select(e =>
{
Expand Down Expand Up @@ -155,7 +156,8 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
Topic = first.Name,
PubsubName = first.PubsubName,
Metadata = metadata.Count > 0 ? metadata : null,
BulkSubscribe = first.bulkSubscribe
BulkSubscribe = first.bulkSubscribe,
Name = first.SubscriptionName
};

if (first.DeadLetterTopic != null)
Expand Down
7 changes: 7 additions & 0 deletions src/Dapr.AspNetCore/ITopicMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ public interface ITopicMetadata
/// The priority in which this rule should be evaluated (lower to higher).
/// </summary>
int Priority { get; }

/// <summary>
/// Gets the subscription name. This is optional and allows multiple subscriptions
/// to the same topic within a single application. If not specified, the subscription
/// is identified by the combination of PubsubName and topic Name.
/// </summary>
string SubscriptionName { get; }
}
8 changes: 7 additions & 1 deletion src/Dapr.AspNetCore/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class Subscription
/// Gets or sets the metadata.
/// </summary>
public Metadata Metadata { get; set; }

/// <summary>
/// Gets or sets the deadletter topic.
/// </summary>
Expand All @@ -54,6 +54,12 @@ internal class Subscription
/// Gets or sets the bulk subscribe options.
/// </summary>
public DaprTopicBulkSubscribe BulkSubscribe { get; set; }

/// <summary>
/// Gets or sets the subscription name. This is optional and allows multiple subscriptions
/// to the same topic within a single application.
/// </summary>
public string Name { get; set; }
}

/// <summary>
Expand Down
23 changes: 18 additions & 5 deletions src/Dapr.AspNetCore/TopicAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwn
/// <param name="name">The topic name.</param>
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
/// <param name="metadataSeparator">Separator to use for metadata.</param>
public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null)
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
Expand All @@ -37,6 +38,7 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas =
this.PubsubName = pubsubName;
this.OwnedMetadatas = ownedMetadatas;
this.MetadataSeparator = metadataSeparator;
this.SubscriptionName = subscriptionName;
}

/// <summary>
Expand All @@ -47,7 +49,8 @@ public TopicAttribute(string pubsubName, string name, string[] ownedMetadatas =
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
/// <param name="metadataSeparator">Separator to use for metadata.</param>
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
Expand All @@ -57,6 +60,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
this.EnableRawPayload = enableRawPayload;
this.OwnedMetadatas = ownedMetadatas;
this.MetadataSeparator = metadataSeparator;
this.SubscriptionName = subscriptionName;
}

/// <summary>
Expand All @@ -68,7 +72,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
/// <param name="priority">The priority of the rule (low-to-high values).</param>
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
/// <param name="metadataSeparator">Separator to use for metadata.</param>
public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null)
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
public TopicAttribute(string pubsubName, string name, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
Expand All @@ -79,6 +84,7 @@ public TopicAttribute(string pubsubName, string name, string match, int priority
this.Priority = priority;
this.OwnedMetadatas = ownedMetadatas;
this.MetadataSeparator = metadataSeparator;
this.SubscriptionName = subscriptionName;
}

/// <summary>
Expand All @@ -91,7 +97,8 @@ public TopicAttribute(string pubsubName, string name, string match, int priority
/// <param name="priority">The priority of the rule (low-to-high values).</param>
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
/// <param name="metadataSeparator">Separator to use for metadata.</param>
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null)
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
public TopicAttribute(string pubsubName, string name, bool enableRawPayload, string match, int priority, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
Expand All @@ -103,6 +110,7 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
this.Priority = priority;
this.OwnedMetadatas = ownedMetadatas;
this.MetadataSeparator = metadataSeparator;
this.SubscriptionName = subscriptionName;
}

/// <summary>
Expand All @@ -114,7 +122,8 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
/// <param name="metadataSeparator">Separator to use for metadata.</param>
public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
/// <param name="subscriptionName">The subscription name (optional). Allows multiple subscriptions to the same topic.</param>
public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null, string subscriptionName = null)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
Expand All @@ -125,6 +134,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo
this.EnableRawPayload = enableRawPayload;
this.OwnedMetadatas = ownedMetadatas;
this.MetadataSeparator = metadataSeparator;
this.SubscriptionName = subscriptionName;
}

/// <inheritdoc/>
Expand All @@ -150,4 +160,7 @@ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bo

/// <inheritdoc/>
public string DeadLetterTopic { get; set; }

/// <inheritdoc/>
public string SubscriptionName { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public CustomTopicAttribute(string pubsubName, string name)
public new string Match { get; }

public int Priority { get; }

public string SubscriptionName { get; }
}
13 changes: 13 additions & 0 deletions test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,17 @@ public ActionResult<UserInfo> RequiresApiToken(UserInfo user)
{
return user;
}

// Test subscription names - multiple subscriptions to same topic
[Topic("pubsub", "H", subscriptionName: "subscription-h-1")]
[HttpPost("/H-Handler1")]
public void TopicHHandler1()
{
}

[Topic("pubsub", "H", subscriptionName: "subscription-h-2")]
[HttpPost("/H-Handler2")]
public void TopicHHandler2()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task SubscribeEndpoint_ReportsTopics()
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);

json.ValueKind.ShouldBe(JsonValueKind.Array);
json.GetArrayLength().ShouldBe(18);
json.GetArrayLength().ShouldBe(20); // Updated from 18 to 20 to account for 2 new subscription name tests

var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload,
string match, string metadata, string DeadLetterTopic, string bulkSubscribeMetadata)>();
Expand Down Expand Up @@ -131,6 +131,9 @@ public async Task SubscribeEndpoint_ReportsTopics()
"{\"enabled\":true,\"maxMessagesCount\":500,\"maxAwaitDurationMs\":2000}"));
subscriptions.ShouldContain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty, String.Empty));
subscriptions.ShouldContain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty, String.Empty));
// Test subscription names - multiple subscriptions to same topic
subscriptions.ShouldContain(("pubsub", "H", "H-Handler1", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.ShouldContain(("pubsub", "H", "H-Handler2", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
// Test priority route sorting
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
eTopic.Count.ShouldBe(3);
Expand Down