Skip to content

Commit d601579

Browse files
committed
Implement pipe
1 parent 077b94c commit d601579

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed

JavaScript/5-pipe.js

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.paused = false;
6+
this.concurrency = concurrency;
7+
this.count = 0;
8+
this.waiting = [];
9+
this.onProcess = null;
10+
this.onDone = null;
11+
this.onSuccess = null;
12+
this.onFailure = null;
13+
this.onDrain = null;
14+
this.waitTimeout = Infinity;
15+
this.processTimeout = Infinity;
16+
this.priorityMode = false;
17+
this.destination = null;
18+
}
19+
static channels(concurrency) {
20+
return new Queue(concurrency);
21+
}
22+
wait(msec) {
23+
this.waitTimeout = msec;
24+
return this;
25+
}
26+
timeout(msec) {
27+
this.processTimeout = msec;
28+
return this;
29+
}
30+
add(task, priority = 0) {
31+
if (!this.paused) {
32+
const hasChannel = this.count < this.concurrency;
33+
if (hasChannel) {
34+
this.next(task);
35+
return;
36+
}
37+
}
38+
this.waiting.push({ task, start: Date.now(), priority });
39+
if (this.priorityMode) {
40+
this.waiting.sort((a, b) => b.priority - a.priority);
41+
}
42+
}
43+
next(task) {
44+
this.count++;
45+
let timer = null;
46+
let finished = false;
47+
const { processTimeout, onProcess } = this;
48+
const finish = (err, res) => {
49+
if (finished) return;
50+
finished = true;
51+
if (timer) clearTimeout(timer);
52+
this.count--;
53+
this.finish(err, res);
54+
if (!this.paused && this.waiting.length > 0) this.takeNext();
55+
};
56+
if (processTimeout !== Infinity) {
57+
const err = new Error('Process timed out');
58+
timer = setTimeout(finish, processTimeout, err, task);
59+
}
60+
onProcess(task, finish);
61+
}
62+
takeNext() {
63+
const { waiting, waitTimeout } = this;
64+
const { task, start } = waiting.shift();
65+
if (waitTimeout !== Infinity) {
66+
const delay = Date.now() - start;
67+
if (delay > waitTimeout) {
68+
const err = new Error('Waiting timed out');
69+
this.finish(err, task);
70+
if (waiting.length > 0) {
71+
setTimeout(() => {
72+
if (this.paused && waiting.length > 0) this.takeNext();
73+
}, 0);
74+
}
75+
return;
76+
}
77+
}
78+
const hasChannel = this.count < this.concurrency;
79+
if (hasChannel) this.next(task);
80+
return;
81+
}
82+
finish(err, res) {
83+
const { onFailure, onSuccess, onDone, onDrain } = this;
84+
if (err) {
85+
if (onFailure) onFailure(err, res);
86+
} else {
87+
if (onSuccess) onSuccess(res);
88+
if (this.destination) this.destination.add(res);
89+
}
90+
if (onDone) onDone(err, res);
91+
if (this.count === 0 && onDrain) onDrain();
92+
}
93+
process(listener) {
94+
this.onProcess = listener;
95+
return this;
96+
}
97+
done(listener) {
98+
this.onDone = listener;
99+
return this;
100+
}
101+
success(listener) {
102+
this.onSuccess = listener;
103+
return this;
104+
}
105+
failure(listener) {
106+
this.onFailure = listener;
107+
return this;
108+
}
109+
drain(listener) {
110+
this.onDrain = listener;
111+
return this;
112+
}
113+
pause() {
114+
this.paused = true;
115+
return this;
116+
}
117+
resume() {
118+
this.paused = false;
119+
if (this.waiting.length > 0) {
120+
const hasChannel = this.count < this.concurrency;
121+
if (hasChannel) this.takeNext();
122+
}
123+
return this;
124+
}
125+
priority(flag = true) {
126+
this.priorityMode = flag;
127+
return this;
128+
}
129+
pipe(destination) {
130+
this.destination = destination;
131+
return this;
132+
}
133+
}
134+
135+
// Usage
136+
137+
const destination = Queue.channels(2)
138+
.wait(5000)
139+
.process((task, next) => next(null, { ...task, processed: true }))
140+
.done((err, task) => console.log({ task }));
141+
142+
const source = Queue.channels(3)
143+
.timeout(4000)
144+
.process((task, next) => setTimeout(next, task.interval, null, task))
145+
.pipe(destination);
146+
147+
for (let i = 0; i < 10; i++) {
148+
source.add({ name: `Task${i}`, interval: 1000 });
149+
}

0 commit comments

Comments
 (0)