|
33 | 33 | details. The flow configuration provides a centralized place for |
34 | 34 | policy decisions regarding process settings, threading, buffering etc. |
35 | 35 |
|
| 36 | + Flow also provides a subsystem for broadcast communication of |
| 37 | + out-of-band messages without explicit connections or |
| 38 | + declarations. This could for example be used to communicate the |
| 39 | + passage of (real or virtual) time. Broadcast messages are associated |
| 40 | + with (otherwise undeclared) signal-ids, and will be received by |
| 41 | + processes selecting those ids. Broadcasts messages will arrive along |
| 42 | + with messages from process inputs, so signal-ids must not conflict |
| 43 | + with any process input-id. Thus namespaced keywords, UUIDs etc or |
| 44 | + tuples thereof are recommended as signal-ids. See process |
| 45 | + describe/transform and inject below for details. |
| 46 | + |
36 | 47 | It is expected that applications will rarely define instances of the |
37 | 48 | process protocol but instead use the API function 'process' that |
38 | 49 | implements the process protocol in terms of calls to ordinary |
|
71 | 82 |
|
72 | 83 | :proc - a function that starts a process |
73 | 84 | :args - a map of param->val which will be passed to the process ctor |
74 | | - :chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n |
| 85 | + :chan-opts - a map of io-id->{:keys [buf-or-n xform]}, |
| 86 | + where io-id is an input/output name, and buf-or-n |
75 | 87 | and xform have their meanings per core.async/chan |
76 | | - the default is {:buf-or-n 10} |
| 88 | + the default for inputs and outputs is {:buf-or-n 10} |
77 | 89 | |
78 | 90 | :conns - a collection of [[from-pid outid] [to-pid inid]] tuples. |
79 | 91 |
|
80 | 92 | Inputs and outputs support multiple connections. When an output is |
81 | | - connected multiple times every connection will get every message, |
82 | | - as per a core.async/mult. |
| 93 | + connected multiple times every connection will get every message, as |
| 94 | + per a core.async/mult. Note that non-multed outputs do not have |
| 95 | + corresponding channels and thus any chan-opts will be ignored. |
83 | 96 |
|
| 97 | + Broadcast signals are conveyed to a process via a channel with an |
| 98 | + async/sliding-buffer of size 100, thus signals not handled in a |
| 99 | + timely manner will be dropped in favor of later arriving signals. |
| 100 | + |
84 | 101 | :mixed-exec/:io-exec/:compute-exec -> ExecutorService |
85 | 102 | These can be used to specify the ExecutorService to use for the |
86 | 103 | corresonding workload, in lieu of the lib defaults. |
|
136 | 153 | (g/ping-proc g pid timeout-ms)) |
137 | 154 |
|
138 | 155 | (defn inject |
139 | | - "asynchronously puts the messages on the channel corresponding to the |
140 | | - input or output of the process, returning a future that will |
141 | | - complete when done." |
| 156 | + "asynchronously puts the messages on the channel corresponding to |
| 157 | + the input or output of the process, returning a future that will |
| 158 | + complete when done. You can broadcast messages on a signal using the |
| 159 | + special coord [::flow/cast a-signal-id]. Note that signals cannot be |
| 160 | + sent to a particular process." |
142 | 161 | [g [pid io-id :as coord] msgs] (g/inject g coord msgs)) |
143 | 162 |
|
144 | 163 | (defn process |
|
160 | 179 | datafy. |
161 | 180 |
|
162 | 181 | arity 0 - 'describe', () -> description |
163 | | - where description is a map with keys :params :ins and :outs, each of which |
164 | | - in turn is a map of keyword to doc string, and :workload with |
165 | | - possible values of :mixed :io :compute. All entries in the describe |
166 | | - return map are optional. |
| 182 | + where description is a map with possible keys: |
| 183 | + :params :ins and :outs, each of which in turn is a map of keyword to doc string |
| 184 | + :signal-select - a predicate of a signal-id. Messages on approved |
| 185 | + signals will appear in the transform arity (see below) |
| 186 | + For the simple case of enumerated signal-ids, use a set, |
| 187 | + e.g. #{:this/signal :that/signal} |
| 188 | + If no :signal-select is provided, no signals will be received |
| 189 | + :workload with possible values of :mixed :io :compute. |
| 190 | + All entries in the describe return map are optional. |
167 | 191 | |
168 | 192 | :params describes the initial arguments to setup the state for the function. |
169 | | - :ins enumerates the input[s], for which the flow will create channels |
170 | | - :outs enumerates the output[s], for which the flow may create channels. |
| 193 | + :ins enumerates the process input[s], for which the flow will create channels |
| 194 | + :outs enumerates the process output[s], for which the flow _may_ create channels. |
171 | 195 | :workload - describes the nature of the workload, one of :mixed :io or :compute |
172 | 196 | an :io workload should not do extended computation |
173 | 197 | a :compute workload should never block |
174 | 198 | |
175 | | - No key may be present in both :ins and :outs, allowing for a uniform |
176 | | - channel coordinate system of [:process-id :channel-id]. The |
| 199 | + No io-id key may be present in both :ins and :outs, allowing for a |
| 200 | + uniform channel coordinate system of [:process-id :channel-id]. The |
177 | 201 | ins/outs/params returned will be the ins/outs/params of the |
178 | 202 | process. describe may be called by users to understand how to use |
179 | 203 | the proc. It will also be called by the impl in order to discover |
|
213 | 237 | process will no longer be used following that. See the SPI for |
214 | 238 | details. state' will be the state supplied to subsequent calls. |
215 | 239 |
|
216 | | - arity 3 - 'transform', (state in-name msg) -> [state' output] |
| 240 | + arity 3 - 'transform', (state in-or-signal-id msg) -> [state' output] |
217 | 241 | where output is a map of outid->[msgs*] |
218 | 242 |
|
219 | | - The transform arity will be called every time a message arrives at any |
220 | | - of the inputs. Output can be sent to none, any or all of the :outs |
221 | | - enumerated, and/or an input named by a [pid inid] tuple (e.g. for |
222 | | - reply-to), and/or to the ::flow/report output. A step need not |
223 | | - output at all (output or msgs can be empyt/nil), however an output _message_ |
224 | | - may never be nil (per core.async channels). state' will be the state |
225 | | - supplied to subsequent calls. |
| 243 | + The transform arity will be called every time a message arrives at |
| 244 | + any of the inputs or signals (selected via :signal-select in |
| 245 | + describe), identified by the id. Output can be sent to none, any or |
| 246 | + all of the :outs enumerated, and/or an input named by a [pid in-id] |
| 247 | + coord tuple (e.g. for reply-to), and/or to the ::flow/report |
| 248 | + output. |
| 249 | +
|
| 250 | + You can broadcast output to all processes selecting a signal via |
| 251 | + the special coord [::flow/cast a-signal-id] Note that signals cannot |
| 252 | + be sent to a particular process. |
| 253 | +
|
| 254 | + A step need not output at all (output or msgs can be empty/nil), |
| 255 | + however an output _message_ may never be nil (per core.async |
| 256 | + channels). state' will be the state supplied to subsequent calls. |
226 | 257 |
|
227 | 258 | process also accepts an option map with keys: |
228 | 259 | :workload - one of :mixed, :io or :compute |
|
0 commit comments