diff --git a/src/configStore.js b/src/configStore.js index 1ca032f..713c786 100644 --- a/src/configStore.js +++ b/src/configStore.js @@ -12,7 +12,7 @@ const defaultConfig = { region: 'us-central1', validatePrivilege: defaultValidatePrivilege, validateVisibility: defaultValidateVisibility, - unwrapResponse: defaultUnwrapResponse, + unwrapResponse: defaultUnwrapResponse, middleware: [], corsEnabled: true, corsOptions: defaultCorsOptions, diff --git a/src/index.js b/src/index.js index a9500a5..3d4aef3 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,8 @@ const applyModifiers = require('./validateFields/applyModifiers') const iFC = require('./iFC.js'); const publish = require('./pubSub/publish'); +module.exports = require('./v2'); + module.exports.default = createFunctions; module.exports.createFunctions = createFunctions; module.exports.generateTypes = generateTypes; diff --git a/src/v2/createFunctionsV2.js b/src/v2/createFunctionsV2.js new file mode 100644 index 0000000..aed0a3f --- /dev/null +++ b/src/v2/createFunctionsV2.js @@ -0,0 +1,160 @@ +const { onMessagePublished } = require('firebase-functions/v2/pubsub'); +const { onSchedule } = require('firebase-functions/v2/scheduler'); +const { onRequest } = require('firebase-functions/v2/https'); +const { onDocumentDeleted, onDocumentUpdated, onDocumentCreated } = require('firebase-functions/v2/firestore'); +const parseRoutes = require('../parseRoutes'); +const withIdempotencyV2 = require('../ensureIdempotent'); +const keepFunctionAlive = require('../keepFunctionAlive'); +const configStore = require('../configStore'); +const publishChangesV2 = require('./publishChangesV2'); +const parseMessageV2 = require('./pubSub/parseMessageV2'); +const ignoreOldEventsV2 = require('./ignoreOldEventsV2'); + +const fromEntries = [(acc, [key, value]) => ({ ...acc, [key]: value }), {}]; + +const flattenObjects = [(acc, el) => ({ ...acc, ...el }), {}]; + +const setupRoutesV2 = (config, service) => + Array.isArray(service.routes) + ? { + [service.basePath]: onRequest( + { + region: config.region, + ...(config.runtimeOptions || service.runtimeOptions), + }, + parseRoutes(config, service) + ), + } + : {}; + +const setupOnCreateV2 = (config, service) => + onDocumentCreated( + { + region: config.region, + document: service.resourcePath, + }, + publishChangesV2(service.basePath) + ); + +const setupOnUpdateV2 = (config, service) => + onDocumentUpdated( + { + region: config.region, + document: service.resourcePath, + }, + publishChangesV2(service.basePath) + ); + +const setupOnDeleteV2 = (config, service) => + onDocumentDeleted( + { + region: config.region, + document: service.resourcePath, + }, + publishChangesV2(service.basePath) + ); + +const setupDBTriggersV2 = (config, service) => + service.publishChanges + ? { + [`${service.basePath}_onCreate`]: setupOnCreateV2(config, service), + [`${service.basePath}_onUpdate`]: setupOnUpdateV2(config, service), + [`${service.basePath}_onDelete`]: setupOnDeleteV2(config, service), + } + : {}; + +const setupKeepAliveV2 = (config, service) => + service.keepAlive + ? { + [`${service.basePath}_keep_alive`]: onSchedule( + { region: config.region, schedule: "every 5 minutes" }, + keepFunctionAlive(service) + ), + } + : {}; + +const setupScheduleV2 = + (config, service) => + ({ + time, + name, + function: toExecute, + timeZone = "America/New_York", + runtimeOptions, + }) => + [ + [`${service.basePath}_${name}`], + onSchedule( + { + schedule: time, + region: config.region, + timeZone, + ...(runtimeOptions || service.runtimeOptions), + }, + toExecute + ), + ]; + +const setupSchedulesV2 = (config, service) => + Array.isArray(service.schedule) + ? service.schedule + .map(setupScheduleV2(config, service)) + .reduce(...fromEntries) + : {}; + +const setupEventV2 = + (config, service) => + ({ + topic, + type = "", + function: toExecute, + ensureIdempotent = false, + maxAge, + runtimeOptions, + }) => { + const functionName = `${service.basePath}_${topic}${type ? `_${type}` : "" + }`; + + const handlerWithIdempotency = ensureIdempotent + ? withIdempotencyV2(functionName, toExecute) + : toExecute; + + const handler = maxAge + ? ignoreOldEventsV2(maxAge, handlerWithIdempotency) + : handlerWithIdempotency; + + return [ + functionName, + onMessagePublished( + { + topic, + region: config.region, + ...(runtimeOptions || service.runtimeOptions), + }, + parseMessageV2(handler) + ), + ]; + }; + +const setupEventsV2 = (config, service) => + Array.isArray(service.events) + ? service.events.map(setupEventV2(config, service)).reduce(...fromEntries) + : {}; + +const parseConfigV2 = (config, service) => ({ + ...setupRoutesV2(config, service), + ...setupSchedulesV2(config, service), + ...setupDBTriggersV2(config, service), + ...setupEventsV2(config, service), + ...setupKeepAliveV2(config, service), +}); + +const createFunctionsV2 = (config = {}, services) => { + configStore.config = { ...configStore.config, ...config }; + + return services + .map((service) => parseConfigV2(configStore.config, service)) + .reduce(...flattenObjects); +}; + +module.exports = createFunctionsV2; diff --git a/src/v2/ignoreOldEventsV2.js b/src/v2/ignoreOldEventsV2.js new file mode 100644 index 0000000..5fcf4da --- /dev/null +++ b/src/v2/ignoreOldEventsV2.js @@ -0,0 +1,25 @@ +/** + * If an event continues to retry for a long period of time + * there may be a reason to just drop it and stop retrying + * This util, when applied, will bail out of a handler if + * it is greater than the maxAge set. + */ + +function setupIgnoreOldEvents(maxAge, handler) { + return function ignoreOldEvents(...args) { + const [message, context] = args; + const {time} = message + const eventAgeMs = Date.now() - Date.parse(time); + + if (eventAgeMs > maxAge) { + console.log( + `Dropping event ${context.eventId} with age[ms]: ${eventAgeMs}` + ); + return true; + } + + return handler(...args); + }; +} + +module.exports = setupIgnoreOldEvents; diff --git a/src/v2/index.js b/src/v2/index.js new file mode 100644 index 0000000..931a8c3 --- /dev/null +++ b/src/v2/index.js @@ -0,0 +1,6 @@ +const createFunctionsV2 = require('./createFunctionsV2'); +const parseMessageV2 = require('./pubSub/parseMessageV2'); + +module.exports.default = createFunctionsV2; +module.exports.createFunctionsV2 = createFunctionsV2; +module.exports.parseMessageV2 = parseMessageV2; diff --git a/src/v2/pubSub/parseMessageV2.js b/src/v2/pubSub/parseMessageV2.js new file mode 100644 index 0000000..f6240cb --- /dev/null +++ b/src/v2/pubSub/parseMessageV2.js @@ -0,0 +1,18 @@ +const parseMessageV2 = callback => (event) => { + const { data } = event; + const {message} = data; + + const newMessage = message.data + ? Buffer.from(message.data, 'base64').toString() + : null; + + if (!newMessage) { + throw new Error('message cannot be empty'); + } + + const parsedMessage = JSON.parse(newMessage); + + return callback(parsedMessage, event); +}; + +module.exports = parseMessageV2; diff --git a/src/v2/publishChangesV2.js b/src/v2/publishChangesV2.js new file mode 100644 index 0000000..3c8b746 --- /dev/null +++ b/src/v2/publishChangesV2.js @@ -0,0 +1,29 @@ +const { PubSub } = require('@google-cloud/pubsub'); + +const pubsub = new PubSub(); + +module.exports = topic => async (event) => { + const { type, data } = event; + const childType = type.split('google.firebase.database.').pop(); + + let eventData; + + if (childType === 'ref.v1.created') { + eventData = data.data(); + } else if (childType === 'ref.v1.updated') { + eventData = data.after.data(); + } else if (childType === 'ref.v1.deleted') { + eventData = data.data(); + } + + const newData = JSON.stringify({ + type: childType, + data: eventData, + ...(type === 'ref.v1.updated' ? { dataBefore: data.before.data() } : {}), + changeContext: event, + }); + + const dataBuffer = Buffer.from(newData); + + return pubsub.topic(topic).publish(dataBuffer); +};