Skip to content

Commit 614dff2

Browse files
Boris Dorofeevnodkz
authored andcommitted
feat: add job & queue subscription fields
1 parent 59da322 commit 614dff2

File tree

16 files changed

+529
-41
lines changed

16 files changed

+529
-41
lines changed

example/src/demo_queues/fetchMetrics.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,30 @@ metricsQueue.add(
3333
const metricsWorker = new Worker(
3434
queueSettings.name,
3535
async (job) => {
36+
// for (let i = 0; i < 5; i++) {
37+
// job.updateProgress(i * 20);
38+
// await new Promise((resolve) => setTimeout(resolve, 1000));
39+
// }
40+
// console.log(`----> execution of job ${job.id} with data ${JSON.stringify(job.data)}`);
41+
3642
//https://github.com/taskforcesh/bullmq/issues/69
3743
console.log(new Date().toISOString(), 'Starting name: ' + job.name + ', job: ' + job.id);
3844
return new Promise((resolve, reject) => {
39-
//throw new Error('Ошибка обработки данных...');
4045
//setTimeout(() => reject('Здесь какая то причина ...'), 0);
4146
setTimeout(
4247
() =>
4348
resolve({
4449
status: 'job completed',
4550
result: new Date().toISOString(),
4651
}),
47-
0
52+
100000
4853
);
4954
});
5055
},
5156
{
57+
settings: {
58+
stalledInterval: 300,
59+
},
5260
prefix,
5361
connection: createBullConnection('worker'), // BULL_REDIS_URI,
5462
}

src/composeBull.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,18 @@ import {
3434
createJobMoveToDelayedFC,
3535
createQueuePepUpFC,
3636
} from './mutation';
37-
import { createOnJobWaitingFC } from './subscriptions';
37+
import {
38+
createOnJobActiveFC,
39+
createOnJobCompletedFC,
40+
createOnJobDelayedFC,
41+
createOnJobFailedFC,
42+
createOnJobProgressFC,
43+
createOnJobRemovedFC,
44+
createOnJobStalledFC,
45+
createOnJobWaitingFC,
46+
createOnQueuePausedFC,
47+
createOnQueueResumedFC,
48+
} from './subscriptions';
3849
import { wrapMutationFC, wrapQueueArgs, wrapQueueSubsArgs, composeFC } from './helpers';
3950

4051
interface ComposeBullResult {
@@ -87,11 +98,20 @@ export function composeBull(
8798
},
8899
} as ComposeBullResult;
89100

90-
if (opts?.redisEvents) {
91-
data.subscriptionFields = {
92-
onJobWaiting: wrapSubscription(createOnJobWaitingFC),
93-
};
94-
}
101+
//if (opts?.redisEvents) {
102+
data.subscriptionFields = {
103+
onJobActive: wrapSubscription(createOnJobActiveFC),
104+
onJobCompleted: wrapSubscription(createOnJobCompletedFC),
105+
onJobDelayed: wrapSubscription(createOnJobDelayedFC),
106+
onJobFailed: wrapSubscription(createOnJobFailedFC),
107+
onJobProgress: wrapSubscription(createOnJobProgressFC),
108+
onJobRemoved: wrapSubscription(createOnJobRemovedFC),
109+
onJobStalled: wrapSubscription(createOnJobStalledFC),
110+
onJobWaiting: wrapSubscription(createOnJobWaitingFC),
111+
onQueuePaused: wrapSubscription(createOnQueuePausedFC),
112+
onQueueResumed: wrapSubscription(createOnQueueResumedFC),
113+
};
114+
//}
95115

96116
return data;
97117
}

src/helpers/queueEventsListen.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ function createAsyncIterator<T = any>(
3535
prefix: string,
3636
queueName: string,
3737
eventName: string
38-
): AsyncIterator<T> {
38+
): Required<AsyncIterator<T>> {
3939
let pullSeries: any = [];
4040
let pushSeries: any = [];
4141
let listening = true;
@@ -78,22 +78,20 @@ function createAsyncIterator<T = any>(
7878
}
7979
}
8080

81-
const returnProp = () => {
82-
release();
83-
return Promise.resolve({ value: undefined, done: true });
84-
};
85-
8681
return {
8782
[Symbol.asyncIterator]() {
8883
return this;
8984
},
90-
return: returnProp,
85+
return() {
86+
release();
87+
return Promise.resolve({ value: undefined, done: true });
88+
},
9189
next() {
92-
return listening ? pullValue() : returnProp();
90+
return listening ? pullValue() : this.return();
9391
},
9492
throw(error) {
9593
release();
9694
return Promise.reject(error);
9795
},
98-
} as AsyncIterator<T>;
96+
} as Required<AsyncIterator<T>>;
9997
}

src/index.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1 @@
11
export * from './composeBull';
2-
3-
import { getAsyncIterator } from './helpers';
4-
5-
const asyncIter = getAsyncIterator('bull.demo', 'fetch_metrics', 'completed', {
6-
typePrefix: 'Prefix',
7-
});
8-
9-
async function testAsyncIt() {
10-
let nn = 3;
11-
while (true) {
12-
const res = await asyncIter.next();
13-
console.log(res);
14-
15-
nn--;
16-
17-
if (nn == 0) {
18-
await asyncIter.return();
19-
break;
20-
}
21-
}
22-
}
23-
24-
testAsyncIt();

src/subscriptions/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,10 @@
1+
export { createOnJobActiveFC } from './onJobActive';
2+
export { createOnJobCompletedFC } from './onJobCompleted';
3+
export { createOnJobDelayedFC } from './onJobDelayed';
4+
export { createOnJobFailedFC } from './onJobFailed';
5+
export { createOnJobProgressFC } from './onJobProgress';
6+
export { createOnJobRemovedFC } from './onJobRemoved';
7+
export { createOnJobStalledFC } from './onJobStalled';
18
export { createOnJobWaitingFC } from './onJobWaiting';
9+
export { createOnQueuePausedFC } from './onQueuePaused';
10+
export { createOnQueueResumedFC } from './onQueueResumed';

src/subscriptions/onJobActive.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
7+
8+
export function createOnJobActiveFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
return {
13+
type: sc.createObjectTC({
14+
name: 'OnJobActivePayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
prev: 'String',
21+
},
22+
}),
23+
args: {
24+
prefix: {
25+
type: 'String!',
26+
defaultValue: 'bull',
27+
},
28+
queueName: 'String!',
29+
},
30+
resolve: async ({ prefix, queueName, jobId, prev }) => {
31+
const queue = getQueue(prefix, queueName, opts);
32+
const job = await queue.getJob(jobId);
33+
return {
34+
job,
35+
queue,
36+
jobId,
37+
queueName,
38+
prev,
39+
};
40+
},
41+
subscribe: (_, { prefix, queueName }) => {
42+
return getAsyncIterator(prefix, queueName, 'active', opts);
43+
},
44+
};
45+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
7+
8+
export function createOnJobCompletedFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
return {
13+
type: sc.createObjectTC({
14+
name: 'OnJobCompletedPayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
},
21+
}),
22+
args: {
23+
prefix: {
24+
type: 'String!',
25+
defaultValue: 'bull',
26+
},
27+
queueName: 'String!',
28+
},
29+
resolve: async ({ prefix, queueName, jobId }) => {
30+
const queue = getQueue(prefix, queueName, opts);
31+
const job = await queue.getJob(jobId);
32+
return {
33+
job,
34+
queue,
35+
jobId,
36+
queueName,
37+
};
38+
},
39+
subscribe: (_, { prefix, queueName }) => {
40+
return getAsyncIterator(prefix, queueName, 'completed', opts);
41+
},
42+
};
43+
}

src/subscriptions/onJobDelayed.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
7+
8+
export function createOnJobDelayedFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
return {
13+
type: sc.createObjectTC({
14+
name: 'OnJobDelayedPayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
delay: 'Int',
21+
},
22+
}),
23+
args: {
24+
prefix: {
25+
type: 'String!',
26+
defaultValue: 'bull',
27+
},
28+
queueName: 'String!',
29+
},
30+
resolve: async ({ prefix, queueName, jobId, delay }) => {
31+
const queue = getQueue(prefix, queueName, opts);
32+
const job = await queue.getJob(jobId);
33+
return {
34+
job,
35+
queue,
36+
jobId,
37+
queueName,
38+
delay: parseInt(delay),
39+
};
40+
},
41+
subscribe: (_, { prefix, queueName }) => {
42+
return getAsyncIterator(prefix, queueName, 'delayed', opts);
43+
},
44+
};
45+
}

src/subscriptions/onJobFailed.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
7+
8+
export function createOnJobFailedFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
return {
13+
type: sc.createObjectTC({
14+
name: 'OnJobFailedPayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
},
21+
}),
22+
args: {
23+
prefix: {
24+
type: 'String!',
25+
defaultValue: 'bull',
26+
},
27+
queueName: 'String!',
28+
},
29+
resolve: async ({ prefix, queueName, jobId }) => {
30+
const queue = getQueue(prefix, queueName, opts);
31+
const job = await queue.getJob(jobId);
32+
return {
33+
job,
34+
queue,
35+
jobId,
36+
queueName,
37+
};
38+
},
39+
subscribe: (_, { prefix, queueName }) => {
40+
return getAsyncIterator(prefix, queueName, 'failed', opts);
41+
},
42+
};
43+
}

src/subscriptions/onJobProgress.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
7+
8+
export function createOnJobProgressFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
return {
13+
type: sc.createObjectTC({
14+
name: 'OnJobProgressPayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
progress: 'Int',
21+
},
22+
}),
23+
args: {
24+
prefix: {
25+
type: 'String!',
26+
defaultValue: 'bull',
27+
},
28+
queueName: 'String!',
29+
},
30+
resolve: async ({ prefix, queueName, jobId, data }) => {
31+
const queue = getQueue(prefix, queueName, opts);
32+
const job = await queue.getJob(jobId);
33+
return {
34+
job,
35+
queue,
36+
jobId,
37+
queueName,
38+
progress: parseInt(data),
39+
};
40+
},
41+
subscribe: (_, { prefix, queueName }) => {
42+
return getAsyncIterator(prefix, queueName, 'progress', opts);
43+
},
44+
};
45+
}

0 commit comments

Comments
 (0)