|
112 | 112 | log-title (format-error-code "Error receiving message" :internal-error)] |
113 | 113 | (protocols.endpoint/log server :error e (str log-title "\n" message-details)))) |
114 | 114 |
|
115 | | -(defn ^:private spawn-receipt-thread [buf-or-n f] |
116 | | - (let [receipt-ch (async/chan buf-or-n)] |
| 115 | +(defn thread-loop [buf-or-n f] |
| 116 | + (let [ch (async/chan buf-or-n)] |
117 | 117 | (async/thread |
118 | 118 | (discarding-stdout |
119 | 119 | (loop [] |
120 | | - (when-let [[message-type message] (async/<!! receipt-ch)] |
121 | | - (f message-type message) |
| 120 | + (when-let [arg (async/<!! ch)] |
| 121 | + (f arg) |
122 | 122 | (recur))))) |
123 | | - receipt-ch)) |
| 123 | + ch)) |
124 | 124 |
|
125 | | -(defn ^:private run-pipeline |
126 | | - "Forwards messages received on the input-ch to the language server, for |
127 | | - further processing." |
| 125 | +(defn ^:private dispatch-input |
| 126 | + "Dispatches messages received on the input-ch based on message type. Returns a |
| 127 | + channel which will close after the input-ch is closed." |
128 | 128 | [server context input-ch] |
129 | 129 | (let [;; In order to process some requests and (all) notifications in series, |
130 | 130 | ;; the language server sometimes needs to block client-initiated input. |
|
154 | 154 | ;; (>!! output-ch) to respect back pressure from clients that are slow |
155 | 155 | ;; to read. |
156 | 156 | ;; * Separate, so one can continue while the other is blocked. |
157 | | - server-initiated-in-ch (spawn-receipt-thread |
| 157 | + |
| 158 | + ;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to |
| 159 | + ;; estimate how big the buffer should be to avoid dropping messages. LSP |
| 160 | + ;; communication tends to be very quiet, then very chatty, so it depends |
| 161 | + ;; a lot on what the client and server are doing. I also don't know how |
| 162 | + ;; many messages we could store without running into memory problems, |
| 163 | + ;; since this is dependent on so many variables, not just the size of |
| 164 | + ;; the JVM's memory, but also the size of the messages, which can be |
| 165 | + ;; anywhere from a few bytes to megabytes. |
| 166 | + server-initiated-in-ch (thread-loop |
158 | 167 | (async/sliding-buffer 100) |
159 | | - (fn [_ message] |
160 | | - (try |
161 | | - (protocols.endpoint/receive-response server message) |
162 | | - (catch Throwable e |
163 | | - (log-error-receiving server e message))))) |
164 | | - client-initiated-in-ch (spawn-receipt-thread |
| 168 | + (fn [response] |
| 169 | + (protocols.endpoint/receive-response server response))) |
| 170 | + client-initiated-in-ch (thread-loop |
165 | 171 | (async/sliding-buffer 100) |
166 | | - (fn [message-type message] |
| 172 | + (fn [[message-type message]] |
167 | 173 | (if (identical? :request message-type) |
168 | | - ;; receive-request catches its own exceptions |
169 | 174 | (protocols.endpoint/receive-request server context message) |
170 | | - (try |
171 | | - (protocols.endpoint/receive-notification server context message) |
172 | | - (catch Throwable e |
173 | | - (log-error-receiving server e message))))))] |
| 175 | + (protocols.endpoint/receive-notification server context message))))] |
174 | 176 | (async/go-loop [] |
175 | 177 | (if-let [message (async/<! input-ch)] |
176 | 178 | (let [message-type (coercer/input-message-type message)] |
177 | 179 | (case message-type |
178 | 180 | (:parse-error :invalid-request) |
179 | 181 | (protocols.endpoint/log server :error (format-error-code "Error reading message" message-type)) |
180 | 182 | (:response.result :response.error) |
181 | | - (async/>! server-initiated-in-ch [:response message]) |
| 183 | + (async/>! server-initiated-in-ch message) |
182 | 184 | (:request :notification) |
183 | 185 | (async/>! client-initiated-in-ch [message-type message])) |
184 | 186 | (recur)) |
|
257 | 259 | protocols.endpoint/IEndpoint |
258 | 260 | (start [this context] |
259 | 261 | ;; Start receiving messages. |
260 | | - (let [pipeline (run-pipeline this context input-ch)] |
| 262 | + (let [pipeline (dispatch-input this context input-ch)] |
261 | 263 | ;; Wait to stop receiving messages. |
262 | 264 | (async/go |
263 | 265 | ;; When pipeline closes, it indicates input-ch has closed. We're done |
264 | 266 | ;; receiving. |
265 | 267 | (async/<! pipeline) |
266 | 268 | ;; Do cleanup. |
267 | | - ;; TODO: do we really know that we've finished putting to output-ch? |
| 269 | + |
| 270 | + ;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for |
| 271 | + ;; `close!` say A) "The channel will no longer accept any puts", B) |
| 272 | + ;; "Data in the channel remains available for taking", and C) "Logically |
| 273 | + ;; closing happens after all puts have been delivered." |
| 274 | + |
| 275 | + ;; At this point the input-ch has been closed, which means any messages |
| 276 | + ;; that were read before the channel was closed have been put on the |
| 277 | + ;; channel (C). However, the takes off of it, the takes which then |
| 278 | + ;; forward the messages to the language server, may or may not have |
| 279 | + ;; happened (B). And even if the language server has received some |
| 280 | + ;; messages, if it responds after this line closes the output-ch, the |
| 281 | + ;; responses will be dropped (A). |
| 282 | + |
| 283 | + ;; All that to say, it's possible for the lsp4clj server to drop the |
| 284 | + ;; language server's final few responses. |
| 285 | + |
| 286 | + ;; It doesn't really matter though, because the users of lsp4clj |
| 287 | + ;; typically don't call `shutdown` on the lsp4clj server until they've |
| 288 | + ;; received the `exit` notification, which is the client indicating it |
| 289 | + ;; no longer expects any responses anyway. |
268 | 290 | (async/close! output-ch) |
269 | 291 | (async/close! log-ch) |
270 | 292 | (some-> trace-ch async/close!) |
|
302 | 324 | (async/>!! output-ch notif) |
303 | 325 | nil)) |
304 | 326 | (receive-response [this {:keys [id error result] :as resp}] |
305 | | - (let [now (.instant clock) |
306 | | - [pending-requests _] (swap-vals! pending-sent-requests* dissoc id)] |
307 | | - (if-let [{:keys [p started] :as req} (get pending-requests id)] |
308 | | - (do |
309 | | - (trace this trace/received-response req resp started now) |
310 | | - (deliver p (if error resp result))) |
311 | | - (trace this trace/received-unmatched-response resp now)))) |
| 327 | + (try |
| 328 | + (let [now (.instant clock) |
| 329 | + [pending-requests _] (swap-vals! pending-sent-requests* dissoc id)] |
| 330 | + (if-let [{:keys [p started] :as req} (get pending-requests id)] |
| 331 | + (do |
| 332 | + (trace this trace/received-response req resp started now) |
| 333 | + (deliver p (if error resp result))) |
| 334 | + (trace this trace/received-unmatched-response resp now))) |
| 335 | + (catch Throwable e |
| 336 | + (log-error-receiving this e resp)))) |
312 | 337 | (receive-request [this context {:keys [id method params] :as req}] |
313 | 338 | (let [started (.instant clock) |
314 | 339 | resp (lsp.responses/response id)] |
|
345 | 370 | (log-error-receiving this e req) |
346 | 371 | (async/>!! output-ch (internal-error-response resp req)))))) |
347 | 372 | (receive-notification [this context {:keys [method params] :as notif}] |
348 | | - (let [now (.instant clock)] |
349 | | - (trace this trace/received-notification notif now) |
350 | | - (if (= method "$/cancelRequest") |
351 | | - (if-let [pending-req (get @pending-received-requests* (:id params))] |
352 | | - (p/cancel! pending-req) |
353 | | - (trace this trace/received-unmatched-cancellation-notification notif now)) |
354 | | - (let [result (receive-notification method context params)] |
355 | | - (when (identical? ::method-not-found result) |
356 | | - (protocols.endpoint/log this :warn "received unexpected notification" method))))))) |
| 373 | + (try |
| 374 | + (let [now (.instant clock)] |
| 375 | + (trace this trace/received-notification notif now) |
| 376 | + (if (= method "$/cancelRequest") |
| 377 | + (if-let [pending-req (get @pending-received-requests* (:id params))] |
| 378 | + (p/cancel! pending-req) |
| 379 | + (trace this trace/received-unmatched-cancellation-notification notif now)) |
| 380 | + (let [result (receive-notification method context params)] |
| 381 | + (when (identical? ::method-not-found result) |
| 382 | + (protocols.endpoint/log this :warn "received unexpected notification" method))))) |
| 383 | + (catch Throwable e |
| 384 | + (log-error-receiving this e notif))))) |
357 | 385 |
|
358 | 386 | (defn set-trace-level [server trace-level] |
359 | 387 | (update server :tracer* reset! (trace/tracer-for-level trace-level))) |
|
0 commit comments