11use crate :: lsp:: protocol:: * ;
2+ use crate :: model:: entities:: DiagnosticEvent ;
23use crate :: types:: LanguageServerConfig ;
34use anyhow:: Result ;
45use lsp_types:: * ;
@@ -7,8 +8,9 @@ use std::collections::HashMap;
78use std:: process:: Stdio ;
89use std:: sync:: Arc ;
910use tokio:: io:: BufReader ;
11+ use tokio:: io:: AsyncBufReadExt ;
1012use tokio:: process:: Command ;
11- use tokio:: sync:: { oneshot, Mutex } ;
13+ use tokio:: sync:: { oneshot, Mutex , broadcast } ;
1214use tracing:: { debug, error} ;
1315use url:: Url ;
1416
@@ -20,11 +22,14 @@ type ResponseCallback = Box<dyn FnOnce(Result<Value>) + Send>;
2022/// - Symbol finding and navigation
2123/// - Code formatting and refactoring
2224/// - Document lifecycle management
25+ /// - Diagnostic notifications (push model)
2326pub struct LspClient {
2427 stdin : Arc < Mutex < tokio:: process:: ChildStdin > > ,
2528 pending_requests : Arc < Mutex < HashMap < u64 , ResponseCallback > > > ,
29+ diagnostic_sender : broadcast:: Sender < DiagnosticEvent > ,
2630 next_id : Arc < Mutex < u64 > > ,
2731 config : LanguageServerConfig ,
32+ init_result : Arc < Mutex < Option < InitializeResult > > > ,
2833}
2934
3035impl LspClient {
@@ -52,14 +57,35 @@ impl LspClient {
5257 . stdout
5358 . take ( )
5459 . ok_or_else ( || anyhow:: anyhow!( "No stdout" ) ) ?;
60+ let stderr = child
61+ . stderr
62+ . take ( )
63+ . ok_or_else ( || anyhow:: anyhow!( "No stderr" ) ) ?;
64+
65+ // Create broadcast channel for diagnostics (capacity of 100 events)
66+ let ( diagnostic_sender, _) = broadcast:: channel ( 100 ) ;
5567
5668 let client = Self {
5769 stdin : Arc :: new ( Mutex :: new ( stdin) ) ,
5870 pending_requests : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
71+ diagnostic_sender,
5972 next_id : Arc :: new ( Mutex :: new ( 1 ) ) ,
60- config,
73+ config : config. clone ( ) ,
74+ init_result : Arc :: new ( Mutex :: new ( None ) ) ,
6175 } ;
6276
77+ // Start stderr monitoring
78+ let server_name = config. name . clone ( ) ;
79+ tokio:: spawn ( async move {
80+ let mut reader = BufReader :: new ( stderr) ;
81+ let mut line = String :: new ( ) ;
82+ while let Ok ( n) = reader. read_line ( & mut line) . await {
83+ if n == 0 { break ; }
84+ tracing:: error!( "LSP {} stderr: {}" , server_name, line. trim( ) ) ;
85+ line. clear ( ) ;
86+ }
87+ } ) ;
88+
6389 client. start_message_handler ( stdout) . await ;
6490 Ok ( client)
6591 }
@@ -72,13 +98,17 @@ impl LspClient {
7298 /// # Returns
7399 /// * `Result<InitializeResult>` - Server capabilities or initialization error
74100 pub async fn initialize ( & self , root_uri : Url ) -> Result < InitializeResult > {
101+ tracing:: debug!( "Initializing LSP client for workspace: {}" , root_uri) ;
102+
75103 let ( tx, rx) = oneshot:: channel ( ) ;
76104
77105 let init_params = crate :: lsp:: LspConfig :: build_initialize_params (
78- root_uri,
106+ root_uri. clone ( ) ,
79107 self . config . initialization_options . clone ( ) ,
80108 ) ;
81109
110+ tracing:: debug!( "Sending initialize request to LSP server: {}" , self . config. name) ;
111+
82112 self . send_request ( "initialize" , json ! ( init_params) , move |result| {
83113 let _ = tx. send ( result) ;
84114 } )
@@ -87,10 +117,29 @@ impl LspClient {
87117 let result = rx. await ??;
88118 let init_result: InitializeResult = serde_json:: from_value ( result) ?;
89119
120+ // Store the initialization result
121+ * self . init_result . lock ( ) . await = Some ( init_result. clone ( ) ) ;
122+
123+ tracing:: debug!( "Sending initialized notification to LSP server: {}" , self . config. name) ;
90124 self . send_notification ( "initialized" , json ! ( { } ) ) . await ?;
125+
126+ tracing:: debug!( "LSP client initialization completed for: {}" , self . config. name) ;
91127 Ok ( init_result)
92128 }
93129
130+ /// Subscribe to diagnostic notifications from the language server
131+ ///
132+ /// # Returns
133+ /// * `broadcast::Receiver<DiagnosticEvent>` - Receiver for diagnostic events
134+ pub fn subscribe_diagnostics ( & self ) -> broadcast:: Receiver < DiagnosticEvent > {
135+ self . diagnostic_sender . subscribe ( )
136+ }
137+
138+ /// Get the server capabilities from initialization
139+ pub async fn get_server_capabilities ( & self ) -> Option < ServerCapabilities > {
140+ self . init_result . lock ( ) . await . as_ref ( ) . map ( |result| result. capabilities . clone ( ) )
141+ }
142+
94143 /// Navigate to symbol definition
95144 ///
96145 /// # Arguments
@@ -221,6 +270,21 @@ impl LspClient {
221270 . await
222271 }
223272
273+ /// Request diagnostics for a document (pull model)
274+ ///
275+ /// # Arguments
276+ /// * `params` - Document diagnostic parameters
277+ ///
278+ /// # Returns
279+ /// * `Result<Option<DocumentDiagnosticReport>>` - Diagnostic report or None
280+ pub async fn document_diagnostic (
281+ & self ,
282+ params : DocumentDiagnosticParams ,
283+ ) -> Result < Option < DocumentDiagnosticReport > > {
284+ self . send_lsp_request ( "textDocument/diagnostic" , params)
285+ . await
286+ }
287+
224288 /// Generic LSP request handler with automatic response parsing
225289 async fn send_lsp_request < T , R > ( & self , method : & str , params : T ) -> Result < Option < R > >
226290 where
@@ -254,11 +318,12 @@ impl LspClient {
254318 /// Start background task to handle LSP messages from server
255319 async fn start_message_handler ( & self , stdout : tokio:: process:: ChildStdout ) {
256320 let pending_requests = self . pending_requests . clone ( ) ;
321+ let diagnostic_sender = self . diagnostic_sender . clone ( ) ;
257322 tokio:: spawn ( async move {
258323 let mut reader = BufReader :: new ( stdout) ;
259324
260325 while let Ok ( content) = read_lsp_message ( & mut reader) . await {
261- if let Err ( e) = Self :: process_message ( & content, & pending_requests) . await {
326+ if let Err ( e) = Self :: process_message ( & content, & pending_requests, & diagnostic_sender ) . await {
262327 error ! ( "Failed to process LSP message: {}" , e) ;
263328 }
264329 }
@@ -270,11 +335,51 @@ impl LspClient {
270335 async fn process_message (
271336 content : & str ,
272337 pending_requests : & Arc < Mutex < HashMap < u64 , ResponseCallback > > > ,
338+ diagnostic_sender : & broadcast:: Sender < DiagnosticEvent > ,
273339 ) -> Result < ( ) > {
274340 let message = parse_lsp_message ( content) ?;
341+
342+ // Debug: Log all incoming messages
343+ debug ! ( "LSP message received: method={}, has_id={}" , message. method, message. id. is_some( ) ) ;
344+
345+ // Handle notifications (no ID)
346+ if message. id . is_none ( ) {
347+ match message. method . as_str ( ) {
348+ "textDocument/publishDiagnostics" => {
349+ debug ! ( "Processing publishDiagnostics notification" ) ;
350+ if let Some ( params) = message. params {
351+ match serde_json:: from_value :: < PublishDiagnosticsParams > ( params) {
352+ Ok ( diagnostic_params) => {
353+ let event = DiagnosticEvent {
354+ uri : diagnostic_params. uri . to_string ( ) ,
355+ diagnostics : diagnostic_params. diagnostics ,
356+ } ;
357+
358+ debug ! ( "Sending diagnostic event: uri={}, count={}" , event. uri, event. diagnostics. len( ) ) ;
359+
360+ // Send to broadcast channel (ignore if no receivers)
361+ match diagnostic_sender. send ( event) {
362+ Ok ( _) => debug ! ( "Diagnostic event sent successfully" ) ,
363+ Err ( e) => error ! ( "Failed to send diagnostic event: {}" , e) ,
364+ }
365+ }
366+ Err ( e) => {
367+ error ! ( "Failed to parse publishDiagnostics params: {}" , e) ;
368+ }
369+ }
370+ }
371+ }
372+ _ => {
373+ // Other notifications - just log for now
374+ debug ! ( "Received LSP notification: {}" , message. method) ;
375+ }
376+ }
377+ return Ok ( ( ) ) ;
378+ }
275379
380+ // Handle responses (with ID)
276381 let Some ( id) = message. id . and_then ( |id| id. as_u64 ( ) ) else {
277- return Ok ( ( ) ) ; // Notification or invalid ID
382+ return Ok ( ( ) ) ; // Invalid ID
278383 } ;
279384
280385 let Some ( callback) = pending_requests. lock ( ) . await . remove ( & id) else {
0 commit comments