Skip to content

Commit fc16802

Browse files
committed
chore: fix AsyncIterator return value
1 parent 9d24a32 commit fc16802

File tree

13 files changed

+39
-42
lines changed

13 files changed

+39
-42
lines changed

example/src/demo_queues/fetchMetrics.ts

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const prefix = queueSettings.prefix;
1111
const metricsScheduler = new QueueScheduler(queueSettings.name, {
1212
prefix,
1313
connection: createBullConnection('scheduler'), // BULL_REDIS_URI,
14+
stalledInterval: 900000,
1415
});
1516

1617
export const metricsQueue = new Queue(queueSettings.name, {
@@ -24,39 +25,33 @@ metricsQueue.add(
2425
{ repeat: { cron: '*/1 * * * *' } }
2526
);
2627

27-
metricsQueue.add(
28-
'fetch_metrics_every_100000',
29-
{ field1: 'asdasdadas' },
30-
{ repeat: { every: 100000 } }
31-
);
28+
metricsQueue.add('fetch_metrics_every_5000', { field1: 'asdasdadas' }, { repeat: { every: 5000 } });
3229

3330
const metricsWorker = new Worker(
3431
queueSettings.name,
3532
async (job) => {
36-
// for (let i = 0; i < 5; i++) {
37-
// job.updateProgress(i * 20);
38-
// await new Promise((resolve) => setTimeout(resolve, 1000));
39-
// }
40-
// console.log(`----> execution of job ${job.id} with data ${JSON.stringify(job.data)}`);
33+
for (let i = 0; i < 5; i++) {
34+
job.updateProgress(i * 20);
35+
await new Promise((resolve) => setTimeout(resolve, 1000));
36+
}
37+
console.log(`----> execution of job ${job.id} with data ${JSON.stringify(job.data)}`);
4138

4239
//https://github.com/taskforcesh/bullmq/issues/69
43-
console.log(new Date().toISOString(), 'Starting name: ' + job.name + ', job: ' + job.id);
44-
return new Promise((resolve, reject) => {
45-
//setTimeout(() => reject('Здесь какая то причина ...'), 0);
46-
setTimeout(
47-
() =>
48-
resolve({
49-
status: 'job completed',
50-
result: new Date().toISOString(),
51-
}),
52-
100000
53-
);
54-
});
40+
// console.log(new Date().toISOString(), 'Starting name: ' + job.name + ', job: ' + job.id);
41+
// return new Promise((resolve, reject) => {
42+
// //setTimeout(() => reject('Здесь какая то причина ...'), 0);
43+
// setTimeout(
44+
// () =>
45+
// resolve({
46+
// status: 'job completed',
47+
// result: new Date().toISOString(),
48+
// }),
49+
// 100000
50+
// );
51+
// });
5552
},
5653
{
57-
settings: {
58-
stalledInterval: 300,
59-
},
54+
lockDuration: 900000,
6055
prefix,
6156
connection: createBullConnection('worker'), // BULL_REDIS_URI,
6257
}
@@ -74,5 +69,5 @@ export default {
7469
name: queueSettings.name as string,
7570
prefix: queueSettings.prefix as string,
7671
bullQueue: metricsQueue,
77-
jobNames: ['fetch_metrics_every_5m'],
72+
jobNames: ['fetch_metrics_every_5s'],
7873
};

example/src/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@ const server = new ApolloServer({
77
context: () => {
88
return { Queues };
99
},
10+
subscriptions: {
11+
path: '/',
12+
},
1013
});
1114

12-
server.listen(5000).then(({ url }) => {
15+
server.listen(process.env.PORT || 5000).then(({ url }) => {
1316
console.log(`🚀 Server ready at ${url}`);
17+
console.log(`🚀 WebSocket ready at ${url.replace(/^http/i, 'ws')}`);
1418
console.log(`🚀 Server pid: ${process.pid}`);
1519
});

src/helpers/queueEventsListen.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export function getAsyncIterator(
88
opts: Options
99
) {
1010
const queueEvents = getQueueEventsSingleton(prefix, queueName, opts);
11-
return createAsyncIterator(queueEvents, prefix, queueName, eventName);
11+
return createAsyncIterator(queueEvents, eventName);
1212
}
1313

1414
const queueEventsMap = new Map();
@@ -32,8 +32,6 @@ function getQueueEventsSingleton(prefix: string, queueName: string, opts: Option
3232

3333
function createAsyncIterator<T = any>(
3434
queueEvents: QueueEvents,
35-
prefix: string,
36-
queueName: string,
3735
eventName: string
3836
): Required<AsyncIterator<T>> {
3937
let pullSeries: any = [];
@@ -43,7 +41,7 @@ function createAsyncIterator<T = any>(
4341
const pushValue = async (event) => {
4442
if (pullSeries.length !== 0) {
4543
const resolver = pullSeries.shift();
46-
resolver(event);
44+
resolver({ value: event, done: false });
4745
} else {
4846
pushSeries.push(event);
4947
}
@@ -61,7 +59,7 @@ function createAsyncIterator<T = any>(
6159
};
6260

6361
const handler = (event) => {
64-
pushValue({ prefix, queueName, ...event });
62+
pushValue(event);
6563
};
6664

6765
queueEvents.on(eventName, handler);

src/subscriptions/onJobActive.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export function createOnJobActiveFC(
2727
},
2828
queueName: 'String!',
2929
},
30-
resolve: async ({ prefix, queueName, jobId, prev }) => {
30+
resolve: async ({ jobId, prev }, { prefix, queueName }) => {
3131
const queue = getQueue(prefix, queueName, opts);
3232
const job = await queue.getJob(jobId);
3333
return {

src/subscriptions/onJobCompleted.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function createOnJobCompletedFC(
2626
},
2727
queueName: 'String!',
2828
},
29-
resolve: async ({ prefix, queueName, jobId }) => {
29+
resolve: async ({ jobId }, { prefix, queueName }) => {
3030
const queue = getQueue(prefix, queueName, opts);
3131
const job = await queue.getJob(jobId);
3232
return {

src/subscriptions/onJobDelayed.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export function createOnJobDelayedFC(
2727
},
2828
queueName: 'String!',
2929
},
30-
resolve: async ({ prefix, queueName, jobId, delay }) => {
30+
resolve: async ({ jobId, delay }, { prefix, queueName }) => {
3131
const queue = getQueue(prefix, queueName, opts);
3232
const job = await queue.getJob(jobId);
3333
return {

src/subscriptions/onJobFailed.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function createOnJobFailedFC(
2626
},
2727
queueName: 'String!',
2828
},
29-
resolve: async ({ prefix, queueName, jobId }) => {
29+
resolve: async ({ jobId }, { prefix, queueName }) => {
3030
const queue = getQueue(prefix, queueName, opts);
3131
const job = await queue.getJob(jobId);
3232
return {

src/subscriptions/onJobProgress.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export function createOnJobProgressFC(
2727
},
2828
queueName: 'String!',
2929
},
30-
resolve: async ({ prefix, queueName, jobId, data }) => {
30+
resolve: async ({ jobId, data }, { prefix, queueName }) => {
3131
const queue = getQueue(prefix, queueName, opts);
3232
const job = await queue.getJob(jobId);
3333
return {

src/subscriptions/onJobRemoved.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export function createOnJobRemovedFC(
2727
},
2828
queueName: 'String!',
2929
},
30-
resolve: async ({ prefix, queueName, jobId, prev }) => {
30+
resolve: async ({ jobId, prev }, { prefix, queueName }) => {
3131
const queue = getQueue(prefix, queueName, opts);
3232
const job = await queue.getJob(jobId);
3333
return {

src/subscriptions/onJobStalled.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function createOnJobStalledFC(
2626
},
2727
queueName: 'String!',
2828
},
29-
resolve: async ({ prefix, queueName, jobId }) => {
29+
resolve: async ({ jobId }, { prefix, queueName }) => {
3030
const queue = getQueue(prefix, queueName, opts);
3131
const job = await queue.getJob(jobId);
3232
return {

0 commit comments

Comments
 (0)