@@ -30,14 +30,7 @@ class LaravelStreamableHttpTransport implements ServerTransportInterface
3030 public function __construct (
3131 protected SessionManager $ sessionManager ,
3232 protected ?EventStoreInterface $ eventStore = null
33- ) {
34- $ this ->on ('message ' , function (Message $ message , string $ sessionId ) {
35- $ session = $ this ->sessionManager ->getSession ($ sessionId );
36- if ($ session !== null ) {
37- $ session ->save ();
38- }
39- });
40- }
33+ ) {}
4134
4235 protected function generateId (): string
4336 {
@@ -59,14 +52,14 @@ public function sendMessage(Message $message, string $sessionId, array $context
5952
6053 $ eventId = null ;
6154 if ($ this ->eventStore && isset ($ context ['type ' ]) && in_array ($ context ['type ' ], ['get_sse ' , 'post_sse ' ])) {
62- $ streamKey = $ context ['type ' ] === ' get_sse ' ? " get_stream_ { $ sessionId }" : $ context [ ' streamId ' ] ?? " post_stream_ { $ sessionId }" ;
63- $ eventId = $ this ->eventStore ->storeEvent ($ streamKey , $ rawMessage );
55+ $ streamId = $ context ['streamId ' ];
56+ $ eventId = $ this ->eventStore ->storeEvent ($ streamId , $ rawMessage );
6457 }
6558
6659 $ messageData = [
6760 'id ' => $ eventId ?? $ this ->generateId (),
6861 'data ' => $ rawMessage ,
69- 'context ' => $ context[ ' type ' ] ?? ' get_sse ' ,
62+ 'context ' => $ context ,
7063 'timestamp ' => time ()
7164 ];
7265
@@ -161,35 +154,26 @@ protected function handleJsonResponse(Message $message, string $sessionId, array
161154 $ context ['type ' ] = 'post_json ' ;
162155 $ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
163156
164- $ maxWaitTime = config ('mcp.transports.http_integrated.json_response_timeout ' , 30 );
165- $ pollInterval = 0.1 ; // 100ms
166- $ waitedTime = 0 ;
167-
168- while ($ waitedTime < $ maxWaitTime ) {
169- $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_json ' );
170-
171- if (!empty ($ messages )) {
172- $ responseMessage = $ messages [0 ];
173- $ data = $ responseMessage ['data ' ];
157+ $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_json ' );
174158
175- $ headers = [
176- ' Content-Type ' => ' application/json ' ,
177- ... $ this ->getCorsHeaders ()
178- ];
159+ if ( empty ( $ messages )) {
160+ $ error = Error:: forInternalError ( ' Internal error ' );
161+ return response ()-> json ( $ error , 500 , $ this ->getCorsHeaders ());
162+ }
179163
180- if ($ context ['is_initialize_request ' ] ?? false ) {
181- $ headers ['Mcp-Session-Id ' ] = $ sessionId ;
182- }
164+ $ responseMessage = $ messages [0 ];
165+ $ data = $ responseMessage ['data ' ];
183166
184- return response ()->make ($ data , 200 , $ headers );
185- }
167+ $ headers = [
168+ 'Content-Type ' => 'application/json ' ,
169+ ...$ this ->getCorsHeaders ()
170+ ];
186171
187- usleep (( int )( $ pollInterval * 1000000 ));
188- $ waitedTime + = $ pollInterval ;
172+ if ( $ context [ ' is_initialize_request ' ] ?? false ) {
173+ $ headers [ ' Mcp-Session-Id ' ] = $ sessionId ;
189174 }
190175
191- $ error = Error::forInternalError ('Request timeout ' );
192- return response ()->json ($ error , 504 , $ this ->getCorsHeaders ());
176+ return response ()->make ($ data , 200 , $ headers );
193177 } catch (Throwable $ e ) {
194178 Log::error ('JSON response mode error ' , ['exception ' => $ e ]);
195179 $ error = Error::forInternalError ('Internal error ' );
@@ -202,46 +186,29 @@ protected function handleJsonResponse(Message $message, string $sessionId, array
202186 */
203187 protected function handleSseResponse (Message $ message , string $ sessionId , int $ nRequests , array $ context ): StreamedResponse
204188 {
205- $ streamId = $ this ->generateId ();
206- $ context ['type ' ] = 'post_sse ' ;
207- $ context ['streamId ' ] = $ streamId ;
208- $ context ['nRequests ' ] = $ nRequests ;
209-
210- $ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
211-
212- return response ()->stream (function () use ($ sessionId , $ nRequests , $ streamId ) {
213- $ responsesSent = 0 ;
214- $ maxWaitTime = 30 ; // 30 seconds timeout
215- $ pollInterval = 0.1 ; // 100ms
216- $ waitedTime = 0 ;
189+ $ headers = array_merge ([
190+ 'Content-Type ' => 'text/event-stream ' ,
191+ 'Cache-Control ' => 'no-cache ' ,
192+ 'Connection ' => 'keep-alive ' ,
193+ 'X-Accel-Buffering ' => 'no ' ,
194+ ], $ this ->getCorsHeaders ());
217195
218- while ($ responsesSent < $ nRequests && $ waitedTime < $ maxWaitTime ) {
219- if (connection_aborted ()) {
220- break ;
221- }
196+ if ($ context ['is_initialize_request ' ] ?? false ) {
197+ $ headers ['Mcp-Session-Id ' ] = $ sessionId ;
198+ }
222199
223- $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_sse ' , $ streamId );
200+ return response ()->stream (function () use ($ sessionId , $ nRequests , $ message , $ context ) {
201+ $ streamId = $ this ->generateId ();
202+ $ context ['type ' ] = 'post_sse ' ;
203+ $ context ['streamId ' ] = $ streamId ;
204+ $ context ['nRequests ' ] = $ nRequests ;
224205
225- foreach ($ messages as $ messageData ) {
226- $ this ->sendSseEvent ($ messageData ['data ' ], $ messageData ['id ' ]);
227- $ responsesSent ++;
206+ $ this ->emit ('message ' , [$ message , $ sessionId , $ context ]);
228207
229- if ($ responsesSent >= $ nRequests ) {
230- break ;
231- }
232- }
208+ $ messages = $ this ->dequeueMessagesForContext ($ sessionId , 'post_sse ' , $ streamId );
233209
234- if ($ responsesSent < $ nRequests ) {
235- usleep ((int )($ pollInterval * 1000000 ));
236- $ waitedTime += $ pollInterval ;
237- }
238- }
239- }, headers: array_merge ([
240- 'Content-Type ' => 'text/event-stream ' ,
241- 'Cache-Control ' => 'no-cache ' ,
242- 'Connection ' => 'keep-alive ' ,
243- 'X-Accel-Buffering ' => 'no ' ,
244- ], $ this ->getCorsHeaders ()));
210+ $ this ->sendSseEvent ($ messages [0 ]['data ' ], $ messages [0 ]['id ' ]);
211+ }, headers: $ headers );
245212 }
246213
247214 /**
@@ -325,20 +292,21 @@ public function handleDeleteRequest(Request $request): Response
325292 /**
326293 * Dequeue messages for specific context, requeue others
327294 */
328- protected function dequeueMessagesForContext (string $ sessionId , string $ context , ?string $ streamId = null ): array
295+ protected function dequeueMessagesForContext (string $ sessionId , string $ type , ?string $ streamId = null ): array
329296 {
330297 $ allMessages = $ this ->sessionManager ->dequeueMessages ($ sessionId );
331298 $ contextMessages = [];
332299 $ requeueMessages = [];
333300
334301 foreach ($ allMessages as $ rawMessage ) {
335302 $ messageData = json_decode ($ rawMessage , true );
303+ $ context = $ messageData ['context ' ] ?? [];
336304
337- if ($ messageData && isset ( $ messageData [ ' context ' ]) ) {
338- $ matchesContext = $ messageData [ ' context ' ] === $ context ;
305+ if ($ messageData ) {
306+ $ matchesContext = $ context [ ' type ' ] === $ type ;
339307
340- if ($ context === 'post_sse ' && $ streamId ) {
341- $ matchesContext = $ matchesContext && isset ($ messageData ['streamId ' ]) && $ messageData ['streamId ' ] === $ streamId ;
308+ if ($ type === 'post_sse ' && $ streamId ) {
309+ $ matchesContext = $ matchesContext && isset ($ context ['streamId ' ]) && $ context ['streamId ' ] === $ streamId ;
342310 }
343311
344312 if ($ matchesContext ) {
@@ -412,6 +380,15 @@ private function flushOutput(): void
412380 @flush ();
413381 }
414382
383+ protected function collectSessionGarbage (): void
384+ {
385+ $ lottery = config ('mcp.session.lottery ' , [2 , 100 ]);
386+
387+ if (random_int (1 , $ lottery [1 ]) <= $ lottery [0 ]) {
388+ $ this ->sessionManager ->gc ();
389+ }
390+ }
391+
415392 /**
416393 * Get CORS headers
417394 */
0 commit comments