Skip to content

Commit 974a014

Browse files
committed
add ignoreOldEvents and maxAge
1 parent c0d1e2a commit 974a014

File tree

2 files changed

+75
-40
lines changed

2 files changed

+75
-40
lines changed

src/createFunctions.js

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const parseMessage = require('./pubSub/parseMessage');
55
const withIdempotency = require('./ensureIdempotent');
66
const keepFunctionAlive = require('./keepFunctionAlive');
77
const configStore = require('./configStore');
8+
const ignoreOldEvents = require('./ignoreOldEvents');
89

910
const fromEntries = [(acc, [key, value]) => ({ ...acc, [key]: value }), {}];
1011

@@ -57,21 +58,24 @@ const setupKeepAlive = (config, service) =>
5758
}
5859
: {};
5960

60-
const setupSchedule = (config, service) => ({
61-
time,
62-
name,
63-
function: toExecute,
64-
timeZone = 'America/New_York',
65-
runtimeOptions,
66-
}) => [
67-
[`${service.basePath}_${name}`],
68-
functions
69-
.region(config.region)
70-
.runWith(runtimeOptions || service.runtimeOptions || {})
71-
.pubsub.schedule(time)
72-
.timeZone(timeZone)
73-
.onRun(toExecute),
74-
];
61+
const setupSchedule =
62+
(config, service) =>
63+
({
64+
time,
65+
name,
66+
function: toExecute,
67+
timeZone = 'America/New_York',
68+
runtimeOptions,
69+
}) =>
70+
[
71+
[`${service.basePath}_${name}`],
72+
functions
73+
.region(config.region)
74+
.runWith(runtimeOptions || service.runtimeOptions || {})
75+
.pubsub.schedule(time)
76+
.timeZone(timeZone)
77+
.onRun(toExecute),
78+
];
7579

7680
const setupSchedules = (config, service) =>
7781
Array.isArray(service.schedule)
@@ -80,30 +84,37 @@ const setupSchedules = (config, service) =>
8084
.reduce(...fromEntries)
8185
: {};
8286

83-
const setupEvent = (config, service) => ({
84-
topic,
85-
type = '',
86-
function: toExecute,
87-
ensureIdempotent = false,
88-
runtimeOptions,
89-
}) => {
90-
const functionName = `${service.basePath}_${topic}${type ? `_${type}` : ''}`;
91-
92-
return [
93-
functionName,
94-
functions
95-
.region(config.region)
96-
.runWith(runtimeOptions || service.runtimeOptions || {})
97-
.pubsub.topic(topic)
98-
.onPublish(
99-
parseMessage(
100-
ensureIdempotent
101-
? withIdempotency(functionName, toExecute)
102-
: toExecute
103-
)
104-
),
105-
];
106-
};
87+
const setupEvent =
88+
(config, service) =>
89+
({
90+
topic,
91+
type = '',
92+
function: toExecute,
93+
ensureIdempotent = false,
94+
maxAge,
95+
runtimeOptions,
96+
}) => {
97+
const functionName = `${service.basePath}_${topic}${
98+
type ? `_${type}` : ''
99+
}`;
100+
101+
const handlerWithIdempotency = ensureIdempotent
102+
? withIdempotency(functionName, toExecute)
103+
: toExecute;
104+
105+
const handler = Boolean(maxAge)
106+
? ignoreOldEvents(maxAge, handlerWithIdempotency)
107+
: handlerWithIdempotency;
108+
109+
return [
110+
functionName,
111+
functions
112+
.region(config.region)
113+
.runWith(runtimeOptions || service.runtimeOptions || {})
114+
.pubsub.topic(topic)
115+
.onPublish(parseMessage(handler)),
116+
];
117+
};
107118

108119
const setupEvents = (config, service) =>
109120
Array.isArray(service.events)
@@ -122,7 +133,7 @@ const createFunctions = (config = {}, services) => {
122133
configStore.config = { ...configStore.config, ...config };
123134

124135
return services
125-
.map(service => parseConfig(configStore.config, service))
136+
.map((service) => parseConfig(configStore.config, service))
126137
.reduce(...flattenObjects);
127138
};
128139

src/ignoreOldEvents.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* If an event continues to retry for a long period of time
3+
* there may be a reason to just drop it and stop retrying
4+
* This util, when applied, will bail out of a handler if
5+
* it is greater than the maxAge set.
6+
*/
7+
8+
function setupIgnoreOldEvents(maxAge, handler) {
9+
return function ignoreOldEvents(...args) {
10+
const [_message, context] = args;
11+
const eventAgeMs = Date.now() - Date.parse(context.timestamp);
12+
13+
if (eventAgeMs > maxAge) {
14+
console.log(
15+
`Dropping event ${context.eventId} with age[ms]: ${eventAgeMs}`
16+
);
17+
return true;
18+
}
19+
20+
return handler(...args);
21+
};
22+
}
23+
24+
module.exports = setupIgnoreOldEvents;

0 commit comments

Comments
 (0)