Skip to content

Commit d69baf8

Browse files
committed
test: prevent proxy test conflicts via worker-threads
1 parent b7e742a commit d69baf8

File tree

4 files changed

+114
-105
lines changed

4 files changed

+114
-105
lines changed

test/unit/helpers.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@ if (semver.satisfies(zmq.version, ">= 4.2")) {
1717
* Get a unique id to be used as a port number or IPC path.
1818
* This function is thread-safe and will use a lock file to ensure that the id is unique.
1919
*/
20-
let idFallback = 5000
20+
let idFallback = 6000
2121
async function getUniqueId() {
2222
const idPath = path.resolve(__dirname, "../../tmp/port-id.lock")
2323
await fs.promises.mkdir(path.dirname(idPath), {recursive: true})
2424

2525
try {
2626
// Create the file if it doesn't exist
2727
if (!fs.existsSync(idPath)) {
28-
await fs.promises.writeFile(idPath, "5000", "utf8")
28+
await fs.promises.writeFile(idPath, "6000", "utf8")
2929

3030
/* Windows cannot bind on a ports just above 1014; start higher to be safe. */
31-
return 5000
31+
return 6000
3232
}
3333

3434
await lockfile.lock(idPath, {retries: 10})
@@ -63,7 +63,7 @@ async function getUniqueId() {
6363
}
6464
}
6565

66-
type Proto = "ipc" | "tcp" | "udp" | "inproc"
66+
export type Proto = "ipc" | "tcp" | "udp" | "inproc"
6767

6868
export async function uniqAddress(proto: Proto) {
6969
const id = await getUniqueId()

test/unit/proxy-router-dealer-test.ts

Lines changed: 11 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,16 @@
1-
import * as semver from "semver"
2-
import * as zmq from "../../src"
3-
4-
import {assert} from "chai"
5-
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
1+
import {Worker} from "worker_threads"
2+
import {testProtos} from "./helpers"
63

74
for (const proto of testProtos("tcp", "ipc", "inproc")) {
8-
describe(`proxy with ${proto} router/dealer`, function () {
9-
/* ZMQ < 4.0.5 has no steerable proxy support. */
10-
if (semver.satisfies(zmq.version, "< 4.0.5")) {
11-
return
12-
}
13-
14-
let proxy: zmq.Proxy
15-
16-
let frontAddress: string
17-
let backAddress: string
18-
19-
let req: zmq.Request
20-
let rep: zmq.Reply
21-
22-
beforeEach(async function () {
23-
proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())
24-
25-
frontAddress = await uniqAddress(proto)
26-
backAddress = await uniqAddress(proto)
27-
28-
req = new zmq.Request()
29-
rep = new zmq.Reply()
30-
})
31-
32-
afterEach(async function () {
33-
/* Closing proxy sockets is only necessary if run() fails. */
34-
proxy.frontEnd.close()
35-
proxy.backEnd.close()
36-
37-
req.close()
38-
rep.close()
39-
global.gc?.()
40-
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
41-
})
42-
43-
describe("run", function () {
44-
it("should proxy messages", async function () {
45-
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
46-
<- foo <- <- foo <-
47-
-> bar -> -> bar ->
48-
<- bar <- <- bar <-
49-
pause
50-
resume
51-
-> baz -> -> baz ->
52-
<- baz <- <- baz <-
53-
-> qux -> -> qux ->
54-
<- qux <- <- qux <-
55-
*/
56-
57-
await proxy.frontEnd.bind(frontAddress)
58-
await proxy.backEnd.bind(backAddress)
59-
60-
const done = proxy.run()
61-
62-
const messages = ["foo", "bar", "baz", "qux"]
63-
const received: string[] = []
64-
65-
await req.connect(frontAddress)
66-
await rep.connect(backAddress)
67-
68-
const echo = async () => {
69-
for await (const msg of rep) {
70-
await rep.send(msg)
71-
}
72-
}
73-
74-
const send = async () => {
75-
for (const msg of messages) {
76-
if (received.length === 2) {
77-
proxy.pause()
78-
proxy.resume()
79-
}
80-
81-
await req.send(Buffer.from(msg))
82-
83-
const [res] = await req.receive()
84-
received.push(res.toString())
85-
if (received.length === messages.length) {
86-
break
87-
}
88-
}
89-
90-
rep.close()
91-
}
92-
93-
console.log(
94-
`waiting for messages for proxy with ${proto} router/dealer...`,
95-
)
96-
97-
await Promise.all([echo(), send()])
98-
assert.deepEqual(received, messages)
99-
100-
proxy.terminate()
101-
await done
102-
console.log(`Done proxying with ${proto} router/dealer`)
5+
describe(`proxy with ${proto} router/dealer`, () => {
6+
describe("run", () => {
7+
it("should proxy messages", async () => {
8+
const worker = new Worker(__filename, {
9+
workerData: {
10+
proto,
11+
},
12+
})
13+
await worker.terminate()
10314
})
10415
})
10516
})
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import {assert} from "chai"
2+
import * as semver from "semver"
3+
import * as zmq from "../../src"
4+
import type {Proto} from "./helpers"
5+
import {cleanSocket, uniqAddress} from "./helpers"
6+
import {workerData} from "worker_threads"
7+
8+
async function testProxyRouterDealer(proto: Proto) {
9+
/* ZMQ < 4.0.5 has no steerable proxy support. */
10+
if (semver.satisfies(zmq.version, "< 4.0.5")) {
11+
return
12+
}
13+
14+
const proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())
15+
16+
const frontAddress = await uniqAddress(proto)
17+
const backAddress = await uniqAddress(proto)
18+
19+
const req = new zmq.Request()
20+
const rep = new zmq.Reply()
21+
22+
try {
23+
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
24+
<- foo <- <- foo <-
25+
-> bar -> -> bar ->
26+
<- bar <- <- bar <-
27+
pause
28+
resume
29+
-> baz -> -> baz ->
30+
<- baz <- <- baz <-
31+
-> qux -> -> qux ->
32+
<- qux <- <- qux <-
33+
*/
34+
await proxy.frontEnd.bind(frontAddress)
35+
await proxy.backEnd.bind(backAddress)
36+
37+
const done = proxy.run()
38+
39+
const messages = ["foo", "bar", "baz", "qux"]
40+
const received: string[] = []
41+
42+
await req.connect(frontAddress)
43+
await rep.connect(backAddress)
44+
45+
const echo = async () => {
46+
for await (const msg of rep) {
47+
await rep.send(msg)
48+
}
49+
}
50+
51+
const send = async () => {
52+
for (const msg of messages) {
53+
if (received.length === 2) {
54+
proxy.pause()
55+
proxy.resume()
56+
}
57+
58+
await req.send(Buffer.from(msg))
59+
60+
const [res] = await req.receive()
61+
received.push(res.toString())
62+
if (received.length === messages.length) {
63+
break
64+
}
65+
}
66+
67+
rep.close()
68+
}
69+
70+
console.log(`waiting for messages for proxy with ${proto} router/dealer...`)
71+
72+
await Promise.all([echo(), send()])
73+
assert.deepEqual(received, messages)
74+
75+
proxy.terminate()
76+
await done
77+
console.log(`Done proxying with ${proto} router/dealer`)
78+
} catch (err) {
79+
/* Closing proxy sockets is only necessary if run() fails. */
80+
proxy.frontEnd.close()
81+
proxy.backEnd.close()
82+
throw err
83+
} finally {
84+
req.close()
85+
rep.close()
86+
global.gc?.()
87+
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
88+
}
89+
}
90+
91+
// Receive the proto from the main thread
92+
testProxyRouterDealer(workerData.proto as Proto).catch(err => {
93+
console.error(
94+
`Error testing proxy with ${workerData.proto} router/dealer:`,
95+
err,
96+
)
97+
process.exit(1)
98+
})

test/unit/proxy-run-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as semver from "semver"
22
import * as zmq from "../../src"
33

44
import {assert} from "chai"
5-
import {testProtos, uniqAddress} from "./helpers"
5+
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
66
import {isFullError} from "../../src/errors"
77

88
for (const proto of testProtos("tcp", "ipc", "inproc")) {

0 commit comments

Comments
 (0)