Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/core/src/drivers/file-system/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ export class FileSystemActorDriver implements ActorDriver {
}

async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
console.log("reading data", this.#state.readPersistedData(actorId));
return this.#state.readPersistedData(actorId);
}

async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
console.log("writing data", data);
this.#state.writePersistedData(actorId, data);

// Save state to disk
Expand Down
140 changes: 43 additions & 97 deletions packages/core/src/drivers/file-system/global-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,23 @@ export class FileSystemGlobalState {
// Ensure storage directories exist synchronously during initialization
ensureDirectoryExistsSync(getActorsDir(this.#storagePath));

// Load all actors into cache synchronously
this.#loadAllActorsIntoCache();

logger().info("file system loaded", {
dir: this.#storagePath,
actorCount: this.#stateCache.size,
});
}

/**
* Load all actors into the state cache from the file system
* Only called once during initialization
*/
#loadAllActorsIntoCache(): void {
const actorsDir = getActorsDir(this.#storagePath);

let actorCount = 0;

try {
// HACK: Use synchronous filesystem operations for initialization
const actorIds = fsSync.readdirSync(actorsDir);

for (const actorId of actorIds) {
const stateFilePath = getActorDataPath(this.#storagePath, actorId);

if (fsSync.existsSync(stateFilePath)) {
try {
const stateData = fsSync.readFileSync(stateFilePath);
const state = cbor.decode(stateData) as ActorState;

this.#stateCache.set(actorId, state);
} catch (error) {
logger().error(
"failed to read actor state during cache initialization",
{ actorId, error },
);
}
}
}
actorCount = actorIds.length;
} catch (error) {
logger().error("failed to load actors into cache", { error });
logger().error("failed to count actors", { error });
}

logger().info("file system loaded", {
dir: this.#storagePath,
actorCount,
});
}


/**
* Get the current storage directory path
*/
Expand All @@ -88,16 +63,34 @@ export class FileSystemGlobalState {
}

/**
* Load actor state from cache
* Load actor state from cache or disk (lazy loading)
*/
loadActorState(actorId: string): ActorState {
this.ensureActorExists(actorId);

// Get actor state from cache
// Check if already in cache
const cachedActor = this.#stateCache.get(actorId);
invariant(cachedActor, `actor state should exist in cache for ${actorId}`);
if (cachedActor) {
return cachedActor;
}

return cachedActor;
// Try to load from disk
const stateFilePath = getActorDataPath(this.#storagePath, actorId);

if (!fsSync.existsSync(stateFilePath)) {
throw new Error(`Actor does not exist for ID: ${actorId}`);
}

try {
const stateData = fsSync.readFileSync(stateFilePath);
const state = cbor.decode(stateData) as ActorState;

// Cache the loaded state
this.#stateCache.set(actorId, state);

return state;
} catch (error) {
logger().error("failed to load actor state", { actorId, error });
throw new Error(`Failed to load actor state: ${error}`);
}
}

/**
Expand Down Expand Up @@ -135,29 +128,27 @@ export class FileSystemGlobalState {
// Write data
const serializedState = cbor.encode(state);
await fs.writeFile(dataPath, serializedState);
console.log("saving state", dataPath);
} catch (error) {
logger().error("failed to save actor state", { actorId, error });
throw new Error(`Failed to save actor state: ${error}`);
}
}

/**
* Check if a actor exists in the cache
* Check if a actor exists in cache or on disk
*/
hasActor(actorId: string): boolean {
return this.#stateCache.has(actorId);
}

/**
* Ensure a actor exists, throwing if it doesn't
*/
ensureActorExists(actorId: string): void {
if (!this.hasActor(actorId)) {
throw new Error(`Actor does not exist for ID: ${actorId}`);
// Check cache first
if (this.#stateCache.has(actorId)) {
return true;
}

// Check if file exists on disk
const stateFilePath = getActorDataPath(this.#storagePath, actorId);
return fsSync.existsSync(stateFilePath);
}


/**
* Create a actor
*/
Expand Down Expand Up @@ -186,49 +177,4 @@ export class FileSystemGlobalState {
// Save to disk
await this.saveActorState(actorId);
}

/**
* Find actor by name and key
*/
findActorByNameAndKey(name: string, key: ActorKey): ActorState | undefined {
// NOTE: This is a slow implementation that checks each actor individually.
// This can be optimized with an index in the future.

return this.findActor((actor) => {
if (actor.name !== name) return false;

// If actor doesn't have a key, it's not a match
if (!actor.key || actor.key.length !== key.length) {
return false;
}

// Check if all elements in key are in actor.key
for (let i = 0; i < key.length; i++) {
if (key[i] !== actor.key[i]) {
return false;
}
}
return true;
});
}

/**
* Find actor by filter function
*/
findActor(filter: (actor: ActorState) => boolean): ActorState | undefined {
for (const actor of this.#stateCache.values()) {
if (filter(actor)) {
return actor;
}
}
return undefined;
}

/**
* Get all actors from the cache
*/
getAllActors(): ActorState[] {
// Return all actors from the cache
return Array.from(this.#stateCache.values());
}
}
24 changes: 13 additions & 11 deletions packages/core/src/drivers/file-system/manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as crypto from "node:crypto";
import type {
GetOrCreateWithKeyInput,
GetForIdInput,
Expand All @@ -12,6 +11,7 @@ import { logger } from "./log";
import type { FileSystemGlobalState } from "./global-state";
import { ActorState } from "./global-state";
import type { Registry } from "@/registry/mod";
import { generateActorId } from "./utils";

export class FileSystemManagerDriver implements ManagerDriver {
#state: FileSystemGlobalState;
Expand Down Expand Up @@ -52,14 +52,15 @@ export class FileSystemManagerDriver implements ManagerDriver {
name,
key,
}: GetWithKeyInput): Promise<ActorOutput | undefined> {
// Search through all actors to find a match
const actor = this.#state.findActorByNameAndKey(name, key);

if (actor) {
// Generate the deterministic actor ID
const actorId = generateActorId(name, key);

// Check if actor exists
if (this.#state.hasActor(actorId)) {
return {
actorId: actor.id,
actorId,
name,
key: actor.key,
key,
};
}

Expand All @@ -79,13 +80,14 @@ export class FileSystemManagerDriver implements ManagerDriver {
}

async createActor({ name, key, input }: CreateInput): Promise<ActorOutput> {
// Check if actor with the same name and key already exists
const existingActor = await this.getWithKey({ name, key });
if (existingActor) {
// Generate the deterministic actor ID
const actorId = generateActorId(name, key);

// Check if actor already exists
if (this.#state.hasActor(actorId)) {
throw new ActorAlreadyExists(name, key);
}

const actorId = crypto.randomUUID();
await this.#state.createActor(actorId, name, key, input);

// Notify inspector about actor changes
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/drivers/file-system/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@ import * as fsSync from "node:fs";
import * as path from "node:path";
import * as crypto from "node:crypto";
import * as os from "node:os";
import type { ActorKey } from "@/actor/mod";

/**
* Generate a deterministic actor ID from name and key
*/
export function generateActorId(name: string, key: ActorKey): string {
// Generate deterministic key string
const jsonString = JSON.stringify([name, key]);

// Hash to ensure safe file system names
const hash = crypto
.createHash("sha256")
.update(jsonString)
.digest("hex")
.substring(0, 16);

return hash;
}

/**
* Create a hash for a path, normalizing it first
Expand Down
Loading