Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit/src/client/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const ClientConfigSchema = z.object({
getUpgradeWebSocket: z.custom<GetUpgradeWebSocket>().optional(),

/** Whether to automatically perform health checks when the client is created. */
disableHealthCheck: z.boolean().optional().default(false),
disableMetadataLookup: z.boolean().optional().default(false),
});

export type ClientConfig = z.infer<typeof ClientConfigSchema>;
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/registry/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class Registry<A extends RegistryActors> {
// a serverless runner request, so we do not know what to health check
if (config.runnerKind === "serverless") {
logger().debug("disabling health check since using serverless");
config.disableHealthCheck = true;
config.disableMetadataLookup = true;
}

// Auto-configure serverless runner if not in prod
Expand Down Expand Up @@ -289,7 +289,7 @@ async function configureServerlessRunner(config: RunnerConfig): Promise<void> {
encoding: config.encoding,
headers: config.headers,
getUpgradeWebSocket: config.getUpgradeWebSocket,
disableHealthCheck: true, // We don't need health check for this operation
disableMetadataLookup: true, // We don't need health check for this operation
};

// Fetch all datacenters
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pRetry from "p-retry";
import type { ClientConfig } from "@/client/client";
import type { MetadataResponse } from "@/common/router";
import { stringifyError } from "@/common/utils";
import { getMetadata } from "./api-endpoints";
import { getEndpoint } from "./api-utils";
import { logger } from "./log";

// Global cache to store metadata check promises for each endpoint
const metadataLookupCache = new Map<string, Promise<MetadataResponse>>();

export async function lookupMetadataCached(
config: ClientConfig,
): Promise<MetadataResponse> {
const endpoint = getEndpoint(config);

// Check if metadata lookup is already in progress or completed for this endpoint
const existingPromise = metadataLookupCache.get(endpoint);
if (existingPromise) {
return existingPromise;
}

// Create and store the promise immediately to prevent racing requests
const metadataLookupPromise = pRetry(
async () => {
logger().debug({
msg: "fetching metadata",
endpoint,
});

const metadataData = await getMetadata(config);

logger().debug({
msg: "received metadata",
endpoint,
clientEndpoint: metadataData.clientEndpoint,
});

return metadataData;
},
{
forever: true,
minTimeout: 500,
maxTimeout: 15_000,
onFailedAttempt: (error) => {
logger().warn({
msg: "failed to fetch metadata, retrying",
endpoint,
attempt: error.attemptNumber,
error: stringifyError(error),
});
},
},
);

metadataLookupCache.set(endpoint, metadataLookupPromise);
return metadataLookupPromise;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import invariant from "invariant";
import { deserializeActorKey, serializeActorKey } from "@/actor/keys";
import { generateRandomString } from "@/actor/utils";
import type { ClientConfig } from "@/client/client";
import type { MetadataResponse } from "@/common/router";
import { noopNext, stringifyError } from "@/common/utils";
import type {
ActorOutput,
Expand Down Expand Up @@ -32,6 +33,7 @@ import {
} from "./api-endpoints";
import { EngineApiError, getEndpoint } from "./api-utils";
import { logger } from "./log";
import { lookupMetadataCached } from "./metadata";
import { createWebSocketProxy } from "./ws-proxy";

// TODO:
Expand All @@ -48,9 +50,6 @@ import { createWebSocketProxy } from "./ws-proxy";
// };
// })();

// Global cache to store metadata check promises for each endpoint
const metadataCheckCache = new Map<string, Promise<void>>();

export class RemoteManagerDriver implements ManagerDriver {
#config: ClientConfig;
#metadataPromise: Promise<void> | undefined;
Expand All @@ -63,64 +62,38 @@ export class RemoteManagerDriver implements ManagerDriver {
logger().info(
"detected next.js build phase, disabling health check",
);
runConfig.disableHealthCheck = true;
runConfig.disableMetadataLookup = true;
}

this.#config = runConfig;
// Clone config so we can mutate the endpoint in #metadataPromise
// NOTE: This is a shallow clone, so mutating nested properties will not do anything
this.#config = { ...runConfig };

// Perform metadata check if enabled
if (!runConfig.disableHealthCheck) {
this.#metadataPromise = this.#performMetadataCheck(runConfig);
this.#metadataPromise.catch((error) => {
logger().error({
msg: "metadata check failed",
error:
error instanceof Error ? error.message : String(error),
});
});
}
}

async #performMetadataCheck(config: ClientConfig): Promise<void> {
const endpoint = getEndpoint(config);

// Check if metadata check is already in progress or completed for this endpoint
const existingPromise = metadataCheckCache.get(endpoint);
if (existingPromise) {
return existingPromise;
}

// Create and store the promise immediately to prevent racing requests
const metadataCheckPromise = (async () => {
try {
const metadataData = await getMetadata(config);
if (!runConfig.disableMetadataLookup) {
// This should never error, since it uses pRetry. If it does for
// any reason, we'll surface the error anywhere #metadataPromise is
// awaited.
this.#metadataPromise = lookupMetadataCached(this.#config).then(
(metadataData) => {
// Override endpoint for all future requests
if (metadataData.clientEndpoint) {
this.#config.endpoint = metadataData.clientEndpoint;
logger().info({
msg: "overriding cached client endpoint",
endpoint: metadataData.clientEndpoint,
});
}

if (metadataData.clientEndpoint) {
logger().info({
msg: "received new client endpoint from metadata",
endpoint: metadataData.clientEndpoint,
msg: "connected to rivetkit manager",
runtime: metadataData.runtime,
version: metadataData.version,
runner: metadataData.runner,
});
this.#config.endpoint = metadataData.clientEndpoint;
}

// Log successful metadata check with runtime and version info
logger().info({
msg: "connected to rivetkit manager",
runtime: metadataData.runtime,
version: metadataData.version,
runner: metadataData.runner,
});
} catch (error) {
logger().error({
msg: "health check failed, validate the Rivet endpoint is configured correctly",
endpoint,
error: stringifyError(error),
});
}
})();

metadataCheckCache.set(endpoint, metadataCheckPromise);
return metadataCheckPromise;
},
);
}
}

async getForId({
Expand Down
Loading