From abcd5ad7cae18eb5008d82a40a36a4a676c962c9 Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Thu, 16 Oct 2025 11:29:53 -0400 Subject: [PATCH 1/6] Stream resumability support with inline dictionary as datastore in StreamableHttpHandler --- .../AspNetCoreMcpServer.csproj | 1 - samples/AspNetCoreMcpServer/Program.cs | 8 +- .../Tools/CollectUserInformationTool.cs | 143 ++++++++++++++++++ .../StreamableHttpHandler.cs | 50 +++++- .../Server/SseWriter.cs | 36 ++++- .../Server/StreamableHttpPostTransport.cs | 13 +- .../Server/StreamableHttpServerTransport.cs | 8 +- 7 files changed, 241 insertions(+), 18 deletions(-) create mode 100644 samples/AspNetCoreMcpServer/Tools/CollectUserInformationTool.cs diff --git a/samples/AspNetCoreMcpServer/AspNetCoreMcpServer.csproj b/samples/AspNetCoreMcpServer/AspNetCoreMcpServer.csproj index 59ab49828..5a23275ef 100644 --- a/samples/AspNetCoreMcpServer/AspNetCoreMcpServer.csproj +++ b/samples/AspNetCoreMcpServer/AspNetCoreMcpServer.csproj @@ -4,7 +4,6 @@ net9.0 enable enable - true diff --git a/samples/AspNetCoreMcpServer/Program.cs b/samples/AspNetCoreMcpServer/Program.cs index 96f89bffa..05f9b9b6a 100644 --- a/samples/AspNetCoreMcpServer/Program.cs +++ b/samples/AspNetCoreMcpServer/Program.cs @@ -1,15 +1,15 @@ +using AspNetCoreMcpServer.Resources; +using AspNetCoreMcpServer.Tools; using OpenTelemetry; using OpenTelemetry.Metrics; using OpenTelemetry.Trace; -using AspNetCoreMcpServer.Tools; -using AspNetCoreMcpServer.Resources; using System.Net.Http.Headers; var builder = WebApplication.CreateBuilder(args); builder.Services.AddMcpServer() .WithHttpTransport() .WithTools() - .WithTools() + .WithTools() // this tool collect user information through elicitation .WithTools() .WithResources(); @@ -32,6 +32,6 @@ var app = builder.Build(); -app.MapMcp(); +app.MapMcp("/mcp"); app.Run(); diff --git a/samples/AspNetCoreMcpServer/Tools/CollectUserInformationTool.cs b/samples/AspNetCoreMcpServer/Tools/CollectUserInformationTool.cs new file mode 100644 index 000000000..e5d9b56b8 --- /dev/null +++ b/samples/AspNetCoreMcpServer/Tools/CollectUserInformationTool.cs @@ -0,0 +1,143 @@ +using ModelContextProtocol; +using ModelContextProtocol.Protocol; +using ModelContextProtocol.Server; +using System.ComponentModel; +using System.Text.Json; + +namespace AspNetCoreMcpServer.Tools; + +[McpServerToolType] +public sealed class CollectUserInformationTool +{ + public enum InfoType + { + contact, + preferences, + feedback + } + + [McpServerTool(Name = "collect-user-info"), Description("A tool that collects user information through elicitation")] + public static async Task ElicitationEcho(McpServer thisServer, [Description("Type of information to collect")] InfoType infoType) + { + ElicitRequestParams elicitRequestParams; + switch (infoType) + { + case InfoType.contact: + elicitRequestParams = new ElicitRequestParams() + { + Message = "Please provide your contact information", + RequestedSchema = new ElicitRequestParams.RequestSchema + { + Properties = new Dictionary() + { + ["name"] = new ElicitRequestParams.StringSchema + { + Title = "Full name", + Description = "Your full name", + }, + ["email"] = new ElicitRequestParams.StringSchema + { + Title = "Email address", + Description = "Your email address", + Format = "email", + }, + ["phone"] = new ElicitRequestParams.StringSchema + { + Title = "Phone number", + Description = "Your phone number (optional)", + } + }, + Required = new List { "name", "email" } + } + }; + break; + + case InfoType.preferences: + elicitRequestParams = new ElicitRequestParams() + { + Message = "Please set your preferences", + RequestedSchema = new ElicitRequestParams.RequestSchema + { + Properties = new Dictionary() + { + ["theme"] = new ElicitRequestParams.EnumSchema + { + Title = "Theme", + Description = "Choose your preferred theme", + Enum = new List { "light", "dark", "auto" }, + EnumNames = new List { "Light", "Dark", "Auto" } + }, + ["notifications"] = new ElicitRequestParams.BooleanSchema + { + Title = "Enable notifications", + Description = "Would you like to receive notifications?", + Default = true, + }, + ["frequency"] = new ElicitRequestParams.EnumSchema + { + Title = "Notification frequency", + Description = "How often would you like notifications?", + Enum = new List { "daily", "weekly", "monthly" }, + EnumNames = new List { "Daily", "Weekly", "Monthly" } + } + }, + Required = new List { "theme" } + } + }; + + break; + + case InfoType.feedback: + elicitRequestParams = new ElicitRequestParams() + { + Message = "Please provide your feedback", + RequestedSchema = new ElicitRequestParams.RequestSchema + { + Properties = new Dictionary() + { + ["rating"] = new ElicitRequestParams.NumberSchema + { + Title = "Rating", + Description = "Rate your experience (1-5)", + Minimum = 1, + Maximum = 5, + }, + ["comments"] = new ElicitRequestParams.StringSchema + { + Title = "Comments", + Description = "Additional comments (optional)", + MaxLength = 500, + }, + ["recommend"] = new ElicitRequestParams.BooleanSchema + { + Title = "Would you recommend this?", + Description = "Would you recommend this to others?", + } + }, + Required = new List { "rating", "recommend" } + } + }; + + break; + + default: + throw new Exception($"Unknown info type: ${infoType}"); + + } + + + var result = await thisServer.ElicitAsync(elicitRequestParams); + var textResult = result.Action switch + { + "accept" => $"Thank you! Collected ${infoType} information: {JsonSerializer.Serialize(result.Content, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(IDictionary)))}", + "decline" => "No information was collected. User declined ${infoType} information request.", + "cancel" => "Information collection was cancelled by the user.", + _ => "Error collecting ${infoType} information: ${error}" + }; + + return new CallToolResult() + { + Content = [ new TextContentBlock { Text = textResult } ], + }; + } +} diff --git a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs index 9f4af7ea5..aaa936d8b 100644 --- a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs +++ b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs @@ -7,8 +7,11 @@ using Microsoft.Net.Http.Headers; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; +using System.Collections.Concurrent; +using System.Net.ServerSentEvents; using System.Security.Claims; using System.Security.Cryptography; +using System.Text.Json; using System.Text.Json.Serialization.Metadata; namespace ModelContextProtocol.AspNetCore; @@ -23,9 +26,11 @@ internal sealed class StreamableHttpHandler( ILoggerFactory loggerFactory) { private const string McpSessionIdHeaderName = "Mcp-Session-Id"; + private const string LastEventIdHeaderName = "Last-Event-Id"; private static readonly JsonTypeInfo s_messageTypeInfo = GetRequiredJsonTypeInfo(); private static readonly JsonTypeInfo s_errorTypeInfo = GetRequiredJsonTypeInfo(); + private static ConcurrentDictionary>> s_inMemoryEventStore = new(2, 15); public HttpServerTransportOptions HttpServerTransportOptions => httpServerTransportOptions.Value; @@ -88,6 +93,31 @@ await WriteJsonRpcErrorAsync(context, return; } + // eventId format is _ + var lastEventId = context.Request.Headers[LastEventIdHeaderName].ToString(); + if (!string.IsNullOrEmpty(lastEventId)) + { + InitializeSseResponse(context); + var streamId = lastEventId.Split('_')[0]; + var events = s_inMemoryEventStore.GetValueOrDefault(streamId, new()); + var sortedAndFilteredEventsToSend = events + .Where(e => e.Data is not null && e.EventId != null) + .OrderBy(e => e.EventId) + .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) + .Select(e => + new SseItem(e.Data!, e.EventType) + { + EventId = e.EventId, + ReconnectionInterval = e.ReconnectionInterval + }); + await SseFormatter.WriteAsync( + SseItemsAsyncEnumerable(sortedAndFilteredEventsToSend), + context.Response.Body, + (item, bufferWriter) => JsonSerializer.Serialize(new Utf8JsonWriter(bufferWriter), item.Data, s_messageTypeInfo), + context.RequestAborted); + return; + } + if (!session.TryStartGetRequest()) { await WriteJsonRpcErrorAsync(context, @@ -105,11 +135,11 @@ await WriteJsonRpcErrorAsync(context, try { await using var _ = await session.AcquireReferenceAsync(cancellationToken); - InitializeSseResponse(context); + InitializeSseResponse(context); - // We should flush headers to indicate a 200 success quickly, because the initialization response - // will be sent in response to a different POST request. It might be a while before we send a message - // over this response body. + // We should flush headers to indicate a 200 success quickly, because the initialization response + // will be sent in response to a different POST request. It might be a while before we send a message + // over this response body. await context.Response.Body.FlushAsync(cancellationToken); await session.Transport.HandleGetRequestAsync(context.Response.Body, cancellationToken); } @@ -190,7 +220,7 @@ private async ValueTask StartNewSessionAsync(HttpContext if (!HttpServerTransportOptions.Stateless) { sessionId = MakeNewSessionId(); - transport = new() + transport = new(s_inMemoryEventStore) { SessionId = sessionId, FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext, @@ -273,7 +303,7 @@ internal static string MakeNewSessionId() { Span buffer = stackalloc byte[16]; RandomNumberGenerator.Fill(buffer); - return WebEncoders.Base64UrlEncode(buffer); + return WebEncoders.Base64UrlEncode(buffer).Replace("_", "-"); } internal static async Task ReadJsonRpcMessageAsync(HttpContext context) { @@ -322,4 +352,12 @@ private static bool MatchesApplicationJsonMediaType(MediaTypeHeaderValue acceptH private static bool MatchesTextEventStreamMediaType(MediaTypeHeaderValue acceptHeaderValue) => acceptHeaderValue.MatchesMediaType("text/event-stream"); + + private static async IAsyncEnumerable> SseItemsAsyncEnumerable(IEnumerable> enumerableItems) + { + foreach (var sseItem in enumerableItems) + { + yield return sseItem; + } + } } diff --git a/src/ModelContextProtocol.Core/Server/SseWriter.cs b/src/ModelContextProtocol.Core/Server/SseWriter.cs index a2314e623..e2c4c2877 100644 --- a/src/ModelContextProtocol.Core/Server/SseWriter.cs +++ b/src/ModelContextProtocol.Core/Server/SseWriter.cs @@ -1,5 +1,6 @@ using ModelContextProtocol.Protocol; using System.Buffers; +using System.Collections.Concurrent; using System.Net.ServerSentEvents; using System.Text; using System.Text.Json; @@ -7,7 +8,10 @@ namespace ModelContextProtocol.Server; -internal sealed class SseWriter(string? messageEndpoint = null, BoundedChannelOptions? channelOptions = null) : IAsyncDisposable +internal sealed class SseWriter( + string? messageEndpoint = null, + BoundedChannelOptions? channelOptions = null, + ConcurrentDictionary>>? inMemoryEventStore = null) : IAsyncDisposable { private readonly Channel> _messages = Channel.CreateBounded>(channelOptions ?? new BoundedChannelOptions(1) { @@ -60,8 +64,36 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationTok return false; } + var transport = message.Context?.RelatedTransport; + var sseItem = new SseItem(message, SseParser.EventTypeDefault); + + if (inMemoryEventStore is not null + && transport is StreamableHttpPostTransport postTransport + && !string.IsNullOrEmpty(postTransport.pendingStreamId)) + { + var streamId = postTransport.pendingStreamId!; + sseItem = new SseItem(message, SseParser.EventTypeDefault) + { + EventId = $"{streamId}_{DateTime.UtcNow.Ticks}" + }; + + // remove ElicitationCreate method check to support resumability for other type of requests + if (message is JsonRpcRequest jsonRpcReq && jsonRpcReq.Method == RequestMethods.ElicitationCreate) + { + var sseItemList = inMemoryEventStore.GetOrAdd(streamId, (key) => new List>()); + sseItemList.Add(sseItem); + } + + if (message is JsonRpcResponse jsonRpcResp + && jsonRpcResp.Id == postTransport.pendingRequestId + && inMemoryEventStore.TryGetValue(streamId, out var itemList)) + { + itemList.Add(sseItem); + } + } + // Emit redundant "event: message" lines for better compatibility with other SDKs. - await _messages.Writer.WriteAsync(new SseItem(message, SseParser.EventTypeDefault), cancellationToken).ConfigureAwait(false); + await _messages.Writer.WriteAsync(sseItem, cancellationToken).ConfigureAwait(false); return true; } diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs index 1109c2b2b..8a5168982 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs @@ -1,4 +1,5 @@ using ModelContextProtocol.Protocol; +using System.Collections.Concurrent; using System.Diagnostics; using System.IO.Pipelines; using System.Net.ServerSentEvents; @@ -13,15 +14,22 @@ namespace ModelContextProtocol.Server; /// Handles processing the request/response body pairs for the Streamable HTTP transport. /// This is typically used via . /// -internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport parentTransport, Stream responseStream) : ITransport +internal sealed class StreamableHttpPostTransport( + StreamableHttpServerTransport parentTransport, + Stream responseStream, + ConcurrentDictionary>>? inMemoryEventStore = null) : ITransport { - private readonly SseWriter _sseWriter = new(); + private readonly SseWriter _sseWriter = new(inMemoryEventStore: inMemoryEventStore); private RequestId _pendingRequest; + private string? _pendingStreamId; public ChannelReader MessageReader => throw new NotSupportedException("JsonRpcMessage.Context.RelatedTransport should only be used for sending messages."); string? ITransport.SessionId => parentTransport.SessionId; + public string? pendingStreamId => _pendingStreamId; + public RequestId pendingRequestId => _pendingRequest; + /// /// True, if data was written to the respond body. /// False, if nothing was written because the request body did not contain any messages to respond to. @@ -34,6 +42,7 @@ public async ValueTask HandlePostAsync(JsonRpcMessage message, Cancellatio if (message is JsonRpcRequest request) { _pendingRequest = request.Id; + _pendingStreamId = Guid.NewGuid().ToString(); // Invoke the initialize request callback if applicable. if (parentTransport.OnInitRequestReceived is { } onInitRequest && request.Method == RequestMethods.Initialize) diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index ee943ea70..30c18e19d 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -1,5 +1,7 @@ using ModelContextProtocol.Protocol; +using System.Collections.Concurrent; using System.IO.Pipelines; +using System.Net.ServerSentEvents; using System.Security.Claims; using System.Threading.Channels; @@ -19,7 +21,7 @@ namespace ModelContextProtocol.Server; /// such as when streaming completion results or providing progress updates during long-running operations. /// /// -public sealed class StreamableHttpServerTransport : ITransport +public sealed class StreamableHttpServerTransport(ConcurrentDictionary>>? inMemoryEventStore = null) : ITransport { // For JsonRpcMessages without a RelatedTransport, we don't want to block just because the client didn't make a GET request to handle unsolicited messages. private readonly SseWriter _sseWriter = new(channelOptions: new BoundedChannelOptions(1) @@ -27,7 +29,7 @@ public sealed class StreamableHttpServerTransport : ITransport SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.DropOldest, - }); + }, inMemoryEventStore: inMemoryEventStore); private readonly Channel _incomingChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { SingleReader = true, @@ -117,7 +119,7 @@ public async Task HandlePostRequestAsync(JsonRpcMessage message, Stream re Throw.IfNull(responseStream); using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken); - await using var postTransport = new StreamableHttpPostTransport(this, responseStream); + await using var postTransport = new StreamableHttpPostTransport(this, responseStream, inMemoryEventStore); return await postTransport.HandlePostAsync(message, postCts.Token).ConfigureAwait(false); } From 0da87a12d1e953317695d6f626ba9dcda05df3c3 Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Thu, 16 Oct 2025 15:11:58 -0400 Subject: [PATCH 2/6] Adding IEventStore and InMemoryEventStore --- samples/AspNetCoreMcpServer/Program.cs | 4 +- .../HttpMcpServerBuilderExtensions.cs | 8 ++- .../InMemoryEventStore.cs | 52 +++++++++++++++++++ .../StreamableHttpHandler.cs | 40 ++++---------- .../Server/IEventStore.cs | 29 +++++++++++ .../Server/SseWriter.cs | 24 +++------ .../Server/StreamableHttpPostTransport.cs | 4 +- .../Server/StreamableHttpServerTransport.cs | 6 +-- 8 files changed, 113 insertions(+), 54 deletions(-) create mode 100644 src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs create mode 100644 src/ModelContextProtocol.Core/Server/IEventStore.cs diff --git a/samples/AspNetCoreMcpServer/Program.cs b/samples/AspNetCoreMcpServer/Program.cs index 05f9b9b6a..efba5a610 100644 --- a/samples/AspNetCoreMcpServer/Program.cs +++ b/samples/AspNetCoreMcpServer/Program.cs @@ -7,7 +7,7 @@ var builder = WebApplication.CreateBuilder(args); builder.Services.AddMcpServer() - .WithHttpTransport() + .WithHttpTransport(withInMemoryEventStore: true) .WithTools() .WithTools() // this tool collect user information through elicitation .WithTools() @@ -32,6 +32,6 @@ var app = builder.Build(); -app.MapMcp("/mcp"); +app.MapMcp(); app.Run(); diff --git a/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs b/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs index 313cbfa99..d5bc51cac 100644 --- a/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs +++ b/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs @@ -18,13 +18,19 @@ public static class HttpMcpServerBuilderExtensions /// /// The builder instance. /// Configures options for the Streamable HTTP transport. This allows configuring per-session + /// If , an in-memory event store is used to retain events for clients that reconnect after a network interruption. /// and running logic before and after a session. /// The builder provided in . /// is . - public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action? configureOptions = null) + public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action? configureOptions = null, bool withInMemoryEventStore = false) { ArgumentNullException.ThrowIfNull(builder); + if ( withInMemoryEventStore ) + { + builder.Services.TryAddSingleton(); + } + builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); diff --git a/src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs b/src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs new file mode 100644 index 000000000..2083aca79 --- /dev/null +++ b/src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs @@ -0,0 +1,52 @@ +using ModelContextProtocol.Protocol; +using ModelContextProtocol.Server; +using System.Collections.Concurrent; +using System.Net.ServerSentEvents; + +namespace ModelContextProtocol.AspNetCore; +internal sealed class InMemoryEventStore : IEventStore +{ + private ConcurrentDictionary>> eventStore = new(); + + public void storeEvent(string streamId, SseItem messageItem) + { + // remove ElicitationCreate method check to support resumability for other type of requests + if (messageItem.Data is JsonRpcRequest jsonRpcReq && jsonRpcReq.Method == RequestMethods.ElicitationCreate) + { + var sseItemList = eventStore.GetOrAdd(streamId, (key) => new List>()); + sseItemList.Add(messageItem); + } + + if (messageItem.Data is JsonRpcResponse jsonRpcResp && + eventStore.TryGetValue(streamId, out var itemList)) + { + itemList.Add(messageItem); + } + } + + public async Task replayEventsAfter(string lastEventId, Action>> sendEvents) + { + var streamId = lastEventId.Split('_')[0]; + var events = eventStore.GetValueOrDefault(streamId, new()); + var sortedAndFilteredEventsToSend = events + .Where(e => e.Data is not null && e.EventId != null) + .OrderBy(e => e.EventId) + // sending events with EventId greater than or equal to lastEventId + .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) + .Select(e => + new SseItem(e.Data!, e.EventType) + { + EventId = e.EventId, + ReconnectionInterval = e.ReconnectionInterval + }); + sendEvents(SseItemsAsyncEnumerable(sortedAndFilteredEventsToSend)); + } + + private static async IAsyncEnumerable> SseItemsAsyncEnumerable(IEnumerable> enumerableItems) + { + foreach (var sseItem in enumerableItems) + { + yield return sseItem; + } + } +} diff --git a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs index aaa936d8b..bb9abfb2e 100644 --- a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs +++ b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs @@ -7,7 +7,6 @@ using Microsoft.Net.Http.Headers; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; -using System.Collections.Concurrent; using System.Net.ServerSentEvents; using System.Security.Claims; using System.Security.Cryptography; @@ -23,14 +22,14 @@ internal sealed class StreamableHttpHandler( StatefulSessionManager sessionManager, IHostApplicationLifetime hostApplicationLifetime, IServiceProvider applicationServices, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + IEventStore? eventStore = null) { private const string McpSessionIdHeaderName = "Mcp-Session-Id"; private const string LastEventIdHeaderName = "Last-Event-Id"; private static readonly JsonTypeInfo s_messageTypeInfo = GetRequiredJsonTypeInfo(); private static readonly JsonTypeInfo s_errorTypeInfo = GetRequiredJsonTypeInfo(); - private static ConcurrentDictionary>> s_inMemoryEventStore = new(2, 15); public HttpServerTransportOptions HttpServerTransportOptions => httpServerTransportOptions.Value; @@ -95,26 +94,15 @@ await WriteJsonRpcErrorAsync(context, // eventId format is _ var lastEventId = context.Request.Headers[LastEventIdHeaderName].ToString(); - if (!string.IsNullOrEmpty(lastEventId)) + if (!string.IsNullOrEmpty(lastEventId) && eventStore is not null) { InitializeSseResponse(context); - var streamId = lastEventId.Split('_')[0]; - var events = s_inMemoryEventStore.GetValueOrDefault(streamId, new()); - var sortedAndFilteredEventsToSend = events - .Where(e => e.Data is not null && e.EventId != null) - .OrderBy(e => e.EventId) - .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) - .Select(e => - new SseItem(e.Data!, e.EventType) - { - EventId = e.EventId, - ReconnectionInterval = e.ReconnectionInterval - }); - await SseFormatter.WriteAsync( - SseItemsAsyncEnumerable(sortedAndFilteredEventsToSend), - context.Response.Body, - (item, bufferWriter) => JsonSerializer.Serialize(new Utf8JsonWriter(bufferWriter), item.Data, s_messageTypeInfo), - context.RequestAborted); + await eventStore.replayEventsAfter(lastEventId, async (enumerableEvents) => + await SseFormatter.WriteAsync(enumerableEvents, + context.Response.Body, + (item, bufferWriter) => JsonSerializer.Serialize(new Utf8JsonWriter(bufferWriter), item.Data, s_messageTypeInfo), + context.RequestAborted)); + return; } @@ -220,7 +208,7 @@ private async ValueTask StartNewSessionAsync(HttpContext if (!HttpServerTransportOptions.Stateless) { sessionId = MakeNewSessionId(); - transport = new(s_inMemoryEventStore) + transport = new(eventStore) { SessionId = sessionId, FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext, @@ -352,12 +340,4 @@ private static bool MatchesApplicationJsonMediaType(MediaTypeHeaderValue acceptH private static bool MatchesTextEventStreamMediaType(MediaTypeHeaderValue acceptHeaderValue) => acceptHeaderValue.MatchesMediaType("text/event-stream"); - - private static async IAsyncEnumerable> SseItemsAsyncEnumerable(IEnumerable> enumerableItems) - { - foreach (var sseItem in enumerableItems) - { - yield return sseItem; - } - } } diff --git a/src/ModelContextProtocol.Core/Server/IEventStore.cs b/src/ModelContextProtocol.Core/Server/IEventStore.cs new file mode 100644 index 000000000..6499065e3 --- /dev/null +++ b/src/ModelContextProtocol.Core/Server/IEventStore.cs @@ -0,0 +1,29 @@ +using ModelContextProtocol.Protocol; +using System.Net.ServerSentEvents; + +namespace ModelContextProtocol.Server; + +/// +/// Interface for resumability support via event storage +/// +public interface IEventStore +{ + /// + /// Stores an event in the specified stream and returns the unique identifier of the stored event. + /// + /// This method asynchronously stores the provided event in the specified stream. The returned + /// event identifier can be used to retrieve or reference the stored event in the future. + /// The identifier of the stream where the event will be stored. Cannot be null or empty. + /// The event item to be stored, which may contain a JSON-RPC message or be null. + void storeEvent(string streamId, SseItem messageItem); + + + /// + /// Replays events that occurred after the specified event ID. + /// + /// The ID of the last event that was processed. Events occurring after this ID will be replayed. + /// A callback action that processes the replayed events as an asynchronous enumerable of + /// containing objects. + /// A task that represents the asynchronous operation of replaying events. + Task replayEventsAfter(string lastEventId, Action>> sendEvents); +} diff --git a/src/ModelContextProtocol.Core/Server/SseWriter.cs b/src/ModelContextProtocol.Core/Server/SseWriter.cs index e2c4c2877..efe33706c 100644 --- a/src/ModelContextProtocol.Core/Server/SseWriter.cs +++ b/src/ModelContextProtocol.Core/Server/SseWriter.cs @@ -1,6 +1,5 @@ using ModelContextProtocol.Protocol; using System.Buffers; -using System.Collections.Concurrent; using System.Net.ServerSentEvents; using System.Text; using System.Text.Json; @@ -11,7 +10,7 @@ namespace ModelContextProtocol.Server; internal sealed class SseWriter( string? messageEndpoint = null, BoundedChannelOptions? channelOptions = null, - ConcurrentDictionary>>? inMemoryEventStore = null) : IAsyncDisposable + IEventStore? eventStore = null) : IAsyncDisposable { private readonly Channel> _messages = Channel.CreateBounded>(channelOptions ?? new BoundedChannelOptions(1) { @@ -67,9 +66,9 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationTok var transport = message.Context?.RelatedTransport; var sseItem = new SseItem(message, SseParser.EventTypeDefault); - if (inMemoryEventStore is not null - && transport is StreamableHttpPostTransport postTransport - && !string.IsNullOrEmpty(postTransport.pendingStreamId)) + if (eventStore is not null && + transport is StreamableHttpPostTransport postTransport && + !string.IsNullOrEmpty(postTransport.pendingStreamId)) { var streamId = postTransport.pendingStreamId!; sseItem = new SseItem(message, SseParser.EventTypeDefault) @@ -77,18 +76,11 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationTok EventId = $"{streamId}_{DateTime.UtcNow.Ticks}" }; - // remove ElicitationCreate method check to support resumability for other type of requests - if (message is JsonRpcRequest jsonRpcReq && jsonRpcReq.Method == RequestMethods.ElicitationCreate) + // store the requests and response to the pending request + if (message is JsonRpcRequest jsonRpcReq || + (message is JsonRpcResponse jsonRpcResp && jsonRpcResp.Id == postTransport.pendingRequestId)) { - var sseItemList = inMemoryEventStore.GetOrAdd(streamId, (key) => new List>()); - sseItemList.Add(sseItem); - } - - if (message is JsonRpcResponse jsonRpcResp - && jsonRpcResp.Id == postTransport.pendingRequestId - && inMemoryEventStore.TryGetValue(streamId, out var itemList)) - { - itemList.Add(sseItem); + eventStore.storeEvent(streamId, sseItem); } } diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs index 8a5168982..9516f4503 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs @@ -17,9 +17,9 @@ namespace ModelContextProtocol.Server; internal sealed class StreamableHttpPostTransport( StreamableHttpServerTransport parentTransport, Stream responseStream, - ConcurrentDictionary>>? inMemoryEventStore = null) : ITransport + IEventStore? eventStore = null) : ITransport { - private readonly SseWriter _sseWriter = new(inMemoryEventStore: inMemoryEventStore); + private readonly SseWriter _sseWriter = new(eventStore: eventStore); private RequestId _pendingRequest; private string? _pendingStreamId; diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 30c18e19d..d1d313478 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -21,7 +21,7 @@ namespace ModelContextProtocol.Server; /// such as when streaming completion results or providing progress updates during long-running operations. /// /// -public sealed class StreamableHttpServerTransport(ConcurrentDictionary>>? inMemoryEventStore = null) : ITransport +public sealed class StreamableHttpServerTransport(IEventStore? eventStore = null) : ITransport { // For JsonRpcMessages without a RelatedTransport, we don't want to block just because the client didn't make a GET request to handle unsolicited messages. private readonly SseWriter _sseWriter = new(channelOptions: new BoundedChannelOptions(1) @@ -29,7 +29,7 @@ public sealed class StreamableHttpServerTransport(ConcurrentDictionary _incomingChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { SingleReader = true, @@ -119,7 +119,7 @@ public async Task HandlePostRequestAsync(JsonRpcMessage message, Stream re Throw.IfNull(responseStream); using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken); - await using var postTransport = new StreamableHttpPostTransport(this, responseStream, inMemoryEventStore); + await using var postTransport = new StreamableHttpPostTransport(this, responseStream, eventStore); return await postTransport.HandlePostAsync(message, postCts.Token).ConfigureAwait(false); } From 708f11cd5928a351210b0a8931b0a8d77e8c927b Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Thu, 16 Oct 2025 16:20:11 -0400 Subject: [PATCH 3/6] Moving event store implementation to sample project --- .../AspNetCoreMcpServer}/InMemoryEventStore.cs | 11 +++++++++-- samples/AspNetCoreMcpServer/Program.cs | 7 ++++++- .../HttpMcpServerBuilderExtensions.cs | 8 +------- 3 files changed, 16 insertions(+), 10 deletions(-) rename {src/ModelContextProtocol.AspNetCore => samples/AspNetCoreMcpServer}/InMemoryEventStore.cs (76%) diff --git a/src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs b/samples/AspNetCoreMcpServer/InMemoryEventStore.cs similarity index 76% rename from src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs rename to samples/AspNetCoreMcpServer/InMemoryEventStore.cs index 2083aca79..d7bc9c4bc 100644 --- a/src/ModelContextProtocol.AspNetCore/InMemoryEventStore.cs +++ b/samples/AspNetCoreMcpServer/InMemoryEventStore.cs @@ -3,8 +3,15 @@ using System.Collections.Concurrent; using System.Net.ServerSentEvents; -namespace ModelContextProtocol.AspNetCore; -internal sealed class InMemoryEventStore : IEventStore +/// +/// Represents an in-memory implementation of an event store that stores and replays events associated with specific +/// streams. This class is designed to handle events of type where the data payload is a . +/// +/// The provides functionality to store events for a given stream and +/// replay events after a specified event ID. It supports resumability for specific types of requests and ensures events +/// are replayed in the correct order. +public sealed class InMemoryEventStore : IEventStore { private ConcurrentDictionary>> eventStore = new(); diff --git a/samples/AspNetCoreMcpServer/Program.cs b/samples/AspNetCoreMcpServer/Program.cs index efba5a610..206077047 100644 --- a/samples/AspNetCoreMcpServer/Program.cs +++ b/samples/AspNetCoreMcpServer/Program.cs @@ -1,5 +1,7 @@ using AspNetCoreMcpServer.Resources; using AspNetCoreMcpServer.Tools; +using Microsoft.Extensions.DependencyInjection.Extensions; +using ModelContextProtocol.Server; using OpenTelemetry; using OpenTelemetry.Metrics; using OpenTelemetry.Trace; @@ -7,7 +9,7 @@ var builder = WebApplication.CreateBuilder(args); builder.Services.AddMcpServer() - .WithHttpTransport(withInMemoryEventStore: true) + .WithHttpTransport() .WithTools() .WithTools() // this tool collect user information through elicitation .WithTools() @@ -30,6 +32,9 @@ client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("weather-tool", "1.0")); }); +// adding InMemoryEventStore to support stream resumability +builder.Services.TryAddSingleton(); + var app = builder.Build(); app.MapMcp(); diff --git a/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs b/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs index d5bc51cac..313cbfa99 100644 --- a/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs +++ b/src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs @@ -18,19 +18,13 @@ public static class HttpMcpServerBuilderExtensions /// /// The builder instance. /// Configures options for the Streamable HTTP transport. This allows configuring per-session - /// If , an in-memory event store is used to retain events for clients that reconnect after a network interruption. /// and running logic before and after a session. /// The builder provided in . /// is . - public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action? configureOptions = null, bool withInMemoryEventStore = false) + public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action? configureOptions = null) { ArgumentNullException.ThrowIfNull(builder); - if ( withInMemoryEventStore ) - { - builder.Services.TryAddSingleton(); - } - builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); From f6d0003738d79d156a824e143d1bb04e4b35c6e0 Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Fri, 17 Oct 2025 10:31:23 -0400 Subject: [PATCH 4/6] Adding support for event store cleanup and implemented a simple background job in sample app --- .../EventStore/EventStoreCleanupService.cs | 47 +++++++++ .../EventStore/InMemoryEventStore.cs | 98 +++++++++++++++++++ .../AspNetCoreMcpServer/InMemoryEventStore.cs | 59 ----------- samples/AspNetCoreMcpServer/Program.cs | 4 +- samples/AspNetCoreMcpServer/appsettings.json | 6 +- .../StreamableHttpHandler.cs | 2 +- .../Server/IEventStore.cs | 20 +++- .../Server/SseWriter.cs | 4 +- 8 files changed, 174 insertions(+), 66 deletions(-) create mode 100644 samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs create mode 100644 samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs delete mode 100644 samples/AspNetCoreMcpServer/InMemoryEventStore.cs diff --git a/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs b/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs new file mode 100644 index 000000000..d8f3604d3 --- /dev/null +++ b/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs @@ -0,0 +1,47 @@ +using ModelContextProtocol.Server; + +namespace AspNetCoreMcpServer.EventStore; + +public class EventStoreCleanupService : BackgroundService +{ + private readonly TimeSpan _jobRunFrequencyInMinutes; + private readonly ILogger _logger; + private readonly IEventStore? _eventStore; + + public EventStoreCleanupService(ILogger logger, IConfiguration configuration, IEventStore? eventStore = null) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _eventStore = eventStore; + _jobRunFrequencyInMinutes = TimeSpan.FromMinutes(configuration.GetValue("EventStore:CleanupJobRunFrequencyInMinutes", 30)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + + if (_eventStore is null) + { + _logger.LogWarning("No event store implementation provided. Event store cleanup job will not run."); + return; + } + + _logger.LogInformation("Event store cleanup job started."); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + _logger.LogInformation("Running event store cleanup job at {CurrentTimeInUtc}.", DateTime.UtcNow); + _eventStore.CleanupEventStore(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error running event store cleanup job."); + } + + await Task.Delay(_jobRunFrequencyInMinutes, stoppingToken); + } + + _logger.LogInformation("Event store cleanup job stopping."); + } +} diff --git a/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs new file mode 100644 index 000000000..b0b961a26 --- /dev/null +++ b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs @@ -0,0 +1,98 @@ +using ModelContextProtocol.Protocol; +using ModelContextProtocol.Server; +using System.Collections.Concurrent; +using System.Net.ServerSentEvents; + +namespace AspNetCoreMcpServer.EventStore; + +/// +/// Represents an in-memory implementation of an event store that stores and replays events associated with specific +/// streams. This class is designed to handle events of type where the data payload is a . +/// +/// The provides functionality to store events for a given stream and +/// replay events after a specified event ID. It supports resumability for specific types of requests and ensures events +/// are replayed in the correct order. +public sealed class InMemoryEventStore : IEventStore +{ + public const string EventIdDelimiter = "_"; + private ConcurrentDictionary>> _eventStore = new(); + + private readonly ILogger _logger; + private readonly TimeSpan _eventsRetentionDurationInMinutes; + + public InMemoryEventStore(IConfiguration configuration, ILogger logger) + { + _eventsRetentionDurationInMinutes = TimeSpan.FromMinutes(configuration.GetValue("EventStore:EventsRetentionDurationInMinutes", 60)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public void StoreEvent(string streamId, SseItem messageItem) + { + // remove ElicitationCreate method check to support resumability for other type of requests + if (messageItem.Data is JsonRpcRequest jsonRpcReq && jsonRpcReq.Method == RequestMethods.ElicitationCreate) + { + var sseItemList = _eventStore.GetOrAdd(streamId, (key) => new List>()); + sseItemList.Add(messageItem); + } + + if (messageItem.Data is JsonRpcResponse jsonRpcResp && + _eventStore.TryGetValue(streamId, out var itemList)) + { + itemList.Add(messageItem); + } + } + + public async Task ReplayEventsAfter(string lastEventId, Action>> sendEvents) + { + var streamId = lastEventId.Split(EventIdDelimiter)[0]; + var events = _eventStore.GetValueOrDefault(streamId, new()); + var sortedAndFilteredEventsToSend = events + .Where(e => e.Data is not null && e.EventId != null) + .OrderBy(e => e.EventId) + // Sending events with EventId greater than or equal to lastEventId. + // As per MCP specs, the server should resend the events after lastEventId + .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) + .Select(e => + new SseItem(e.Data!, e.EventType) + { + EventId = e.EventId, + ReconnectionInterval = e.ReconnectionInterval + }); + sendEvents(SseItemsAsyncEnumerable(sortedAndFilteredEventsToSend)); + } + + private static async IAsyncEnumerable> SseItemsAsyncEnumerable(IEnumerable> enumerableItems) + { + foreach (var sseItem in enumerableItems) + { + yield return sseItem; + } + } + + public string? GetEventId(string streamId, JsonRpcMessage message) + { + return $"{streamId}{EventIdDelimiter}{DateTime.UtcNow.Ticks}"; + } + + public void CleanupEventStore() + { + var cutoffTime = DateTime.UtcNow - _eventsRetentionDurationInMinutes; + _logger.LogInformation("Cleaning up events older than {CutoffTime} from event store.", cutoffTime); + + foreach (var key in _eventStore.Keys) + { + if (_eventStore.TryGetValue(key, out var itemList)) + { + itemList.RemoveAll(item => item.EventId != null && + long.TryParse(item.EventId.Split(EventIdDelimiter)[1], out var ticks) && + new DateTime(ticks) < cutoffTime); + if (itemList.Count == 0) + { + _logger.LogInformation("Removing empty event stream with key {EventStreamKey} from event store.", key); + _eventStore.TryRemove(key, out _); + } + } + } + } +} diff --git a/samples/AspNetCoreMcpServer/InMemoryEventStore.cs b/samples/AspNetCoreMcpServer/InMemoryEventStore.cs deleted file mode 100644 index d7bc9c4bc..000000000 --- a/samples/AspNetCoreMcpServer/InMemoryEventStore.cs +++ /dev/null @@ -1,59 +0,0 @@ -using ModelContextProtocol.Protocol; -using ModelContextProtocol.Server; -using System.Collections.Concurrent; -using System.Net.ServerSentEvents; - -/// -/// Represents an in-memory implementation of an event store that stores and replays events associated with specific -/// streams. This class is designed to handle events of type where the data payload is a . -/// -/// The provides functionality to store events for a given stream and -/// replay events after a specified event ID. It supports resumability for specific types of requests and ensures events -/// are replayed in the correct order. -public sealed class InMemoryEventStore : IEventStore -{ - private ConcurrentDictionary>> eventStore = new(); - - public void storeEvent(string streamId, SseItem messageItem) - { - // remove ElicitationCreate method check to support resumability for other type of requests - if (messageItem.Data is JsonRpcRequest jsonRpcReq && jsonRpcReq.Method == RequestMethods.ElicitationCreate) - { - var sseItemList = eventStore.GetOrAdd(streamId, (key) => new List>()); - sseItemList.Add(messageItem); - } - - if (messageItem.Data is JsonRpcResponse jsonRpcResp && - eventStore.TryGetValue(streamId, out var itemList)) - { - itemList.Add(messageItem); - } - } - - public async Task replayEventsAfter(string lastEventId, Action>> sendEvents) - { - var streamId = lastEventId.Split('_')[0]; - var events = eventStore.GetValueOrDefault(streamId, new()); - var sortedAndFilteredEventsToSend = events - .Where(e => e.Data is not null && e.EventId != null) - .OrderBy(e => e.EventId) - // sending events with EventId greater than or equal to lastEventId - .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) - .Select(e => - new SseItem(e.Data!, e.EventType) - { - EventId = e.EventId, - ReconnectionInterval = e.ReconnectionInterval - }); - sendEvents(SseItemsAsyncEnumerable(sortedAndFilteredEventsToSend)); - } - - private static async IAsyncEnumerable> SseItemsAsyncEnumerable(IEnumerable> enumerableItems) - { - foreach (var sseItem in enumerableItems) - { - yield return sseItem; - } - } -} diff --git a/samples/AspNetCoreMcpServer/Program.cs b/samples/AspNetCoreMcpServer/Program.cs index 206077047..9a70be911 100644 --- a/samples/AspNetCoreMcpServer/Program.cs +++ b/samples/AspNetCoreMcpServer/Program.cs @@ -1,3 +1,4 @@ +using AspNetCoreMcpServer.EventStore; using AspNetCoreMcpServer.Resources; using AspNetCoreMcpServer.Tools; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -32,8 +33,9 @@ client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("weather-tool", "1.0")); }); -// adding InMemoryEventStore to support stream resumability +// adding InMemoryEventStore to support stream resumability and background cleanup service builder.Services.TryAddSingleton(); +builder.Services.AddHostedService(); var app = builder.Build(); diff --git a/samples/AspNetCoreMcpServer/appsettings.json b/samples/AspNetCoreMcpServer/appsettings.json index 10f68b8c8..7e2f39d16 100644 --- a/samples/AspNetCoreMcpServer/appsettings.json +++ b/samples/AspNetCoreMcpServer/appsettings.json @@ -5,5 +5,9 @@ "Microsoft.AspNetCore": "Warning" } }, - "AllowedHosts": "*" + "AllowedHosts": "*", + "EventStore": { + "CleanupJobRunFrequencyInMinutes": 30, + "EventsRetentionDurationInMinutes": 60 + } } diff --git a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs index bb9abfb2e..a4aec7336 100644 --- a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs +++ b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs @@ -97,7 +97,7 @@ await WriteJsonRpcErrorAsync(context, if (!string.IsNullOrEmpty(lastEventId) && eventStore is not null) { InitializeSseResponse(context); - await eventStore.replayEventsAfter(lastEventId, async (enumerableEvents) => + await eventStore.ReplayEventsAfter(lastEventId, async (enumerableEvents) => await SseFormatter.WriteAsync(enumerableEvents, context.Response.Body, (item, bufferWriter) => JsonSerializer.Serialize(new Utf8JsonWriter(bufferWriter), item.Data, s_messageTypeInfo), diff --git a/src/ModelContextProtocol.Core/Server/IEventStore.cs b/src/ModelContextProtocol.Core/Server/IEventStore.cs index 6499065e3..3b1a5c9ce 100644 --- a/src/ModelContextProtocol.Core/Server/IEventStore.cs +++ b/src/ModelContextProtocol.Core/Server/IEventStore.cs @@ -15,7 +15,7 @@ public interface IEventStore /// event identifier can be used to retrieve or reference the stored event in the future. /// The identifier of the stream where the event will be stored. Cannot be null or empty. /// The event item to be stored, which may contain a JSON-RPC message or be null. - void storeEvent(string streamId, SseItem messageItem); + void StoreEvent(string streamId, SseItem messageItem); /// @@ -25,5 +25,21 @@ public interface IEventStore /// A callback action that processes the replayed events as an asynchronous enumerable of /// containing objects. /// A task that represents the asynchronous operation of replaying events. - Task replayEventsAfter(string lastEventId, Action>> sendEvents); + Task ReplayEventsAfter(string lastEventId, Action>> sendEvents); + + /// + /// Retrieves the event identifier associated with a specific JSON-RPC message in the given stream. + /// + /// The unique identifier of the stream containing the message. + /// The JSON-RPC message for which the event identifier is to be retrieved. + /// The event identifier as a string, or if no event identifier is associated with the + /// message. + string? GetEventId(string streamId, JsonRpcMessage message); + + /// + /// Cleans up the event store by removing outdated or unnecessary events. + /// + /// This method is typically used to maintain the event store's size and performance by clearing + /// events that are no longer needed. + void CleanupEventStore(); } diff --git a/src/ModelContextProtocol.Core/Server/SseWriter.cs b/src/ModelContextProtocol.Core/Server/SseWriter.cs index efe33706c..dfde263b7 100644 --- a/src/ModelContextProtocol.Core/Server/SseWriter.cs +++ b/src/ModelContextProtocol.Core/Server/SseWriter.cs @@ -73,14 +73,14 @@ transport is StreamableHttpPostTransport postTransport && var streamId = postTransport.pendingStreamId!; sseItem = new SseItem(message, SseParser.EventTypeDefault) { - EventId = $"{streamId}_{DateTime.UtcNow.Ticks}" + EventId = eventStore.GetEventId(streamId, message) }; // store the requests and response to the pending request if (message is JsonRpcRequest jsonRpcReq || (message is JsonRpcResponse jsonRpcResp && jsonRpcResp.Id == postTransport.pendingRequestId)) { - eventStore.storeEvent(streamId, sseItem); + eventStore.StoreEvent(streamId, sseItem); } } From 237ccd8d6c2caeb9c79e5989424b41a5a65cc7dc Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Mon, 20 Oct 2025 17:45:20 -0400 Subject: [PATCH 5/6] send events after last-event-id --- samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs index b0b961a26..26bd5ae26 100644 --- a/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs +++ b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs @@ -50,9 +50,8 @@ public async Task ReplayEventsAfter(string lastEventId, Action e.Data is not null && e.EventId != null) .OrderBy(e => e.EventId) - // Sending events with EventId greater than or equal to lastEventId. - // As per MCP specs, the server should resend the events after lastEventId - .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) < 0) + // Sending events with EventId greater than lastEventId. + .SkipWhile(e => string.Compare(e.EventId!, lastEventId, StringComparison.Ordinal) <= 0) .Select(e => new SseItem(e.Data!, e.EventType) { From 270d43c34a2e95580e0a1b629d41cd3431aae9ce Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Thu, 6 Nov 2025 12:37:50 -0500 Subject: [PATCH 6/6] Refactor event store cleanup to use IEventStoreCleaner Refactored `EventStoreCleanupService` to use a new `IEventStoreCleaner` interface, separating cleanup responsibilities from the `IEventStore` interface. - Introduced `IEventStoreCleaner` with a `CleanEventStore` method. - Updated `InMemoryEventStore` to implement `IEventStoreCleaner`. - Removed `CleanupEventStore` from `IEventStore`. - Registered `InMemoryEventStore` as both `IEventStore` and `IEventStoreCleaner` in DI. - Updated logging messages to reflect the new terminology. This change improves modularity and adheres to the Single Responsibility Principle. --- .../EventStore/EventStoreCleanupService.cs | 12 ++++++------ .../EventStore/IEventStoreCleaner.cs | 15 +++++++++++++++ .../EventStore/InMemoryEventStore.cs | 6 +++--- samples/AspNetCoreMcpServer/Program.cs | 1 + .../Server/IEventStore.cs | 7 ------- 5 files changed, 25 insertions(+), 16 deletions(-) create mode 100644 samples/AspNetCoreMcpServer/EventStore/IEventStoreCleaner.cs diff --git a/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs b/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs index d8f3604d3..cc79dc1a7 100644 --- a/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs +++ b/samples/AspNetCoreMcpServer/EventStore/EventStoreCleanupService.cs @@ -6,22 +6,22 @@ public class EventStoreCleanupService : BackgroundService { private readonly TimeSpan _jobRunFrequencyInMinutes; private readonly ILogger _logger; - private readonly IEventStore? _eventStore; + private readonly IEventStoreCleaner? _eventStoreCleaner; - public EventStoreCleanupService(ILogger logger, IConfiguration configuration, IEventStore? eventStore = null) + public EventStoreCleanupService(ILogger logger, IConfiguration configuration, IEventStoreCleaner? eventStoreCleaner = null) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _eventStore = eventStore; + _eventStoreCleaner = eventStoreCleaner; _jobRunFrequencyInMinutes = TimeSpan.FromMinutes(configuration.GetValue("EventStore:CleanupJobRunFrequencyInMinutes", 30)); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - if (_eventStore is null) + if (_eventStoreCleaner is null) { - _logger.LogWarning("No event store implementation provided. Event store cleanup job will not run."); + _logger.LogWarning("No event store cleaner implementation provided. Event store cleanup job will not run."); return; } @@ -32,7 +32,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { _logger.LogInformation("Running event store cleanup job at {CurrentTimeInUtc}.", DateTime.UtcNow); - _eventStore.CleanupEventStore(); + _eventStoreCleaner.CleanEventStore(); } catch (Exception ex) { diff --git a/samples/AspNetCoreMcpServer/EventStore/IEventStoreCleaner.cs b/samples/AspNetCoreMcpServer/EventStore/IEventStoreCleaner.cs new file mode 100644 index 000000000..ba4d495d6 --- /dev/null +++ b/samples/AspNetCoreMcpServer/EventStore/IEventStoreCleaner.cs @@ -0,0 +1,15 @@ +namespace AspNetCoreMcpServer.EventStore; + +/// +/// Interface for cleaning up the event store +/// +public interface IEventStoreCleaner +{ + + /// + /// Cleans up the event store by removing outdated or unnecessary events. + /// + /// This method is typically used to maintain the event store's size and performance by clearing + /// events that are no longer needed. + void CleanEventStore(); +} diff --git a/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs index 26bd5ae26..0706c86bf 100644 --- a/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs +++ b/samples/AspNetCoreMcpServer/EventStore/InMemoryEventStore.cs @@ -13,10 +13,10 @@ namespace AspNetCoreMcpServer.EventStore; /// The provides functionality to store events for a given stream and /// replay events after a specified event ID. It supports resumability for specific types of requests and ensures events /// are replayed in the correct order. -public sealed class InMemoryEventStore : IEventStore +public sealed class InMemoryEventStore : IEventStore, IEventStoreCleaner { public const string EventIdDelimiter = "_"; - private ConcurrentDictionary>> _eventStore = new(); + private static ConcurrentDictionary>> _eventStore = new(); private readonly ILogger _logger; private readonly TimeSpan _eventsRetentionDurationInMinutes; @@ -74,7 +74,7 @@ private static async IAsyncEnumerable> SseItemsAsyncEnum return $"{streamId}{EventIdDelimiter}{DateTime.UtcNow.Ticks}"; } - public void CleanupEventStore() + public void CleanEventStore() { var cutoffTime = DateTime.UtcNow - _eventsRetentionDurationInMinutes; _logger.LogInformation("Cleaning up events older than {CutoffTime} from event store.", cutoffTime); diff --git a/samples/AspNetCoreMcpServer/Program.cs b/samples/AspNetCoreMcpServer/Program.cs index 9a70be911..00ad4967c 100644 --- a/samples/AspNetCoreMcpServer/Program.cs +++ b/samples/AspNetCoreMcpServer/Program.cs @@ -35,6 +35,7 @@ // adding InMemoryEventStore to support stream resumability and background cleanup service builder.Services.TryAddSingleton(); +builder.Services.TryAddSingleton(); builder.Services.AddHostedService(); var app = builder.Build(); diff --git a/src/ModelContextProtocol.Core/Server/IEventStore.cs b/src/ModelContextProtocol.Core/Server/IEventStore.cs index 3b1a5c9ce..329a077fe 100644 --- a/src/ModelContextProtocol.Core/Server/IEventStore.cs +++ b/src/ModelContextProtocol.Core/Server/IEventStore.cs @@ -35,11 +35,4 @@ public interface IEventStore /// The event identifier as a string, or if no event identifier is associated with the /// message. string? GetEventId(string streamId, JsonRpcMessage message); - - /// - /// Cleans up the event store by removing outdated or unnecessary events. - /// - /// This method is typically used to maintain the event store's size and performance by clearing - /// events that are no longer needed. - void CleanupEventStore(); }