Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 2d2a9d0

Browse files
committed
Implement redis alarms using keyspace notifications
1 parent 203169c commit 2d2a9d0

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

packages/drivers/redis/src/actor.ts

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,45 @@
11
import type { ActorDriver, KvKey, KvValue } from "actor-core/driver-helpers";
22
import type Redis from "ioredis";
33
import { KEYS } from "./keys";
4-
import { AnyActorInstance } from "actor-core/driver-helpers";
4+
import type { AnyActorInstance } from "actor-core/driver-helpers";
55

66
export interface ActorDriverContext {
77
redis: Redis;
88
}
99

1010
export class RedisActorDriver implements ActorDriver {
1111
#redis: Redis;
12+
#subscriptionRedis: Redis | null = null;
13+
#alarmCallbacks = new Map<string, { actor: AnyActorInstance; timestamp: number }>();
1214

1315
constructor(redis: Redis) {
1416
this.#redis = redis;
17+
18+
// Create a separate connection for subscriptions since a subscribed connection
19+
// cannot be used for other commands
20+
this.#subscriptionRedis = redis.duplicate();
21+
22+
// Subscribe to expired events
23+
this.#subscriptionRedis.config('SET', 'notify-keyspace-events', 'Ex');
24+
this.#subscriptionRedis.subscribe('__keyevent@0__:expired');
25+
26+
// Handle expired events
27+
this.#subscriptionRedis.on('message', async (_channel, key) => {
28+
// Extract actor ID from the key
29+
const match = key.match(/^actor:(.+):alarm$/);
30+
if (!match) return;
31+
32+
const actorId = match[1];
33+
const callback = this.#alarmCallbacks.get(actorId);
34+
if (callback) {
35+
// Verify this is still the current alarm before triggering
36+
const currentAlarm = await this.getAlarm(callback.actor);
37+
if (currentAlarm === callback.timestamp) {
38+
await callback.actor.onAlarm();
39+
this.#alarmCallbacks.delete(actorId);
40+
}
41+
}
42+
});
1543
}
1644

1745
getContext(_actorId: string): ActorDriverContext {
@@ -52,8 +80,40 @@ export class RedisActorDriver implements ActorDriver {
5280
await this.#redis.del(key.map((k) => this.#serializeKey(actorId, k)));
5381
}
5482

55-
async setAlarm(_actor: AnyActorInstance, _timestamp: number): Promise<void> {
56-
throw new Error("Alarms are not yet implemented for this driver.");
83+
async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
84+
const key = KEYS.ACTOR.alarm(actor.id);
85+
86+
// Delete any existing alarm first
87+
await this.deleteAlarm(actor);
88+
89+
const delay = timestamp - Date.now();
90+
if (delay <= 0) {
91+
// If timestamp is in the past, trigger immediately
92+
await actor.onAlarm();
93+
return;
94+
}
95+
96+
// Store both the actor instance and timestamp for callback verification
97+
this.#alarmCallbacks.set(actor.id, { actor, timestamp });
98+
99+
// Set the key with expiration
100+
await this.#redis.set(key, timestamp.toString(), 'PX', delay);
101+
}
102+
103+
async getAlarm(actor: AnyActorInstance): Promise<number | null> {
104+
const key = KEYS.ACTOR.alarm(actor.id);
105+
106+
// Get the timestamp value
107+
const value = await this.#redis.get(key);
108+
if (!value) return null;
109+
110+
return Number.parseInt(value, 10);
111+
}
112+
113+
async deleteAlarm(actor: AnyActorInstance): Promise<void> {
114+
const key = KEYS.ACTOR.alarm(actor.id);
115+
await this.#redis.del(key);
116+
this.#alarmCallbacks.delete(actor.id);
57117
}
58118

59119
#serializeKey(actorId: string, key: KvKey): string {

packages/drivers/redis/src/keys.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ export const KEYS = {
1212
tags: (actorId: string) => `actor:${actorId}:tags`,
1313
// KEY
1414
kv: (actorId: string, key: string) => `actor:${actorId}:kv:${key}`,
15+
// KEY (sorted set) = timestamp
16+
alarm: (actorId: string) => `actor:${actorId}:alarm`,
1517
},
1618
};
1719

0 commit comments

Comments
 (0)