@@ -11,8 +11,10 @@ import (
1111 "encoding/json"
1212 "fmt"
1313 "log"
14+ "maps"
1415 "net"
1516 "os"
17+ "slices"
1618 "strconv"
1719 "strings"
1820 "sync"
@@ -30,52 +32,60 @@ import (
3032 "golang.org/x/tools/internal/jsonrpc2"
3133)
3234
33- // SessionEventType differentiates between new and exiting sessions.
34- type SessionEventType int
35-
36- const (
37- SessionStart SessionEventType = iota
38- SessionEnd
39- )
40-
41- // SessionEvent holds information about the session event.
42- type SessionEvent struct {
43- Type SessionEventType
44- Session * cache.Session
45- Server protocol.Server
46- }
47-
4835// Unique identifiers for client/server.
4936var serverIndex int64
5037
5138// The streamServer type is a jsonrpc2.streamServer that handles incoming
5239// streams as a new LSP session, using a shared cache.
53- type streamServer struct {
40+ type StreamServer struct {
5441 cache * cache.Cache
5542 // daemon controls whether or not to log new connections.
5643 daemon bool
5744
5845 // optionsOverrides is passed to newly created sessions.
5946 optionsOverrides func (* settings.Options )
6047
48+ // onSessionExit is called whenever a session exits, with the session ID.
49+ onSessionExit func (id string )
50+
6151 // serverForTest may be set to a test fake for testing.
6252 serverForTest protocol.Server
6353
64- // eventChan is an optional channel for LSP server session lifecycle events,
65- // including session creation and termination. If nil, no events are sent.
66- eventChan chan SessionEvent
54+ // Keep track of active sessions, for interrogation.
55+ sessionMu sync.Mutex
56+ sessions map [string ]sessionServer
57+ }
58+
59+ type sessionServer struct {
60+ session * cache.Session
61+ server protocol.Server
6762}
6863
6964// NewStreamServer creates a StreamServer using the shared cache. If
7065// withTelemetry is true, each session is instrumented with telemetry that
7166// records RPC statistics.
72- func NewStreamServer (cache * cache.Cache , daemon bool , eventChan chan SessionEvent , optionsFunc func (* settings.Options )) jsonrpc2.StreamServer {
73- return & streamServer {cache : cache , daemon : daemon , eventChan : eventChan , optionsOverrides : optionsFunc }
67+ func NewStreamServer (cache * cache.Cache , daemon bool , optionsFunc func (* settings.Options )) * StreamServer {
68+ return & StreamServer {
69+ cache : cache ,
70+ daemon : daemon ,
71+ optionsOverrides : optionsFunc ,
72+ sessions : make (map [string ]sessionServer ),
73+ }
74+ }
75+
76+ // SetSessionExitFunc sets the function to call when sessions exit.
77+ // It is not concurrency safe, and must only be called at most once, before the
78+ // receiver is passed to jsonrpc2.Serve.
79+ func (s * StreamServer ) SetSessionExitFunc (f func (id string )) {
80+ if s .onSessionExit != nil {
81+ panic ("duplicate call to SetSessionExitFunc" )
82+ }
83+ s .onSessionExit = f
7484}
7585
7686// ServeStream implements the jsonrpc2.StreamServer interface, by handling
7787// incoming streams using a new lsp server.
78- func (s * streamServer ) ServeStream (ctx context.Context , conn jsonrpc2.Conn ) error {
88+ func (s * StreamServer ) ServeStream (ctx context.Context , conn jsonrpc2.Conn ) error {
7989 client := protocol .ClientDispatcher (conn )
8090 session := cache .NewSession (ctx , s .cache )
8191 svr := s .serverForTest
@@ -86,6 +96,18 @@ func (s *streamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) erro
8696 instance .AddService (svr , session )
8797 }
8898 }
99+ s .sessionMu .Lock ()
100+ s .sessions [session .ID ()] = sessionServer {session , svr }
101+ s .sessionMu .Unlock ()
102+ defer func () {
103+ s .sessionMu .Lock ()
104+ delete (s .sessions , session .ID ())
105+ s .sessionMu .Unlock ()
106+ if s .onSessionExit != nil {
107+ s .onSessionExit (session .ID ())
108+ }
109+ }()
110+
89111 // Clients may or may not send a shutdown message. Make sure the server is
90112 // shut down.
91113 // TODO(rFindley): this shutdown should perhaps be on a disconnected context.
@@ -106,21 +128,6 @@ func (s *streamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) erro
106128 protocol .ServerHandler (svr ,
107129 jsonrpc2 .MethodNotFound ))))
108130
109- if s .eventChan != nil {
110- s .eventChan <- SessionEvent {
111- Session : session ,
112- Type : SessionStart ,
113- Server : svr ,
114- }
115- defer func () {
116- s .eventChan <- SessionEvent {
117- Session : session ,
118- Type : SessionEnd ,
119- Server : svr ,
120- }
121- }()
122- }
123-
124131 if s .daemon {
125132 log .Printf ("Session %s: connected" , session .ID ())
126133 defer log .Printf ("Session %s: exited" , session .ID ())
@@ -130,6 +137,29 @@ func (s *streamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) erro
130137 return conn .Err ()
131138}
132139
140+ // Session returns the current active session for the given id, or (nil, nil)
141+ // if none exists.
142+ func (s * StreamServer ) Session (id string ) (* cache.Session , protocol.Server ) {
143+ s .sessionMu .Lock ()
144+ defer s .sessionMu .Unlock ()
145+ ss := s .sessions [id ]
146+ return ss .session , ss .server // possibly nil for zero value
147+ }
148+
149+ // FirstSession returns the first session by lexically sorted session ID, or
150+ // (nil, nil).
151+ func (s * StreamServer ) FirstSession () (* cache.Session , protocol.Server ) {
152+ s .sessionMu .Lock ()
153+ defer s .sessionMu .Unlock ()
154+ keys := slices .Collect (maps .Keys (s .sessions ))
155+ if len (keys ) == 0 {
156+ return nil , nil
157+ }
158+ id := slices .Min (keys )
159+ ss := s .sessions [id ]
160+ return ss .session , ss .server
161+ }
162+
133163// A forwarder is a jsonrpc2.StreamServer that handles an LSP stream by
134164// forwarding it to a remote. This is used when the gopls process started by
135165// the editor is in the `-remote` mode, which means it finds and connects to a
0 commit comments