Skip to content

Commit 59da322

Browse files
committed
feat: add redisEvents option which enables Subscriptions for listening events from queues
1 parent 2496821 commit 59da322

File tree

9 files changed

+76
-31
lines changed

9 files changed

+76
-31
lines changed

example/src/connectRedis.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const BULL_REDIS_URI = process.env.BULL_REDIS_URI || 'redis://127.0.0.1:6379';
55

66
const redisInstances = new Map<string, Redis.Redis>();
77

8-
export function createBullConnection(type: 'queue' | 'worker' | 'scheduler' | 'custom') {
8+
export function createBullConnection(type: 'queue' | 'worker' | 'scheduler' | 'events' | 'custom') {
99
const existedClient = redisInstances.get(type);
1010
if (existedClient) {
1111
return existedClient;

example/src/schema.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const { queryFields, mutationFields, subscriptionFields } = composeBull({
1111
prefix: 'bull.demo',
1212
},
1313
redis: createBullConnection('queue'),
14+
redisEvents: createBullConnection('events'),
1415
});
1516

1617
schemaComposer.Query.addFields({
@@ -21,8 +22,10 @@ schemaComposer.Mutation.addFields({
2122
...mutationFields,
2223
});
2324

24-
schemaComposer.Subscription.addFields({
25-
...subscriptionFields,
26-
});
25+
if (subscriptionFields) {
26+
schemaComposer.Subscription.addFields({
27+
...subscriptionFields,
28+
});
29+
}
2730

2831
export default schemaComposer.buildSchema();

src/composeBull.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { schemaComposer, SchemaComposer } from 'graphql-compose';
1+
import {
2+
schemaComposer,
3+
SchemaComposer,
4+
ObjectTypeComposer,
5+
ObjectTypeComposerFieldConfigAsObjectDefinition,
6+
} from 'graphql-compose';
27
import { Options } from './definitions';
38
import { getQueueTC, getJobTC } from './types';
49
import {
@@ -29,17 +34,27 @@ import {
2934
createJobMoveToDelayedFC,
3035
createQueuePepUpFC,
3136
} from './mutation';
32-
import { createJobAddSubFC } from './subscriptions';
37+
import { createOnJobWaitingFC } from './subscriptions';
3338
import { wrapMutationFC, wrapQueueArgs, wrapQueueSubsArgs, composeFC } from './helpers';
3439

35-
export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<any> }) {
40+
interface ComposeBullResult {
41+
QueueTC: ObjectTypeComposer;
42+
JobTC: ObjectTypeComposer;
43+
queryFields: Record<string, ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>>;
44+
mutationFields: Record<string, ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>>;
45+
subscriptionFields?: Record<string, ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>>;
46+
}
47+
48+
export function composeBull(
49+
opts: Options & { schemaComposer?: SchemaComposer<any> }
50+
): ComposeBullResult {
3651
const sc = opts?.schemaComposer || schemaComposer;
3752

3853
const wrapQuery = composeFC(sc, opts)(wrapQueueArgs);
3954
const wrapMutation = composeFC(sc, opts)(wrapMutationFC, wrapQueueArgs);
4055
const wrapSubscription = composeFC(sc, opts)(wrapQueueSubsArgs);
4156

42-
return {
57+
const data = {
4358
QueueTC: getQueueTC(sc, opts),
4459
JobTC: getJobTC(sc, opts),
4560
queryFields: {
@@ -70,8 +85,13 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
7085
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
7186
queuePepUp: wrapMutation(createQueuePepUpFC),
7287
},
73-
subscriptionFields: {
74-
jobAddSub: wrapSubscription(createJobAddSubFC),
75-
},
76-
};
88+
} as ComposeBullResult;
89+
90+
if (opts?.redisEvents) {
91+
data.subscriptionFields = {
92+
onJobWaiting: wrapSubscription(createOnJobWaitingFC),
93+
};
94+
}
95+
96+
return data;
7797
}

src/definitions.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ export type Options = {
99
prefix?: string;
1010
};
1111
redis?: RedisOptions | IORedis.Redis;
12+
redisEvents?: RedisOptions | IORedis.Redis;
1213
};

src/helpers/getBullConnection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import Redis from 'ioredis';
33

44
const connectionMap = new Map<Options['redis'], Redis.Redis>();
55

6-
export function getBullConnection(opts: Options): Redis.Redis {
6+
export function getBullConnection(opts?: Options): Redis.Redis {
77
const optsRedis = opts?.redis;
88
let connection = connectionMap.get(optsRedis);
99
if (connection) {

src/helpers/queueEventsListen.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ export function getAsyncIterator(
77
eventName: string,
88
opts: Options
99
) {
10-
const queueEvents = getQueueEvents(prefix, queueName, opts);
10+
const queueEvents = getQueueEventsSingleton(prefix, queueName, opts);
1111
return createAsyncIterator(queueEvents, prefix, queueName, eventName);
1212
}
1313

1414
const queueEventsMap = new Map();
1515

16-
function getQueueEvents(prefix: string, queueName: string, opts: Options): QueueEvents {
16+
function getQueueEventsSingleton(prefix: string, queueName: string, opts: Options): QueueEvents {
1717
const fullName = [prefix, queueName].join('.');
1818

1919
if (queueEventsMap.has(fullName)) {
@@ -22,7 +22,7 @@ function getQueueEvents(prefix: string, queueName: string, opts: Options): Queue
2222

2323
const queueEvents = new QueueEvents(queueName, {
2424
prefix,
25-
//connection: new Redis(),
25+
connection: opts?.redisEvents,
2626
});
2727

2828
queueEventsMap.set(fullName, queueEvents);
@@ -36,8 +36,8 @@ function createAsyncIterator<T = any>(
3636
queueName: string,
3737
eventName: string
3838
): AsyncIterator<T> {
39-
const pullSeries: any = [];
40-
const pushSeries: any = [];
39+
let pullSeries: any = [];
40+
let pushSeries: any = [];
4141
let listening = true;
4242

4343
const pushValue = async (event) => {
@@ -73,12 +73,12 @@ function createAsyncIterator<T = any>(
7373
for (const resolve of pullSeries) {
7474
resolve({ value: undefined, done: true });
7575
}
76-
pullSeries.length = 0;
77-
pushSeries.length = 0;
76+
pullSeries = [];
77+
pushSeries = [];
7878
}
7979
}
8080

81-
const returnProp = function () {
81+
const returnProp = () => {
8282
release();
8383
return Promise.resolve({ value: undefined, done: true });
8484
};

src/helpers/wrapQueueSubsArgs.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ export function wrapQueueSubsArgs(
2020
if (opts?.queue?.name) predifinedArgs.queueName = opts.queue?.name;
2121
if (opts?.queue?.prefix) predifinedArgs.prefix = opts.queue?.prefix;
2222

23-
const subSubscribe = fieldConfig.resolve || (() => ({}));
24-
fieldConfig.subscribe = async (source, args, context, info) => {
25-
return subSubscribe(source, { ...predifinedArgs, ...args }, context, info);
26-
};
23+
const subSubscribe = fieldConfig.subscribe;
24+
if (subSubscribe) {
25+
fieldConfig.subscribe = async (source, args, context, info) => {
26+
return subSubscribe(source, { ...predifinedArgs, ...args }, context, info);
27+
};
28+
}
29+
30+
const subResolve = fieldConfig.resolve;
31+
if (subResolve) {
32+
fieldConfig.resolve = async (source, args, context, info) => {
33+
return subResolve(source, { ...predifinedArgs, ...args }, context, info);
34+
};
35+
}
2736
}
2837

2938
return fieldConfig;

src/subscriptions/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export { createJobAddSubFC } from './jobAddSub';
1+
export { createOnJobWaitingFC } from './onJobWaiting';

src/subscriptions/jobAddSub.ts renamed to src/subscriptions/onJobWaiting.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,22 @@ import { getJobTC } from '../types/job/Job';
33
import { getQueue } from '../helpers';
44
import { Options } from '../definitions';
55
import { getAsyncIterator } from '../helpers';
6+
import { getQueueTC } from '../types/queue/Queue';
67

7-
export function createJobAddSubFC(
8+
export function createOnJobWaitingFC(
89
sc: SchemaComposer<any>,
910
opts: Options
1011
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
1112
return {
12-
type: getJobTC(sc, opts),
13+
type: sc.createObjectTC({
14+
name: 'OnWaitingPayload',
15+
fields: {
16+
job: getJobTC(sc, opts),
17+
queue: getQueueTC(sc, opts).NonNull,
18+
jobId: 'String!',
19+
queueName: 'String!',
20+
},
21+
}),
1322
args: {
1423
prefix: {
1524
type: 'String!',
@@ -19,10 +28,13 @@ export function createJobAddSubFC(
1928
},
2029
resolve: async ({ prefix, queueName, jobId }) => {
2130
const queue = getQueue(prefix, queueName, opts);
22-
if (!queue) return null;
2331
const job = await queue.getJob(jobId);
24-
if (!job) return null;
25-
return job;
32+
return {
33+
job,
34+
queue,
35+
jobId,
36+
queueName,
37+
};
2638
},
2739
subscribe: (_, { prefix, queueName }) => {
2840
return getAsyncIterator(prefix, queueName, 'waiting', opts);

0 commit comments

Comments
 (0)