Skip to content

Commit 8e4772e

Browse files
committed
Simple multichannel queue implementation
1 parent f1a34c0 commit 8e4772e

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed

JavaScript/1-channels.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.concurrency = concurrency;
6+
this.count = 0;
7+
this.waiting = [];
8+
this.onProcess = null;
9+
this.onDone = null;
10+
this.onSuccess = null;
11+
this.onFailure = null;
12+
this.onDrain = null;
13+
}
14+
static channels(concurrency) {
15+
return new Queue(concurrency);
16+
}
17+
add(task) {
18+
const hasChannel = this.count < this.concurrency;
19+
if (hasChannel) {
20+
this.next(task);
21+
return;
22+
}
23+
this.waiting.push(task);
24+
}
25+
next(task) {
26+
this.count++;
27+
this.onProcess(task, (err, result) => {
28+
if (err) {
29+
if (this.onFailure) this.onFailure(err);
30+
} else if (this.onSuccess) {
31+
this.onSuccess(result);
32+
}
33+
if (this.onDone) this.onDone(err, result);
34+
this.count--;
35+
if (this.waiting.length > 0) {
36+
const task = this.waiting.shift();
37+
this.next(task);
38+
return;
39+
}
40+
if (this.count === 0 && this.onDrain) {
41+
this.onDrain();
42+
}
43+
});
44+
}
45+
process(listener) {
46+
this.onProcess = listener;
47+
return this;
48+
}
49+
done(listener) {
50+
this.onDone = listener;
51+
return this;
52+
}
53+
success(listener) {
54+
this.onSuccess = listener;
55+
return this;
56+
}
57+
failure(listener) {
58+
this.onFailure = listener;
59+
return this;
60+
}
61+
drain(listener) {
62+
this.onDrain = listener;
63+
return this;
64+
}
65+
}
66+
67+
// Usage
68+
69+
const job = (task, next) => {
70+
console.log(`Process: ${task.name}`);
71+
setTimeout(next, task.interval, null, task);
72+
};
73+
74+
const queue = Queue.channels(3)
75+
.process(job)
76+
.done((err, res) => console.log(`Done: ${res.name}`))
77+
.success(res => console.log(`Success: ${res.name}`))
78+
.failure(err => console.log(`Failure: ${err}`))
79+
.drain(() => console.log('Queue drain'));
80+
81+
for (let i = 0; i < 10; i++) {
82+
queue.add({ name: `Task${i}`, interval: i * 1000 });
83+
}

0 commit comments

Comments
 (0)