Skip to content

Commit e3769b9

Browse files
committed
Implement priority in queue
1 parent dddde56 commit e3769b9

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed

JavaScript/4-priority.js

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
'use strict';
2+
3+
class Queue {
4+
constructor(concurrency) {
5+
this.paused = false;
6+
this.concurrency = concurrency;
7+
this.count = 0;
8+
this.waiting = [];
9+
this.onProcess = null;
10+
this.onDone = null;
11+
this.onSuccess = null;
12+
this.onFailure = null;
13+
this.onDrain = null;
14+
this.waitTimeout = Infinity;
15+
this.processTimeout = Infinity;
16+
this.priorityMode = false;
17+
}
18+
static channels(concurrency) {
19+
return new Queue(concurrency);
20+
}
21+
wait(msec) {
22+
this.waitTimeout = msec;
23+
return this;
24+
}
25+
timeout(msec) {
26+
this.processTimeout = msec;
27+
return this;
28+
}
29+
add(task, priority = 0) {
30+
if (!this.paused) {
31+
const hasChannel = this.count < this.concurrency;
32+
if (hasChannel) {
33+
this.next(task);
34+
return;
35+
}
36+
}
37+
this.waiting.push({ task, start: Date.now(), priority });
38+
if (this.priorityMode) {
39+
this.waiting.sort((a, b) => b.priority - a.priority);
40+
}
41+
}
42+
next(task) {
43+
this.count++;
44+
let timer = null;
45+
let finished = false;
46+
const { processTimeout, onProcess } = this;
47+
const finish = (err, res) => {
48+
if (finished) return;
49+
finished = true;
50+
if (timer) clearTimeout(timer);
51+
this.count--;
52+
this.finish(err, res);
53+
if (!this.paused && this.waiting.length > 0) this.takeNext();
54+
};
55+
if (processTimeout !== Infinity) {
56+
const err = new Error('Process timed out');
57+
timer = setTimeout(finish, processTimeout, err, task);
58+
}
59+
onProcess(task, finish);
60+
}
61+
takeNext() {
62+
const { waiting, waitTimeout } = this;
63+
const { task, start } = waiting.shift();
64+
if (waitTimeout !== Infinity) {
65+
const delay = Date.now() - start;
66+
if (delay > waitTimeout) {
67+
const err = new Error('Waiting timed out');
68+
this.finish(err, task);
69+
if (waiting.length > 0) {
70+
setTimeout(() => {
71+
if (this.paused && waiting.length > 0) this.takeNext();
72+
}, 0);
73+
}
74+
return;
75+
}
76+
}
77+
const hasChannel = this.count < this.concurrency;
78+
if (hasChannel) this.next(task);
79+
return;
80+
}
81+
finish(err, res) {
82+
const { onFailure, onSuccess, onDone, onDrain } = this;
83+
if (err) {
84+
if (onFailure) onFailure(err, res);
85+
} else if (onSuccess) {
86+
onSuccess(res);
87+
}
88+
if (onDone) onDone(err, res);
89+
if (this.count === 0 && onDrain) onDrain();
90+
}
91+
process(listener) {
92+
this.onProcess = listener;
93+
return this;
94+
}
95+
done(listener) {
96+
this.onDone = listener;
97+
return this;
98+
}
99+
success(listener) {
100+
this.onSuccess = listener;
101+
return this;
102+
}
103+
failure(listener) {
104+
this.onFailure = listener;
105+
return this;
106+
}
107+
drain(listener) {
108+
this.onDrain = listener;
109+
return this;
110+
}
111+
pause() {
112+
this.paused = true;
113+
return this;
114+
}
115+
resume() {
116+
this.paused = false;
117+
if (this.waiting.length > 0) {
118+
const hasChannel = this.count < this.concurrency;
119+
if (hasChannel) this.takeNext();
120+
}
121+
return this;
122+
}
123+
priority(flag = true) {
124+
this.priorityMode = flag;
125+
return this;
126+
}
127+
}
128+
129+
// Usage
130+
131+
const job = (task, next) => {
132+
console.log(`Process: ${task.name}`);
133+
setTimeout(next, task.interval, null, task).unref();
134+
};
135+
136+
const queue = Queue.channels(3)
137+
.wait(4000)
138+
.timeout(5000)
139+
.process(job)
140+
.priority()
141+
.done((err, task) => console.log(`Done: ${task.name}`))
142+
.success(task => console.log(`Success: ${task.name}`))
143+
.failure((err, task) => console.log(`Failure: ${err} ${task.name}`))
144+
.drain(() => console.log('Queue drain'));
145+
146+
for (let i = 0; i < 10; i++) {
147+
const interval = (10 - i) * 1000;
148+
queue.add({ name: `Task${i}`, interval }, i);
149+
}

0 commit comments

Comments
 (0)