Skip to content

Commit 2496821

Browse files
Boris Dorofeevnodkz
authored andcommitted
feat: add async iterator
1 parent 6cd1b55 commit 2496821

File tree

9 files changed

+199
-4
lines changed

9 files changed

+199
-4
lines changed

example/src/schema.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { composeBull } from '../../src';
22
import { schemaComposer } from 'graphql-compose';
33
import { createBullConnection } from './connectRedis';
44

5-
const { queryFields, mutationFields } = composeBull({
5+
const { queryFields, mutationFields, subscriptionFields } = composeBull({
66
schemaComposer,
77
typePrefix: 'Prefix',
88
jobDataTC: `type MyJobData { fieldA: String! fieldB: String}`,
@@ -21,4 +21,8 @@ schemaComposer.Mutation.addFields({
2121
...mutationFields,
2222
});
2323

24+
schemaComposer.Subscription.addFields({
25+
...subscriptionFields,
26+
});
27+
2428
export default schemaComposer.buildSchema();

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
},
2626
"homepage": "https://github.com/graphql-compose/graphql-compose-bullmq",
2727
"peerDependencies": {
28-
"graphql-compose": "^7.15.0",
29-
"bullmq": "^1.8.11"
28+
"bullmq": "^1.8.11",
29+
"graphql-compose": "^7.15.0"
3030
},
3131
"devDependencies": {
3232
"@types/express": "^4.17.3",

src/composeBull.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ import {
2929
createJobMoveToDelayedFC,
3030
createQueuePepUpFC,
3131
} from './mutation';
32-
import { wrapMutationFC, wrapQueueArgs, composeFC } from './helpers';
32+
import { createJobAddSubFC } from './subscriptions';
33+
import { wrapMutationFC, wrapQueueArgs, wrapQueueSubsArgs, composeFC } from './helpers';
3334

3435
export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<any> }) {
3536
const sc = opts?.schemaComposer || schemaComposer;
3637

3738
const wrapQuery = composeFC(sc, opts)(wrapQueueArgs);
3839
const wrapMutation = composeFC(sc, opts)(wrapMutationFC, wrapQueueArgs);
40+
const wrapSubscription = composeFC(sc, opts)(wrapQueueSubsArgs);
3941

4042
return {
4143
QueueTC: getQueueTC(sc, opts),
@@ -68,5 +70,8 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
6870
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
6971
queuePepUp: wrapMutation(createQueuePepUpFC),
7072
},
73+
subscriptionFields: {
74+
jobAddSub: wrapSubscription(createJobAddSubFC),
75+
},
7176
};
7277
}

src/helpers/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ export * from './MutationError';
44
export * from './normalizePrefixGlob';
55
export * from './wrapMutationFC';
66
export * from './wrapQueueArgs';
7+
export * from './wrapQueueSubsArgs';
78
export * from './composeFC';
89
export * from './deleteKeys';
910
export * from './fixDelayStream';
1011
export * from './memoryUsage';
12+
export * from './queueEventsListen';

src/helpers/queueEventsListen.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import { QueueEvents } from 'bullmq';
2+
import { Options } from '../definitions';
3+
4+
export function getAsyncIterator(
5+
prefix: string,
6+
queueName: string,
7+
eventName: string,
8+
opts: Options
9+
) {
10+
const queueEvents = getQueueEvents(prefix, queueName, opts);
11+
return createAsyncIterator(queueEvents, prefix, queueName, eventName);
12+
}
13+
14+
const queueEventsMap = new Map();
15+
16+
function getQueueEvents(prefix: string, queueName: string, opts: Options): QueueEvents {
17+
const fullName = [prefix, queueName].join('.');
18+
19+
if (queueEventsMap.has(fullName)) {
20+
return queueEventsMap.get(fullName);
21+
}
22+
23+
const queueEvents = new QueueEvents(queueName, {
24+
prefix,
25+
//connection: new Redis(),
26+
});
27+
28+
queueEventsMap.set(fullName, queueEvents);
29+
30+
return queueEvents;
31+
}
32+
33+
function createAsyncIterator<T = any>(
34+
queueEvents: QueueEvents,
35+
prefix: string,
36+
queueName: string,
37+
eventName: string
38+
): AsyncIterator<T> {
39+
const pullSeries: any = [];
40+
const pushSeries: any = [];
41+
let listening = true;
42+
43+
const pushValue = async (event) => {
44+
if (pullSeries.length !== 0) {
45+
const resolver = pullSeries.shift();
46+
resolver(event);
47+
} else {
48+
pushSeries.push(event);
49+
}
50+
};
51+
52+
const pullValue = () => {
53+
return new Promise((resolve) => {
54+
if (pushSeries.length !== 0) {
55+
const value = pushSeries.shift();
56+
resolve({ value, done: false });
57+
} else {
58+
pullSeries.push(resolve);
59+
}
60+
});
61+
};
62+
63+
const handler = (event) => {
64+
pushValue({ prefix, queueName, ...event });
65+
};
66+
67+
queueEvents.on(eventName, handler);
68+
69+
function release() {
70+
if (listening) {
71+
listening = false;
72+
queueEvents.removeListener(eventName, handler);
73+
for (const resolve of pullSeries) {
74+
resolve({ value: undefined, done: true });
75+
}
76+
pullSeries.length = 0;
77+
pushSeries.length = 0;
78+
}
79+
}
80+
81+
const returnProp = function () {
82+
release();
83+
return Promise.resolve({ value: undefined, done: true });
84+
};
85+
86+
return {
87+
[Symbol.asyncIterator]() {
88+
return this;
89+
},
90+
return: returnProp,
91+
next() {
92+
return listening ? pullValue() : returnProp();
93+
},
94+
throw(error) {
95+
release();
96+
return Promise.reject(error);
97+
},
98+
} as AsyncIterator<T>;
99+
}

src/helpers/wrapQueueSubsArgs.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { Options } from '../definitions';
2+
import { ObjectTypeComposerFieldConfigAsObjectDefinition, SchemaComposer } from 'graphql-compose';
3+
4+
export function wrapQueueSubsArgs(
5+
fieldConfig: ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>,
6+
sc: SchemaComposer<any>,
7+
opts: Options
8+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
9+
// remove args if they provided via config
10+
if (opts?.queue?.name && fieldConfig.args?.queueName) {
11+
delete fieldConfig.args.queueName;
12+
}
13+
if (opts?.queue?.prefix && fieldConfig.args?.prefix) {
14+
delete fieldConfig.args.prefix;
15+
}
16+
17+
// pass config props to sub resolve issue
18+
if (opts?.queue?.name || opts?.queue?.prefix) {
19+
const predifinedArgs = {} as Record<string, any>;
20+
if (opts?.queue?.name) predifinedArgs.queueName = opts.queue?.name;
21+
if (opts?.queue?.prefix) predifinedArgs.prefix = opts.queue?.prefix;
22+
23+
const subSubscribe = fieldConfig.resolve || (() => ({}));
24+
fieldConfig.subscribe = async (source, args, context, info) => {
25+
return subSubscribe(source, { ...predifinedArgs, ...args }, context, info);
26+
};
27+
}
28+
29+
return fieldConfig;
30+
}

src/index.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,24 @@
11
export * from './composeBull';
2+
3+
import { getAsyncIterator } from './helpers';
4+
5+
const asyncIter = getAsyncIterator('bull.demo', 'fetch_metrics', 'completed', {
6+
typePrefix: 'Prefix',
7+
});
8+
9+
async function testAsyncIt() {
10+
let nn = 3;
11+
while (true) {
12+
const res = await asyncIter.next();
13+
console.log(res);
14+
15+
nn--;
16+
17+
if (nn == 0) {
18+
await asyncIter.return();
19+
break;
20+
}
21+
}
22+
}
23+
24+
testAsyncIt();

src/subscriptions/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { createJobAddSubFC } from './jobAddSub';

src/subscriptions/jobAddSub.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from '../helpers';
4+
import { Options } from '../definitions';
5+
import { getAsyncIterator } from '../helpers';
6+
7+
export function createJobAddSubFC(
8+
sc: SchemaComposer<any>,
9+
opts: Options
10+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
11+
return {
12+
type: getJobTC(sc, opts),
13+
args: {
14+
prefix: {
15+
type: 'String!',
16+
defaultValue: 'bull',
17+
},
18+
queueName: 'String!',
19+
},
20+
resolve: async ({ prefix, queueName, jobId }) => {
21+
const queue = getQueue(prefix, queueName, opts);
22+
if (!queue) return null;
23+
const job = await queue.getJob(jobId);
24+
if (!job) return null;
25+
return job;
26+
},
27+
subscribe: (_, { prefix, queueName }) => {
28+
return getAsyncIterator(prefix, queueName, 'waiting', opts);
29+
},
30+
};
31+
}

0 commit comments

Comments
 (0)