From 406bd5a79aad5d420fd03b8938b48799d265e301 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 6 Nov 2025 13:59:06 -0800 Subject: [PATCH] fix(rivetkit): properly apply clientEndpoint from metadata lookup --- .../packages/rivetkit/src/client/config.ts | 2 +- .../packages/rivetkit/src/registry/mod.ts | 4 +- .../src/remote-manager-driver/metadata.ts | 58 +++++++++++++ .../rivetkit/src/remote-manager-driver/mod.ts | 81 +++++++------------ 4 files changed, 88 insertions(+), 57 deletions(-) create mode 100644 rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/metadata.ts diff --git a/rivetkit-typescript/packages/rivetkit/src/client/config.ts b/rivetkit-typescript/packages/rivetkit/src/client/config.ts index c2c622d900..97f9f8d269 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/config.ts @@ -38,7 +38,7 @@ export const ClientConfigSchema = z.object({ getUpgradeWebSocket: z.custom().optional(), /** Whether to automatically perform health checks when the client is created. */ - disableHealthCheck: z.boolean().optional().default(false), + disableMetadataLookup: z.boolean().optional().default(false), }); export type ClientConfig = z.infer; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts b/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts index e7965e71f2..78fb798778 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts @@ -80,7 +80,7 @@ export class Registry { // a serverless runner request, so we do not know what to health check if (config.runnerKind === "serverless") { logger().debug("disabling health check since using serverless"); - config.disableHealthCheck = true; + config.disableMetadataLookup = true; } // Auto-configure serverless runner if not in prod @@ -289,7 +289,7 @@ async function configureServerlessRunner(config: RunnerConfig): Promise { encoding: config.encoding, headers: config.headers, getUpgradeWebSocket: config.getUpgradeWebSocket, - disableHealthCheck: true, // We don't need health check for this operation + disableMetadataLookup: true, // We don't need health check for this operation }; // Fetch all datacenters diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/metadata.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/metadata.ts new file mode 100644 index 0000000000..5b073297e5 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/metadata.ts @@ -0,0 +1,58 @@ +import pRetry from "p-retry"; +import type { ClientConfig } from "@/client/client"; +import type { MetadataResponse } from "@/common/router"; +import { stringifyError } from "@/common/utils"; +import { getMetadata } from "./api-endpoints"; +import { getEndpoint } from "./api-utils"; +import { logger } from "./log"; + +// Global cache to store metadata check promises for each endpoint +const metadataLookupCache = new Map>(); + +export async function lookupMetadataCached( + config: ClientConfig, +): Promise { + const endpoint = getEndpoint(config); + + // Check if metadata lookup is already in progress or completed for this endpoint + const existingPromise = metadataLookupCache.get(endpoint); + if (existingPromise) { + return existingPromise; + } + + // Create and store the promise immediately to prevent racing requests + const metadataLookupPromise = pRetry( + async () => { + logger().debug({ + msg: "fetching metadata", + endpoint, + }); + + const metadataData = await getMetadata(config); + + logger().debug({ + msg: "received metadata", + endpoint, + clientEndpoint: metadataData.clientEndpoint, + }); + + return metadataData; + }, + { + forever: true, + minTimeout: 500, + maxTimeout: 15_000, + onFailedAttempt: (error) => { + logger().warn({ + msg: "failed to fetch metadata, retrying", + endpoint, + attempt: error.attemptNumber, + error: stringifyError(error), + }); + }, + }, + ); + + metadataLookupCache.set(endpoint, metadataLookupPromise); + return metadataLookupPromise; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts index 08a29a4710..62e734dada 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts @@ -4,6 +4,7 @@ import invariant from "invariant"; import { deserializeActorKey, serializeActorKey } from "@/actor/keys"; import { generateRandomString } from "@/actor/utils"; import type { ClientConfig } from "@/client/client"; +import type { MetadataResponse } from "@/common/router"; import { noopNext, stringifyError } from "@/common/utils"; import type { ActorOutput, @@ -32,6 +33,7 @@ import { } from "./api-endpoints"; import { EngineApiError, getEndpoint } from "./api-utils"; import { logger } from "./log"; +import { lookupMetadataCached } from "./metadata"; import { createWebSocketProxy } from "./ws-proxy"; // TODO: @@ -48,9 +50,6 @@ import { createWebSocketProxy } from "./ws-proxy"; // }; // })(); -// Global cache to store metadata check promises for each endpoint -const metadataCheckCache = new Map>(); - export class RemoteManagerDriver implements ManagerDriver { #config: ClientConfig; #metadataPromise: Promise | undefined; @@ -63,64 +62,38 @@ export class RemoteManagerDriver implements ManagerDriver { logger().info( "detected next.js build phase, disabling health check", ); - runConfig.disableHealthCheck = true; + runConfig.disableMetadataLookup = true; } - this.#config = runConfig; + // Clone config so we can mutate the endpoint in #metadataPromise + // NOTE: This is a shallow clone, so mutating nested properties will not do anything + this.#config = { ...runConfig }; // Perform metadata check if enabled - if (!runConfig.disableHealthCheck) { - this.#metadataPromise = this.#performMetadataCheck(runConfig); - this.#metadataPromise.catch((error) => { - logger().error({ - msg: "metadata check failed", - error: - error instanceof Error ? error.message : String(error), - }); - }); - } - } - - async #performMetadataCheck(config: ClientConfig): Promise { - const endpoint = getEndpoint(config); - - // Check if metadata check is already in progress or completed for this endpoint - const existingPromise = metadataCheckCache.get(endpoint); - if (existingPromise) { - return existingPromise; - } - - // Create and store the promise immediately to prevent racing requests - const metadataCheckPromise = (async () => { - try { - const metadataData = await getMetadata(config); + if (!runConfig.disableMetadataLookup) { + // This should never error, since it uses pRetry. If it does for + // any reason, we'll surface the error anywhere #metadataPromise is + // awaited. + this.#metadataPromise = lookupMetadataCached(this.#config).then( + (metadataData) => { + // Override endpoint for all future requests + if (metadataData.clientEndpoint) { + this.#config.endpoint = metadataData.clientEndpoint; + logger().info({ + msg: "overriding cached client endpoint", + endpoint: metadataData.clientEndpoint, + }); + } - if (metadataData.clientEndpoint) { logger().info({ - msg: "received new client endpoint from metadata", - endpoint: metadataData.clientEndpoint, + msg: "connected to rivetkit manager", + runtime: metadataData.runtime, + version: metadataData.version, + runner: metadataData.runner, }); - this.#config.endpoint = metadataData.clientEndpoint; - } - - // Log successful metadata check with runtime and version info - logger().info({ - msg: "connected to rivetkit manager", - runtime: metadataData.runtime, - version: metadataData.version, - runner: metadataData.runner, - }); - } catch (error) { - logger().error({ - msg: "health check failed, validate the Rivet endpoint is configured correctly", - endpoint, - error: stringifyError(error), - }); - } - })(); - - metadataCheckCache.set(endpoint, metadataCheckPromise); - return metadataCheckPromise; + }, + ); + } } async getForId({