Skip to content

Commit 5fe5da7

Browse files
authored
Merge pull request #25 from clojure-lsp/process-requests-in-parallel
Process requests in parallel
2 parents 31aee83 + d762cc6 commit 5fe5da7

File tree

6 files changed

+217
-118
lines changed

6 files changed

+217
-118
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Allow language servers to process and respond to requests in parallel.
6+
57
## v1.2.2
68

79
## v1.2.1

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ These `defmethod`s receive 3 arguments, the method name, a "context", and the `p
4747
(conform-or-log ::coercer/location)))
4848
```
4949

50+
Notifications will block other requests and notifications. That is, lsp4clj won't read the next request or notification sent by a client until the language server returns from `lsp4clj.server/receive-notification`. By default, requests will block other messages too. That is, if a language server wants a request to block others, it should calculate and return the response in `lsp4clj.server/receive-request`. Otherwise, to allow the response to be calculated in parallel with others, it should return a `java.util.concurrent.CompletableFuture`, possibly created with `promesa.core/future`.
51+
5052
The return value of requests will be converted to camelCase json and returned to the client. If the return value looks like `{:error ...}`, it is assumed to indicate an error response, and the `...` part will be set as the `error` of a [JSON-RPC error object](https://www.jsonrpc.org/specification#error_object). It is up to you to conform the `...` object (by giving it a `code`, `message`, and `data`.) Otherwise, the entire return value will be set as the `result` of a [JSON-RPC response object](https://www.jsonrpc.org/specification#response_object). (Message ids are handled internally by lsp4clj.)
5153

5254
### Send messages

deps.edn

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{:deps {org.clojure/clojure {:mvn/version "1.11.1"}
22
org.clojure/core.async {:mvn/version "1.5.648"}
33
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
4-
cheshire/cheshire {:mvn/version "5.11.0"} }
4+
cheshire/cheshire {:mvn/version "5.11.0"}
5+
funcool/promesa {:mvn/version "8.0.450"}}
56
:paths ["src" "resources"]
67
:aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
78
:extra-paths ["test"]

src/lsp4clj/server.clj

Lines changed: 97 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
[lsp4clj.lsp.requests :as lsp.requests]
88
[lsp4clj.lsp.responses :as lsp.responses]
99
[lsp4clj.protocols.endpoint :as protocols.endpoint]
10-
[lsp4clj.trace :as trace]))
10+
[lsp4clj.trace :as trace]
11+
[promesa.core :as p])
12+
(:import (java.util.concurrent CancellationException)))
1113

1214
(set! *warn-on-reflection* true)
1315

@@ -102,65 +104,26 @@
102104
(format "%s: %s (%s)" description message code)))
103105

104106
(defn ^:private log-error-receiving [server e message]
105-
(protocols.endpoint/log server :error e
106-
(str (format-error-code "Error receiving message" :internal-error) "\n"
107-
(select-keys message [:id :method]))))
107+
(let [message-details (select-keys message [:id :method])
108+
log-title (format-error-code "Error receiving message" :internal-error)]
109+
(protocols.endpoint/log server :error e (str log-title "\n" message-details))))
108110

109-
(defn ^:private start-pipeline [input-ch output-ch server context]
110-
;; Fork the input off to two streams of work, the input initiated by the
111-
;; client (the client's requests and notifications) and the input initiated by
112-
;; the server (the client's responses). Process each stream one message at a
113-
;; time, but independently. The streams must be processed indepedently so that
114-
;; while receiving a request, the server can send a request and receive the
115-
;; response before sending its response to the original request. This happens,
116-
;; for example, when servers send showMessageRequest while processing a
117-
;; request they have received.
118-
(let [server-initiated-ch (async/chan 1)
119-
client-initiated-ch (async/chan 1)
120-
pipeline
121-
(async/go-loop []
122-
(if-let [message (async/<! input-ch)]
123-
(let [message-type (coercer/input-message-type message)]
124-
(case message-type
125-
(:parse-error :invalid-request)
126-
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
127-
(:request :notification)
128-
(async/>! client-initiated-ch [message-type message])
129-
(:response.result :response.error)
130-
(async/>! server-initiated-ch message))
131-
(recur))
132-
(do
133-
(async/close! client-initiated-ch)
134-
(async/close! server-initiated-ch)
135-
(async/close! output-ch))))]
136-
;; a thread so server can use >!! and so that we can use (>!! output-ch) to
137-
;; respect back pressure from clients that are slow to read.
138-
(async/thread
111+
(defn ^:private receive-message
112+
[server context message]
113+
(let [message-type (coercer/input-message-type message)]
114+
(try
139115
(discarding-stdout
140-
(loop []
141-
(when-let [[message-type message] (async/<!! client-initiated-ch)]
142-
;; TODO: return error until initialize response is sent?
143-
;; https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize
144-
(case message-type
145-
:request
146-
(async/>!! output-ch
147-
(try
148-
(protocols.endpoint/receive-request server context message)
149-
(catch Throwable e
150-
(log-error-receiving server e message)
151-
(-> (lsp.responses/response (:id message))
152-
(lsp.responses/error (lsp.errors/internal-error (select-keys message [:id :method])))))))
153-
:notification
154-
(try
155-
(protocols.endpoint/receive-notification server context message)
156-
(catch Throwable e
157-
(log-error-receiving server e message))))
158-
(recur)))))
159-
(async/go-loop []
160-
(when-let [message (async/<! server-initiated-ch)]
161-
(protocols.endpoint/receive-response server message)
162-
(recur)))
163-
pipeline))
116+
(case message-type
117+
(:parse-error :invalid-request)
118+
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
119+
:request
120+
(protocols.endpoint/receive-request server context message)
121+
(:response.result :response.error)
122+
(protocols.endpoint/receive-response server message)
123+
:notification
124+
(protocols.endpoint/receive-notification server context message)))
125+
(catch Throwable e ;; exceptions thrown by receive-response or receive-notification (receive-request catches its own exceptions)
126+
(log-error-receiving server e message)))))
164127

165128
;; Expose endpoint methods to language servers
166129

@@ -178,23 +141,46 @@
178141

179142
(defmethod receive-request :default [_method _context _params] ::method-not-found)
180143
(defmethod receive-notification :default [_method _context _params] ::method-not-found)
181-
;; Servers can't implement cancellation of inbound requests themselves, because
182-
;; lsp4clj manages request ids. Until lsp4clj adds support, ignore cancellation
183-
;; requests.
184-
(defmethod receive-notification "$/cancelRequest" [_ _ _])
185144

145+
(defn ^:private internal-error-response [resp req]
146+
(let [error-body (lsp.errors/internal-error (select-keys req [:id :method]))]
147+
(lsp.responses/error resp error-body)))
148+
149+
(defn ^:private cancellation-response [resp req]
150+
(let [message-details (select-keys req [:id :method])
151+
error-body (lsp.errors/body :request-cancelled
152+
(format "The request %s has been cancelled."
153+
(pr-str message-details))
154+
message-details)]
155+
(lsp.responses/error resp error-body)))
156+
157+
;; TODO: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize
158+
;; * receive-request should return error until initialize request is received
159+
;; * receive-notification should drop until initialize request is received, with the exception of exit
160+
;; * send-request should do nothing until initialize response is sent, with the exception of window/showMessageRequest
161+
;; * send-notification should do nothing until initialize response is sent, with the exception of window/showMessage, window/logMessage, telemetry/event, and $/progress
186162
(defrecord ChanServer [input-ch
187163
output-ch
188164
trace-ch
189165
log-ch
190166
^java.time.Clock clock
191167
on-close
192168
request-id*
193-
pending-requests*
169+
pending-sent-requests*
170+
pending-received-requests*
194171
join]
195172
protocols.endpoint/IEndpoint
196173
(start [this context]
197-
(let [pipeline (start-pipeline input-ch output-ch this context)]
174+
(let [;; a thread so language server can use >!! and so that receive-message
175+
;; can use (>!! output-ch) to respect back pressure from clients that
176+
;; are slow to read.
177+
pipeline (async/thread
178+
(loop []
179+
(if-let [message (async/<!! input-ch)]
180+
(do
181+
(receive-message this context message)
182+
(recur))
183+
(async/close! output-ch))))]
198184
(async/go
199185
;; Wait for pipeline to close. This indicates input-ch was closed and
200186
;; that now output-ch is closed.
@@ -224,7 +210,7 @@
224210
(some-> trace-ch (async/put! (trace/sending-request req now)))
225211
;; Important: record request before sending it, so it is sure to be
226212
;; available during receive-response.
227-
(swap! pending-requests* assoc id pending-request)
213+
(swap! pending-sent-requests* assoc id pending-request)
228214
;; respect back pressure from clients that are slow to read; (go (>!)) will not suffice
229215
(async/>!! output-ch req)
230216
pending-request))
@@ -236,30 +222,58 @@
236222
(async/>!! output-ch notif)))
237223
(receive-response [_this {:keys [id error result] :as resp}]
238224
(let [now (.instant clock)
239-
[pending-requests _] (swap-vals! pending-requests* dissoc id)]
225+
[pending-requests _] (swap-vals! pending-sent-requests* dissoc id)]
240226
(if-let [{:keys [p started] :as req} (get pending-requests id)]
241227
(do
242228
(some-> trace-ch (async/put! (trace/received-response req resp started now)))
243229
(deliver p (if error resp result)))
244230
(some-> trace-ch (async/put! (trace/received-unmatched-response resp now))))))
245231
(receive-request [this context {:keys [id method params] :as req}]
246-
(let [started (.instant clock)]
247-
(some-> trace-ch (async/put! (trace/received-request req started)))
248-
(let [result (receive-request method context params)
249-
resp (lsp.responses/response id)
250-
resp (if (identical? ::method-not-found result)
232+
(let [started (.instant clock)
233+
resp (lsp.responses/response id)]
234+
(try
235+
(some-> trace-ch (async/put! (trace/received-request req started)))
236+
;; coerce result/error to promise
237+
(let [result-promise (p/promise (receive-request method context params))]
238+
(swap! pending-received-requests* assoc id result-promise)
239+
(-> result-promise
240+
;; convert result/error to response
241+
(p/then
242+
(fn [result]
243+
(if (identical? ::method-not-found result)
244+
(do
245+
(protocols.endpoint/log this :warn "received unexpected request" method)
246+
(lsp.responses/error resp (lsp.errors/not-found method)))
247+
(lsp.responses/infer resp result))))
248+
;; Handle
249+
;; 1. Exceptions thrown within p/future created by receive-request.
250+
;; 2. Cancelled requests.
251+
(p/catch
252+
(fn [e]
253+
(if (instance? CancellationException e)
254+
(cancellation-response resp req)
251255
(do
252-
(protocols.endpoint/log this :warn "received unexpected request" method)
253-
(lsp.responses/error resp (lsp.errors/not-found method)))
254-
(lsp.responses/infer resp result))
255-
finished (.instant clock)]
256-
(some-> trace-ch (async/put! (trace/sending-response req resp started finished)))
257-
resp)))
256+
(log-error-receiving this e req)
257+
(internal-error-response resp req)))))
258+
(p/finally
259+
(fn [resp _error]
260+
(swap! pending-received-requests* dissoc id)
261+
(some-> trace-ch (async/put! (trace/sending-response req resp started (.instant clock))))
262+
(async/>!! output-ch resp)))))
263+
(catch Throwable e ;; exceptions thrown by receive-request
264+
(log-error-receiving this e req)
265+
(async/>!! output-ch (internal-error-response resp req))))))
258266
(receive-notification [this context {:keys [method params] :as notif}]
259-
(some-> trace-ch (async/put! (trace/received-notification notif (.instant clock))))
260-
(let [result (receive-notification method context params)]
261-
(when (identical? ::method-not-found result)
262-
(protocols.endpoint/log this :warn "received unexpected notification" method)))))
267+
(let [now (.instant clock)]
268+
(if (= method "$/cancelRequest")
269+
(if-let [result-promise (get @pending-received-requests* (:id params))]
270+
(p/cancel! result-promise)
271+
(some-> trace-ch (async/put! (trace/received-unmatched-cancellation-notification notif now))))
272+
(do
273+
(some-> trace-ch (async/put! (trace/received-notification notif now)))
274+
(let [result (receive-notification method context params)]
275+
(when (identical? ::method-not-found result)
276+
(protocols.endpoint/log this :warn "received unexpected notification" method))))))))
263277

264278
(defn chan-server
265279
[{:keys [output-ch input-ch log-ch trace? trace-ch clock on-close]
@@ -273,5 +287,6 @@
273287
:clock clock
274288
:on-close on-close
275289
:request-id* (atom 0)
276-
:pending-requests* (atom {})
290+
:pending-sent-requests* (atom {})
291+
:pending-received-requests* (atom {})
277292
:join (promise)}))

src/lsp4clj/trace.clj

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,24 @@
2525
(format-body "Error data" (:data error))
2626
(format-body "Result" result)))
2727

28-
(defn ^:private format-trace [at action message-type header-details body]
28+
(defn ^:private format-trace [at direction message-type header-details body]
2929
[:debug
30-
(str (format-tag at) " " action " " message-type " " header-details "\n"
30+
(str (format-tag at) " " direction " " message-type " " header-details "\n"
3131
body "\n\n\n")])
3232

3333
(defn ^:private latency [^java.time.Instant started ^java.time.Instant finished]
3434
(format "%sms" (- (.toEpochMilli finished) (.toEpochMilli started))))
3535

36-
(defn ^:private format-notification [action notif at]
37-
(format-trace at action "notification" (format-notification-signature notif)
36+
(defn ^:private format-notification [direction notif at]
37+
(format-trace at direction "notification" (format-notification-signature notif)
3838
(format-params notif)))
3939

40-
(defn ^:private format-request [action req at]
41-
(format-trace at action "request" (format-request-signature req)
40+
(defn ^:private format-request [direction req at]
41+
(format-trace at direction "request" (format-request-signature req)
4242
(format-params req)))
4343

44-
(defn ^:private format-response [action req {:keys [error] :as resp} started finished]
45-
(format-trace finished action "response"
44+
(defn ^:private format-response [direction req {:keys [error] :as resp} started finished]
45+
(format-trace finished direction "response"
4646
(format
4747
(str "%s. Request took %s." (when error " Request failed: %s (%s)."))
4848
(format-request-signature req)
@@ -58,6 +58,10 @@
5858
(format-trace at "Received" "response" "for unmatched request:"
5959
(format-body "Body" resp)))
6060

61+
(defn received-unmatched-cancellation-notification [notif at]
62+
(format-trace at "Received" "cancellation notification" "for unmatched request:"
63+
(format-params notif)))
64+
6165
(defn sending-notification [notif at] (format-notification "Sending" notif at))
6266
(defn sending-request [req at] (format-request "Sending" req at))
6367
(defn sending-response [req resp started finished] (format-response "Sending" req resp started finished))

0 commit comments

Comments
 (0)