Skip to content

Commit 93834d1

Browse files
Fix support for sending server notifications with request-specific McpServer instances used outside their original scope. (#832)
* Fix support for sending server notifications with request-specific McpServer instances used outside their original scope. * Address feedback. * Update src/ModelContextProtocol.Core/Server/SseWriter.cs
1 parent 445ea42 commit 93834d1

File tree

5 files changed

+49
-6
lines changed

5 files changed

+49
-6
lines changed

src/ModelContextProtocol.Core/Server/SseResponseStreamTransport.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public async ValueTask DisposeAsync()
6666
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
6767
{
6868
Throw.IfNull(message);
69+
// If the underlying writer has been disposed, just drop the message.
6970
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
7071
}
7172

src/ModelContextProtocol.Core/Server/SseWriter.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,22 @@ public Task WriteAllAsync(Stream sseResponseStream, CancellationToken cancellati
4747
return _writeTask;
4848
}
4949

50-
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
50+
public async Task<bool> SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
5151
{
5252
Throw.IfNull(message);
5353

5454
using var _ = await _disposeLock.LockAsync(cancellationToken).ConfigureAwait(false);
5555

5656
if (_disposed)
5757
{
58-
// Don't throw an ODE, because this is disposed internally when the transport disconnects due to an abort
59-
// or sending all the responses for the a give given Streamable HTTP POST request, so the user might not be at fault.
60-
// There's precedence for no-oping here similar to writing to the response body of an aborted request in ASP.NET Core.
61-
return;
58+
// Don't throw ObjectDisposedException here; just return false to indicate the message wasn't sent.
59+
// The calling transport can determine what to do in this case (drop the message, or fall back to another transport).
60+
return false;
6261
}
6362

6463
// Emit redundant "event: message" lines for better compatibility with other SDKs.
6564
await _messages.Writer.WriteAsync(new SseItem<JsonRpcMessage?>(message, SseParser.EventTypeDefault), cancellationToken).ConfigureAwait(false);
65+
return true;
6666
}
6767

6868
public async ValueTask DisposeAsync()

src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,13 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
7272
throw new InvalidOperationException("Server to client requests are not supported in stateless mode.");
7373
}
7474

75-
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
75+
bool isAccepted = await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
76+
if (!isAccepted)
77+
{
78+
// The underlying writer didn't accept the message because the underlying request has completed.
79+
// Rather than drop the message, fall back to sending it via the parent transport.
80+
await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
81+
}
7682
}
7783

7884
public async ValueTask DisposeAsync()

src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
131131
throw new InvalidOperationException("Unsolicited server to client messages are not supported in stateless mode.");
132132
}
133133

134+
// If the underlying writer has been disposed, just drop the message.
134135
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
135136
}
136137

tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,41 @@ public async Task IdleSessionsPastMaxIdleSessionCount_ArePruned_LongestIdleFirst
521521
Assert.StartsWith("MaxIdleSessionCount of 2 exceeded. Closing idle session", idleLimitLogMessage.Message);
522522
}
523523

524+
[Fact]
525+
public async Task McpServer_UsedOutOfScope_CanSendNotifications()
526+
{
527+
McpServer? capturedServer = null;
528+
Builder.Services.AddMcpServer()
529+
.WithHttpTransport()
530+
.WithListResourcesHandler((_, _) => ValueTask.FromResult(new ListResourcesResult()))
531+
.WithSubscribeToResourcesHandler((context, token) =>
532+
{
533+
capturedServer = context.Server;
534+
return ValueTask.FromResult(new EmptyResult());
535+
});
536+
537+
await StartAsync();
538+
539+
string sessionId = await CallInitializeAndValidateAsync();
540+
SetSessionId(sessionId);
541+
542+
// Call the subscribe method to capture the McpServer instance.
543+
using var response = await HttpClient.PostAsync("", JsonContent(Request("resources/subscribe")), TestContext.Current.CancellationToken);
544+
var rpcResponse = await AssertSingleSseResponseAsync(response);
545+
AssertType<EmptyResult>(rpcResponse.Result);
546+
Assert.NotNull(capturedServer);
547+
548+
// Check the captured McpServer instance can send a notification.
549+
await capturedServer.SendNotificationAsync(NotificationMethods.ResourceUpdatedNotification, TestContext.Current.CancellationToken);
550+
using var getResponse = await HttpClient.GetAsync("", HttpCompletionOption.ResponseHeadersRead, TestContext.Current.CancellationToken);
551+
JsonRpcMessage? firstSseMessage = await ReadSseAsync(getResponse.Content)
552+
.Select(data => JsonSerializer.Deserialize<JsonRpcMessage>(data, McpJsonUtilities.DefaultOptions))
553+
.FirstOrDefaultAsync(TestContext.Current.CancellationToken);
554+
555+
var notification = Assert.IsType<JsonRpcNotification>(firstSseMessage);
556+
Assert.Equal(NotificationMethods.ResourceUpdatedNotification, notification.Method);
557+
}
558+
524559
private static StringContent JsonContent(string json) => new StringContent(json, Encoding.UTF8, "application/json");
525560
private static JsonTypeInfo<T> GetJsonTypeInfo<T>() => (JsonTypeInfo<T>)McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(T));
526561

0 commit comments

Comments
 (0)