22
33import asyncio
44import itertools
5+ import threading
56import uuid
67from dataclasses import dataclass , field
78from enum import Enum
@@ -74,10 +75,34 @@ class WorkspaceDocumentsResult:
7475
7576@dataclass
7677class DiagnosticsData :
77- id : str
78+ id : str = field ( default_factory = lambda : str ( uuid . uuid4 ()))
7879 entries : Dict [Any , Optional [List [Diagnostic ]]] = field (default_factory = dict )
7980 version : Optional [int ] = None
8081 task : Optional [asyncio .Task [Any ]] = None
82+ force : bool = False
83+
84+
85+ def _cancel_all_tasks (loop : asyncio .AbstractEventLoop ) -> None :
86+ to_cancel = asyncio .all_tasks (loop )
87+ if not to_cancel :
88+ return
89+
90+ for task in to_cancel :
91+ task .cancel ()
92+
93+ loop .run_until_complete (asyncio .gather (* to_cancel , loop = loop , return_exceptions = True ))
94+
95+ for task in to_cancel :
96+ if task .cancelled ():
97+ continue
98+ if task .exception () is not None :
99+ loop .call_exception_handler (
100+ {
101+ "message" : "unhandled exception during asyncio.run() shutdown" ,
102+ "exception" : task .exception (),
103+ "task" : task ,
104+ }
105+ )
81106
82107
83108class DiagnosticsProtocolPart (LanguageServerProtocolPart , HasExtendCapabilities ):
@@ -97,18 +122,63 @@ def __init__(self, protocol: LanguageServerProtocol) -> None:
97122
98123 self ._current_workspace_task : Optional [asyncio .Task [WorkspaceDiagnosticReport ]] = None
99124
125+ self ._diagnostics_loop : Optional [asyncio .AbstractEventLoop ] = None
126+ self ._diagnostics_loop_lock = threading .RLock ()
127+ self ._diagnostics_started = threading .Event ()
128+
129+ self .parent .on_initialized .add (self .initialized )
130+
100131 self .in_get_workspace_diagnostics = Event (True )
101132
133+ async def initialized (self , sender : Any ) -> None :
134+ self ._ensure_diagnostics_thread_started ()
135+
136+ @property
137+ def diagnostics_loop (self ) -> asyncio .AbstractEventLoop :
138+ if self ._diagnostics_loop is None :
139+ self ._ensure_diagnostics_thread_started ()
140+
141+ assert self ._diagnostics_loop is not None
142+
143+ return self ._diagnostics_loop
144+
145+ def _run_diagnostics (self ) -> None :
146+ loop = asyncio .new_event_loop ()
147+ asyncio .set_event_loop (loop )
148+ try :
149+ self ._diagnostics_loop = loop
150+ self ._diagnostics_started .set ()
151+
152+ loop .slow_callback_duration = 10
153+
154+ loop .run_forever ()
155+ _cancel_all_tasks (loop )
156+ loop .run_until_complete (loop .shutdown_asyncgens ())
157+ finally :
158+ asyncio .set_event_loop (None )
159+ loop .close ()
160+
161+ def _ensure_diagnostics_thread_started (self ) -> None :
162+ with self ._diagnostics_loop_lock :
163+ if self ._diagnostics_loop is None :
164+ self ._server_thread = threading .Thread (
165+ name = "diagnostics_worker" , target = self ._run_diagnostics , daemon = True
166+ )
167+
168+ self ._server_thread .start ()
169+
170+ if not self ._diagnostics_started .wait (10 ):
171+ raise RuntimeError ("Can't start diagnostics worker thread." )
172+
102173 def extend_capabilities (self , capabilities : ServerCapabilities ) -> None :
103174 if (
104175 self .parent .client_capabilities is not None
105176 and self .parent .client_capabilities .text_document is not None
106177 and self .parent .client_capabilities .text_document .diagnostic is not None
107178 ):
108- # capabilities.diagnostic_provider = None
109179 capabilities .diagnostic_provider = DiagnosticOptions (
110180 inter_file_dependencies = True ,
111- workspace_diagnostics = True ,
181+ workspace_diagnostics = False ,
112182 identifier = f"robotcodelsp_{ uuid .uuid4 ()} " ,
113183 work_done_progress = True ,
114184 )
@@ -117,19 +187,6 @@ def extend_capabilities(self, capabilities: ServerCapabilities) -> None:
117187 async def collect (sender , document : TextDocument ) -> DiagnosticsResult : # NOSONAR
118188 ...
119189
120- @async_tasking_event_iterator
121- async def collect_document_has_diagnostics (sender , document : TextDocument ) -> bool : # NOSONAR
122- ...
123-
124- async def document_has_diagnostics (self , document : TextDocument ) -> bool : # NOSONAR
125- async for result in self .collect_document_has_diagnostics (
126- self , document , callback_filter = language_id_filter (document )
127- ):
128- if result :
129- return True
130-
131- return False
132-
133190 @async_tasking_event
134191 async def load_workspace_documents (sender ) -> List [WorkspaceDocumentsResult ]: # NOSONAR
135192 ...
@@ -167,10 +224,22 @@ async def ensure_workspace_loaded(self) -> None:
167224 self ._workspace_loaded = True
168225 self .workspace_loaded_event .set ()
169226 await self .on_workspace_loaded (self )
170- await self .refresh ()
227+ await self .force_refresh_all ()
228+
229+ async def force_refresh_all (self ) -> None :
230+ for doc in self .parent .documents .documents :
231+ self .get_diagnostics_data (doc ).force = True
232+
233+ await self .refresh ()
234+
235+ async def force_refresh_document (self , document : TextDocument ) -> None :
236+ self .get_diagnostics_data (document ).force = True
237+ if document .opened_in_editor :
238+ await self .refresh ()
171239
172240 @_logger .call
173- async def _get_diagnostics (self , document : TextDocument , data : DiagnosticsData ) -> None :
241+ async def _get_diagnostics_for_document (self , document : TextDocument , data : DiagnosticsData ) -> None :
242+ self ._logger .debug (lambda : f"Get diagnostics for { document } " )
174243
175244 await asyncio .sleep (0.75 )
176245
@@ -193,12 +262,11 @@ async def _get_diagnostics(self, document: TextDocument, data: DiagnosticsData)
193262 if result .diagnostics is not None :
194263 collected_keys .append (result .key )
195264
196- await self .refresh ()
265+ if document .opened_in_editor :
266+ await self .refresh ()
197267
198268 except asyncio .CancelledError :
199- self ._logger .critical (lambda : f"_get_diagnostics cancelled for { document } " )
200- else :
201- await self .refresh ()
269+ self ._logger .debug (lambda : f"_get_diagnostics cancelled for { document } " )
202270 finally :
203271 for k in set (data .entries .keys ()) - set (collected_keys ):
204272 data .entries .pop (k )
@@ -214,50 +282,41 @@ async def _text_document_diagnostic(
214282 ** kwargs : Any ,
215283 ) -> DocumentDiagnosticReport :
216284 try :
217- # if not self.workspace_loaded_event.is_set():
218- # raise JsonRPCErrorException(
219- # ErrorCodes.SERVER_CANCELLED,
220- # "Workspace not loaded.",
221- # data=DiagnosticServerCancellationData(True),
222- # )
223-
224285 document = await self .parent .documents .get (text_document .uri )
225286 if document is None :
226- raise JsonRPCErrorException (ErrorCodes .INVALID_PARAMS , f"Document { text_document !r} not found." )
287+ raise JsonRPCErrorException (ErrorCodes .SERVER_CANCELLED , f"Document { text_document !r} not found." )
227288
228- data : DiagnosticsData = document . get_data ( self , None )
289+ data = self . get_diagnostics_data ( document )
229290
230- if data is None :
231- data = DiagnosticsData (str (uuid .uuid4 ()))
232- document .set_data (self , data )
233-
234- if (
235- document .version != data .version
236- or data .task is None
237- or not await self .document_has_diagnostics (document )
238- ):
291+ if data .force or document .version != data .version or data .task is None :
239292
240293 task = data .task
241294
242- data = DiagnosticsData (str ( uuid . uuid4 ()) )
295+ data = DiagnosticsData ()
243296 document .set_data (self , data )
244297
245298 if task is not None and not task .done ():
246- self ._logger .critical (lambda : f"try to cancel diagnostics for { document } " )
299+ self ._logger .debug (lambda : f"try to cancel diagnostics for { document } " )
247300 task .get_loop ().call_soon_threadsafe (task .cancel )
248301
249302 data .version = document .version
250303 data .task = create_sub_task (
251- self ._get_diagnostics (document , data ), loop = self .parent .loop , name = f"diagnostics ${ text_document } "
304+ self ._get_diagnostics_for_document (document , data ),
305+ loop = self .diagnostics_loop ,
306+ name = f"diagnostics ${ text_document } " ,
252307 )
253308
254309 def done (t : asyncio .Task [Any ]) -> None :
255- if t .cancelled ():
256- self ._logger .critical (lambda : f"diagnostics for { document } canceled" )
257- try :
258- t .exception ()
259- except asyncio .CancelledError :
260- pass
310+
311+ self ._logger .debug (lambda : f"diagnostics for { document } { 'canceled' if t .cancelled () else 'ended' } " )
312+ try :
313+ t .result ()
314+ except asyncio .CancelledError :
315+ pass
316+ except (SystemExit , KeyboardInterrupt ):
317+ raise
318+ except BaseException as e :
319+ self ._logger .exception (e )
261320
262321 data .task .add_done_callback (done )
263322
@@ -268,9 +327,18 @@ def done(t: asyncio.Task[Any]) -> None:
268327 list (itertools .chain (* (e for e in data .entries .values () if e is not None ))), result_id = data .id
269328 )
270329 except asyncio .CancelledError :
271- self ._logger .critical ("canceled _text_document_diagnostic" )
330+ self ._logger .debug ("canceled _text_document_diagnostic" )
272331 raise
273332
333+ def get_diagnostics_data (self , document : TextDocument ) -> DiagnosticsData :
334+ data : DiagnosticsData = document .get_data (self , None )
335+
336+ if data is None :
337+ data = DiagnosticsData (str (uuid .uuid4 ()))
338+ document .set_data (self , data )
339+
340+ return data
341+
274342 @rpc_method (name = "workspace/diagnostic" , param_type = WorkspaceDiagnosticParams )
275343 @threaded ()
276344 async def _workspace_diagnostic (
0 commit comments