@@ -40,7 +40,34 @@ core.async. If not supplied the ExecutorService for :io will be
4040used instead.
4141
4242The set of contexts may grow in the future so the function should
43- return nil for unexpected contexts."
43+ return nil for unexpected contexts.
44+
45+ Use the Java system property `clojure.core.async.vthreads` to control
46+ how core.async uses JDK 21+ virtual threads. The property can be one of
47+ the following values:
48+
49+ unset - core.async will opportunistically use vthreads when available
50+ (≥ Java 21) and will otherwise use the old IOC impl. io-thread and :io
51+ thread pool will run on platform threads if vthreads are not available.
52+ If AOT compiling, go blocks will always use IOC so that the resulting
53+ bytecode works on all JVMs (so no change in compiled output)
54+
55+ \" target\" - means that you are targeting virtual threads. At runtime
56+ from source, go blocks will throw if vthreads are not available.
57+ If AOT compiling, go blocks are always compiled as normal Clojure
58+ code to be run on vthreads and will throw at runtime if vthreads are
59+ not available (Java <21)
60+
61+ \" avoid\" - means that vthreads will not be used by core.async - you can
62+ use this to minimize impacts if you are not yet ready to utilize vthreads
63+ in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread
64+ and the :io thread pool use platform threads
65+
66+ Note: existing IOC compiled go blocks from older core.async versions continue
67+ to work (we retain and load the IOC state machine runtime - this does not
68+ require the analyzer), and you can interact with the same channels from both
69+ IOC and vthread code.
70+ "
4471 (:refer-clojure :exclude [reduce transduce into merge map take partition
4572 partition-by bounded-count])
4673 (:require [clojure.core.async.impl.protocols :as impl]
@@ -49,14 +76,18 @@ return nil for unexpected contexts."
4976 [clojure.core.async.impl.timers :as timers]
5077 [clojure.core.async.impl.dispatch :as dispatch]
5178 [clojure.core.async.impl.ioc-macros :as ioc]
52- clojure.core.async.impl.go ; ; TODO: make conditional
5379 [clojure.core.async.impl.mutex :as mutex]
5480 )
5581 (:import [java.util.concurrent.atomic AtomicLong]
5682 [java.util.concurrent.locks Lock]
5783 [java.util.concurrent ThreadLocalRandom]
5884 [java.util Arrays ArrayList]))
5985
86+ (def ^:private lazy-loading-supported? (dispatch/at-least-clojure-version? [1 12 3 ]))
87+
88+ (when-not lazy-loading-supported?
89+ (require 'clojure.core.async.impl.go))
90+
6091(alias 'core 'clojure.core)
6192
6293(set! *warn-on-reflection* false )
@@ -138,6 +169,21 @@ return nil for unexpected contexts."
138169 [^long msecs]
139170 (timers/timeout msecs))
140171
172+ (defn- defparkingop* [op doc arglist]
173+ (let [as (mapv #(list 'quote %) arglist)
174+ blockingop (-> op name (str " !" ) symbol)]
175+ `(def ~(with-meta op {:arglists `(list ~as) :doc doc})
176+ (fn [~'& ~'args]
177+ (if (dispatch/in-vthread? )
178+ ~(list* apply blockingop '[args])
179+ (assert nil ~(str op " used not in (go ...) block" )))))))
180+
181+ (defmacro defparkingop
182+ " Emits a Var with a function that checks if it's running in a virtual thread. If so then
183+ the related blocking op will be called, otherwise the function throws."
184+ [op doc arglist]
185+ (defparkingop* op doc arglist ))
186+
141187(defmacro defblockingop
142188 [op doc arglist & body]
143189 (let [as (mapv #(list 'quote %) arglist)]
@@ -162,11 +208,11 @@ return nil for unexpected contexts."
162208 @ret
163209 (deref p))))
164210
165- (defn <!
166- " takes a val from port. Must be called inside a (go ...) block. Will
167- return nil if closed. Will park if nothing is available. "
168- [port]
169- ( assert nil " <! used not in (go ...) block " ) )
211+ (defparkingop <!
212+ " takes a val from port. Must be called inside a (go ...) block, or on
213+ a virtual thread (no matter how it was started). Will return nil if
214+ closed. Will park if nothing is available. "
215+ [port] )
170216
171217(defn take!
172218 " Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -201,12 +247,12 @@ return nil for unexpected contexts."
201247 @ret
202248 (deref p))))
203249
204- (defn >!
250+ (defparkingop >!
205251 " puts a val into port. nil values are not allowed. Must be called
206- inside a (go ...) block. Will park if no buffer space is available.
252+ inside a (go ...) block, or on a virtual thread (no matter how it
253+ was started). Will park if no buffer space is available.
207254 Returns true unless port is already closed."
208- [port val]
209- (assert nil " >! used not in (go ...) block" ))
255+ [port val])
210256
211257(defn- nop [_])
212258(def ^:private fhnop (fn-handler nop))
@@ -344,16 +390,16 @@ return nil for unexpected contexts."
344390 @ret
345391 (deref p))))
346392
347- (defn alts!
393+ (defparkingop alts!
348394 " Completes at most one of several channel operations. Must be called
349- inside a (go ...) block. ports is a vector of channel endpoints,
350- which can be either a channel to take from or a vector of
351- [channel-to-put-to val-to-put], in any combination. Takes will be
352- made as if by <!, and puts will be made as if by >!. Unless
353- the :priority option is true, if more than one port operation is
354- ready a non-deterministic choice will be made. If no operation is
355- ready and a :default value is supplied, [default-val :default] will
356- be returned, otherwise alts! will park until the first operation to
395+ inside a (go ...) block, or on a virtual thread (no matter how it was
396+ started). ports is a vector of channel endpoints, which can be either
397+ a channel to take from or a vector of [channel-to-put-to val-to-put],
398+ in any combination. Takes will be made as if by <!, and puts will be
399+ made as if by >!. Unless the :priority option is true, if more than one
400+ port operation is ready a non-deterministic choice will be made. If no
401+ operation is ready and a :default value is supplied, [default-val :default]
402+ will be returned, otherwise alts! will park until the first operation to
357403 become ready completes. Returns [val port] of the completed
358404 operation, where val is the value taken for takes, and a
359405 boolean (true unless already closed, as per put!) for puts.
@@ -367,8 +413,7 @@ return nil for unexpected contexts."
367413 used, nor in what order should they be, so they should not be
368414 depended upon for side effects."
369415
370- [ports & {:as opts}]
371- (assert nil " alts! used not in (go ...) block" ))
416+ [ports & {:as opts}])
372417
373418(defn do-alt [alts clauses]
374419 (assert (even? (count clauses)) " unbalanced clauses" )
@@ -471,6 +516,22 @@ return nil for unexpected contexts."
471516 (let [ret (impl/take! port (fn-handler nop false ))]
472517 (when ret @ret)))
473518
519+ (defn- go* [body env]
520+ (cond (and (not dispatch/virtual-threads-available?)
521+ dispatch/target-vthreads?
522+ (not clojure.core/*compile-files*))
523+ (dispatch/report-vthreads-not-available-error! )
524+
525+ (or dispatch/target-vthreads?
526+ (and dispatch/unset-vthreads?
527+ dispatch/virtual-threads-available?
528+ (not clojure.core/*compile-files*)))
529+ `(do (dispatch/ensure-runtime-vthreads! )
530+ (thread-call (^:once fn* [] ~@body) :io ))
531+
532+ :else
533+ ((requiring-resolve 'clojure.core.async.impl.go/go-impl) env body)))
534+
474535(defmacro go
475536 " Asynchronously executes the body, returning immediately to the
476537 calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
@@ -487,7 +548,7 @@ return nil for unexpected contexts."
487548 Returns a channel which will receive the result of the body when
488549 completed"
489550 [& body]
490- (#'clojure.core.async.impl.go/go-impl &env body ))
551+ (go* body &env))
491552
492553(defonce ^:private thread-macro-executor nil )
493554
0 commit comments