Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions lib/event_processor/event_store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ describe('EventStore', () => {
expect(await store.getKeys()).toEqual([]);
});

it('should resave events without expiresAt on get', async () => {
it('should resave events without time information on get', async () => {
const ttl = 120_000;
const { mockStore, store } = getEventStore({ ttl });

Expand All @@ -218,12 +218,12 @@ describe('EventStore', () => {
return originalSet(key, value);
}

// Simulate old stored event without expiresAt
const eventWithoutExpiresAt: StoredEvent = {
// Simulate old stored event without time info
const eventWithoutTime: StoredEvent = {
id: value.id,
event: value.event,
};
return originalSet(key, eventWithoutExpiresAt);
return originalSet(key, eventWithoutTime);
});

await store.set('test', event);
Expand All @@ -237,8 +237,10 @@ describe('EventStore', () => {

const secondCall = setSpy.mock.calls[1];

expect(secondCall[1].expiresAt).toBeDefined();
expect(secondCall[1].expiresAt!).toBeGreaterThanOrEqual(Date.now() + ttl - 10);
expect(secondCall[1]._time).toBeDefined();
expect(secondCall[1]._time?.storedAt).toBeLessThanOrEqual(Date.now());
expect(secondCall[1]._time?.storedAt).toBeGreaterThanOrEqual(Date.now() - 10);
expect(secondCall[1]._time?.ttl).toBe(ttl);
});

it('should store event when key expires after store being full', async () => {
Expand Down Expand Up @@ -317,7 +319,7 @@ describe('EventStore', () => {
await expect(store.getKeys()).resolves.toEqual(['key-2']);
});

it('should resave events without expiresAt during getBatched', async () => {
it('should resave events without time information during getBatched', async () => {
const ttl = 120_000;
const { mockStore, store } = getEventStore({ ttl });
const event: EventWithId = { id: '1', event: createImpressionEvent('test') };
Expand All @@ -330,12 +332,12 @@ describe('EventStore', () => {
return originalSet(key, value);
}

// Simulate old stored event without expiresAt
const eventWithoutExpiresAt: StoredEvent = {
// Simulate old stored event without time information
const eventWithoutTime: StoredEvent = {
id: value.id,
event: value.event,
};
return originalSet(key, eventWithoutExpiresAt);
return originalSet(key, eventWithoutTime);
});

await store.set('key-1', event);
Expand All @@ -352,8 +354,10 @@ describe('EventStore', () => {

const secondCall = setSpy.mock.calls[1];

expect(secondCall[1].expiresAt).toBeDefined();
expect(secondCall[1].expiresAt!).toBeGreaterThanOrEqual(Date.now() + ttl - 10);
expect(secondCall[1]._time).toBeDefined();
expect(secondCall[1]._time?.storedAt).toBeLessThanOrEqual(Date.now());
expect(secondCall[1]._time?.storedAt).toBeGreaterThanOrEqual(Date.now() - 10);
expect(secondCall[1]._time?.ttl).toBe(ttl);
});

it('should store event when keys expire during getBatched after store being full', async () => {
Expand Down
17 changes: 12 additions & 5 deletions lib/event_processor/event_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import { Maybe } from "../utils/type";
import { EventWithId } from "./batch_event_processor";

export type StoredEvent = EventWithId & {
expiresAt?: number;
_time?: {
storedAt: number;
ttl: number;
};
};

const identity = <T>(v: T): T => v;
Expand Down Expand Up @@ -97,7 +100,7 @@ export class EventStore extends AsyncStoreWithBatchedGet<EventWithId> implements
// still keep the stored event count below maxSize (it will underfill the store).
// next getKeys() should fix the discrepency.
this.keys?.add(key);
return this.store.set(key, { ...event, expiresAt: Date.now() + this.ttl });
return this.store.set(key, { ...event, _time: { storedAt: Date.now(), ttl: this.ttl } });
}

private processStoredEvent(key: string, value: StoredEvent | undefined): Maybe<EventWithId> {
Expand All @@ -107,13 +110,17 @@ export class EventStore extends AsyncStoreWithBatchedGet<EventWithId> implements
// they will not have the storedAt time, update them with the current time
// before returning

if (value.expiresAt === undefined) {
value.expiresAt = Date.now() + this.ttl;
if (value._time === undefined) {
value._time = { storedAt: Date.now(), ttl: this.ttl };
this.set(key, value).catch(() => {});
return value;
}

if (value.expiresAt <= Date.now()) {
// use the ttl of the current store even if the stored event has a different ttl
// this ensures that if the store ttl is reduced, old events will also expire sooner
// and if the store ttl is increased, old events will stay longer
// the ttl at the time of save is still stored with the event for potential future use
if (value._time.storedAt + this.ttl <= Date.now()) {
this.remove(key).catch(() => {});
return undefined;
}
Expand Down
Loading