|
116 | 116 | (recur))))) |
117 | 117 | ch)) |
118 | 118 |
|
119 | | -(defn ^:private dispatch-input |
120 | | - "Dispatches messages received on the input-ch based on message type. Returns a |
121 | | - channel which will close after the input-ch is closed." |
122 | | - [server context input-ch] |
123 | | - (let [;; In order to process some requests and (all) notifications in series, |
124 | | - ;; the language server sometimes needs to block client-initiated input. |
125 | | - ;; If the language server sends requests during that time, it needs to |
126 | | - ;; receive responses, even though it's blocking other input. Otherwise, |
127 | | - ;; it will end up in a deadlock, where it's waiting to receive a |
128 | | - ;; response off the input-ch and the input-ch isn't being read from |
129 | | - ;; because the server is blocking input. See |
130 | | - ;; https://github.com/clojure-lsp/clojure-lsp/issues/1500. |
131 | | - |
132 | | - ;; The messages all arrive in order on the input-ch so to get to the |
133 | | - ;; client's response, we have to queue whatever other messages it's |
134 | | - ;; sent. We do that by storing them in a sliding buffer. Because of the |
135 | | - ;; sliding buffer: |
136 | | - ;; * if the client sends a message which causes the language server to |
137 | | - ;; block, and |
138 | | - ;; * if the language server sends a request during that time, and |
139 | | - ;; * if the client sends more than 100 other messages between when the |
140 | | - ;; language server started blocking and when the client responds to |
141 | | - ;; the language server's request, |
142 | | - ;; * then the client's earliest messages will be dropped. |
143 | | - ;; The same is true in reverse. |
144 | | - |
145 | | - ;; We process the client- and language-server-initiated messages in |
146 | | - ;; separate threads. |
147 | | - ;; * Threads, so the language server can use >!! and so that we can use |
148 | | - ;; (>!! output-ch) to respect back pressure from clients that are slow |
149 | | - ;; to read. |
150 | | - ;; * Separate, so one can continue while the other is blocked. |
151 | | - |
152 | | - ;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to |
153 | | - ;; estimate how big the buffer should be to avoid dropping messages. LSP |
154 | | - ;; communication tends to be very quiet, then very chatty, so it depends |
155 | | - ;; a lot on what the client and server are doing. I also don't know how |
156 | | - ;; many messages we could store without running into memory problems, |
157 | | - ;; since this is dependent on so many variables, not just the size of |
158 | | - ;; the JVM's memory, but also the size of the messages, which can be |
159 | | - ;; anywhere from a few bytes to megabytes. |
160 | | - server-initiated-in-ch (thread-loop |
161 | | - (async/sliding-buffer 100) |
162 | | - (fn [response] |
163 | | - (protocols.endpoint/receive-response server response))) |
164 | | - client-initiated-in-ch (thread-loop |
165 | | - (async/sliding-buffer 100) |
166 | | - (fn [[message-type message]] |
167 | | - (if (identical? :request message-type) |
168 | | - (protocols.endpoint/receive-request server context message) |
169 | | - (protocols.endpoint/receive-notification server context message))))] |
170 | | - (async/go-loop [] |
171 | | - (if-let [message (async/<! input-ch)] |
172 | | - (let [message-type (coercer/input-message-type message)] |
173 | | - (case message-type |
174 | | - (:parse-error :invalid-request) |
175 | | - (protocols.endpoint/log server :error (format-error-code "Error reading message" message-type)) |
176 | | - (:response.result :response.error) |
177 | | - (async/>! server-initiated-in-ch message) |
178 | | - (:request :notification) |
179 | | - (async/>! client-initiated-in-ch [message-type message])) |
180 | | - (recur)) |
181 | | - (do |
182 | | - (async/close! server-initiated-in-ch) |
183 | | - (async/close! client-initiated-in-ch)))))) |
| 119 | +(def input-buffer-size |
| 120 | + ;; (Jacob Maine): This number is picked out of thin air. I have no idea how to |
| 121 | + ;; estimate how big the buffer could or should be. LSP communication tends to |
| 122 | + ;; be very quiet, then very chatty, so it depends a lot on what the client and |
| 123 | + ;; server are doing. I also don't know how many messages we could store |
| 124 | + ;; without running into memory problems, since this is dependent on so many |
| 125 | + ;; variables, not just the size of the JVM's memory, but also the size of the |
| 126 | + ;; messages, which can be anywhere from a few bytes to several megabytes. |
| 127 | + 1024) |
184 | 128 |
|
185 | 129 | ;; Expose endpoint methods to language servers |
186 | 130 |
|
|
253 | 197 | protocols.endpoint/IEndpoint |
254 | 198 | (start [this context] |
255 | 199 | ;; Start receiving messages. |
256 | | - (let [pipeline (dispatch-input this context input-ch)] |
| 200 | + (let [;; In order to process some requests and (all) notifications in |
| 201 | + ;; series, the language server sometimes needs to block |
| 202 | + ;; client-initiated input. If the language server sends requests |
| 203 | + ;; during that time, it needs to receive responses, even though it's |
| 204 | + ;; blocking other input. Otherwise, it will end up in a deadlock, |
| 205 | + ;; where it's waiting to receive a response off the input-ch and the |
| 206 | + ;; input-ch isn't being read from because the server is blocking |
| 207 | + ;; input. See https://github.com/clojure-lsp/clojure-lsp/issues/1500. |
| 208 | + |
| 209 | + ;; To avoid this problem we processes client-initiated input (client |
| 210 | + ;; requests and notifications) and server-initiated input (client |
| 211 | + ;; responses) separately. |
| 212 | + |
| 213 | + ;; If the server starts blocking waiting for a response, we buffer the |
| 214 | + ;; client's requests and notifications until the server is prepared to |
| 215 | + ;; process them. |
| 216 | + |
| 217 | + ;; However, if too many client requests and notifications arrive |
| 218 | + ;; before the response, the buffer fills up. In this situation, we |
| 219 | + ;; abort all the server's pending requests, hoping that this will |
| 220 | + ;; allow it to process the client's other messages. We do this to |
| 221 | + ;; prioritize the client's messages over the server's. |
| 222 | + |
| 223 | + ;; The situation is different in reverse. The client can also start |
| 224 | + ;; blocking waiting for a server response. In the meantime, the server |
| 225 | + ;; can send lots of messages. But this is bad behavior on the server's |
| 226 | + ;; part. So in this scenario we drop the server's earliest requests |
| 227 | + ;; and notifications. |
| 228 | + |
| 229 | + ;; We do that by storing them in a sliding buffer. Because of the |
| 230 | + ;; sliding buffer: |
| 231 | + ;; * if the language server sends a message, and |
| 232 | + ;; * if while processing that message the client sends a request which |
| 233 | + ;; causes it to block, and |
| 234 | + ;; * if the language server sends too many other messages between when |
| 235 | + ;; the client started blocking and when the language server responds |
| 236 | + ;; to the client's request, |
| 237 | + ;; * then the language server's messages will start being dropped, |
| 238 | + ;; starting from the earliest. |
| 239 | + |
| 240 | + ;; We process the client- and language-server-initiated messages in |
| 241 | + ;; separate threads. |
| 242 | + ;; * Threads, so the language server can use >!! and so that we can use |
| 243 | + ;; (>!! output-ch) to respect back pressure from clients that are slow |
| 244 | + ;; to read. |
| 245 | + ;; * Separate, so one can continue while the other is blocked. |
| 246 | + server-initiated-in-ch (thread-loop |
| 247 | + (async/sliding-buffer input-buffer-size) |
| 248 | + (fn [response] |
| 249 | + (protocols.endpoint/receive-response this response))) |
| 250 | + client-initiated-in-ch (thread-loop |
| 251 | + input-buffer-size |
| 252 | + (fn [[message-type message]] |
| 253 | + (if (identical? :request message-type) |
| 254 | + (protocols.endpoint/receive-request this context message) |
| 255 | + (protocols.endpoint/receive-notification this context message)))) |
| 256 | + reject-pending-sent-requests (fn [exception] |
| 257 | + (doseq [pending-request (vals @pending-sent-requests*)] |
| 258 | + (p/reject! (:p pending-request) |
| 259 | + exception))) |
| 260 | + pipeline (async/go-loop [] |
| 261 | + (if-let [message (async/<! input-ch)] |
| 262 | + (let [message-type (coercer/input-message-type message)] |
| 263 | + (case message-type |
| 264 | + (:parse-error :invalid-request) |
| 265 | + (protocols.endpoint/log this :error (format-error-code "Error reading message" message-type)) |
| 266 | + (:response.result :response.error) |
| 267 | + (async/>! server-initiated-in-ch message) |
| 268 | + (:request :notification) |
| 269 | + (when-not (async/offer! client-initiated-in-ch [message-type message]) |
| 270 | + ;; Buffers full. Fail any waiting pending requests and... |
| 271 | + (reject-pending-sent-requests |
| 272 | + (ex-info "Buffer of client messages exhausted." {})) |
| 273 | + ;; ... try again, but park this time, to respect |
| 274 | + ;; back pressure from the client. |
| 275 | + (async/>! client-initiated-in-ch [message-type message]))) |
| 276 | + (recur)) |
| 277 | + (do |
| 278 | + (async/close! server-initiated-in-ch) |
| 279 | + (async/close! client-initiated-in-ch))))] |
257 | 280 | ;; Wait to stop receiving messages. |
258 | 281 | (async/go |
259 | 282 | ;; When pipeline closes, it indicates input-ch has closed. We're done |
260 | 283 | ;; receiving. |
261 | 284 | (async/<! pipeline) |
262 | 285 | ;; Do cleanup. |
263 | 286 |
|
| 287 | + (reject-pending-sent-requests (ex-info "Server shutting down." {})) |
| 288 | + |
264 | 289 | ;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for |
265 | 290 | ;; `close!` say A) "The channel will no longer accept any puts", B) |
266 | 291 | ;; "Data in the channel remains available for taking", and C) "Logically |
|
0 commit comments