|
1 | | -import * as semver from "semver" |
2 | | -import * as zmq from "../../src" |
3 | | - |
4 | | -import {assert} from "chai" |
5 | | -import {testProtos, uniqAddress} from "./helpers" |
| 1 | +import {Worker} from "worker_threads" |
| 2 | +import {testProtos} from "./helpers" |
6 | 3 |
|
7 | 4 | for (const proto of testProtos("tcp", "ipc", "inproc")) { |
8 | | - describe(`proxy with ${proto} router/dealer`, function () { |
9 | | - /* ZMQ < 4.0.5 has no steerable proxy support. */ |
10 | | - if (semver.satisfies(zmq.version, "< 4.0.5")) { |
11 | | - return |
12 | | - } |
13 | | - |
14 | | - let proxy: zmq.Proxy |
15 | | - |
16 | | - let frontAddress: string |
17 | | - let backAddress: string |
18 | | - |
19 | | - let req: zmq.Request |
20 | | - let rep: zmq.Reply |
21 | | - |
22 | | - beforeEach(async function () { |
23 | | - proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer()) |
24 | | - |
25 | | - frontAddress = await uniqAddress(proto) |
26 | | - backAddress = await uniqAddress(proto) |
27 | | - |
28 | | - req = new zmq.Request() |
29 | | - rep = new zmq.Reply() |
30 | | - }) |
31 | | - |
32 | | - afterEach(function () { |
33 | | - /* Closing proxy sockets is only necessary if run() fails. */ |
34 | | - proxy.frontEnd.close() |
35 | | - proxy.backEnd.close() |
36 | | - |
37 | | - req.close() |
38 | | - rep.close() |
39 | | - global.gc?.() |
40 | | - }) |
41 | | - |
42 | | - describe("run", function () { |
43 | | - it("should proxy messages", async function () { |
44 | | - /* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP |
45 | | - <- foo <- <- foo <- |
46 | | - -> bar -> -> bar -> |
47 | | - <- bar <- <- bar <- |
48 | | - pause |
49 | | - resume |
50 | | - -> baz -> -> baz -> |
51 | | - <- baz <- <- baz <- |
52 | | - -> qux -> -> qux -> |
53 | | - <- qux <- <- qux <- |
54 | | - */ |
55 | | - |
56 | | - await proxy.frontEnd.bind(frontAddress) |
57 | | - await proxy.backEnd.bind(backAddress) |
58 | | - |
59 | | - const done = proxy.run() |
60 | | - |
61 | | - const messages = ["foo", "bar", "baz", "qux"] |
62 | | - const received: string[] = [] |
63 | | - |
64 | | - await req.connect(frontAddress) |
65 | | - await rep.connect(backAddress) |
66 | | - |
67 | | - const echo = async () => { |
68 | | - for await (const msg of rep) { |
69 | | - await rep.send(msg) |
70 | | - } |
71 | | - } |
72 | | - |
73 | | - const send = async () => { |
74 | | - for (const msg of messages) { |
75 | | - if (received.length === 2) { |
76 | | - proxy.pause() |
77 | | - proxy.resume() |
78 | | - } |
79 | | - |
80 | | - await req.send(Buffer.from(msg)) |
81 | | - |
82 | | - const [res] = await req.receive() |
83 | | - received.push(res.toString()) |
84 | | - if (received.length === messages.length) { |
85 | | - break |
86 | | - } |
87 | | - } |
88 | | - |
89 | | - rep.close() |
90 | | - } |
91 | | - |
92 | | - console.log( |
93 | | - `waiting for messages for proxy with ${proto} router/dealer...`, |
94 | | - ) |
95 | | - |
96 | | - await Promise.all([echo(), send()]) |
97 | | - assert.deepEqual(received, messages) |
98 | | - |
99 | | - proxy.terminate() |
100 | | - await done |
101 | | - console.log(`Done proxying with ${proto} router/dealer`) |
| 5 | + describe(`proxy with ${proto} router/dealer`, () => { |
| 6 | + describe("run", () => { |
| 7 | + it("should proxy messages", async () => { |
| 8 | + const worker = new Worker(__filename, { |
| 9 | + workerData: { |
| 10 | + proto, |
| 11 | + }, |
| 12 | + }) |
| 13 | + await worker.terminate() |
102 | 14 | }) |
103 | 15 | }) |
104 | 16 | }) |
|
0 commit comments