Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cursors-raw-websocket/src/frontend/App.tsx

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rivetkit-openapi/openapi.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"openapi": "3.0.0",
"info": {
"version": "2.0.22",
"version": "2.0.24-rc.1",
"title": "RivetKit API"
},
"components": {
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/client/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub const HEADER_RIVET_ACTOR: &str = "x-rivet-actor";
pub const HEADER_RIVET_TOKEN: &str = "x-rivet-token";

// Paths
pub const PATH_CONNECT_WEBSOCKET: &str = "/connect/websocket";
pub const PATH_CONNECT_WEBSOCKET: &str = "/connect";

// WebSocket protocol prefixes
pub const WS_PROTOCOL_STANDARD: &str = "rivet";
Expand Down Expand Up @@ -63,4 +63,4 @@ impl ToString for EncodingKind {


// Max size of each entry is 128 bytes
pub type ActorKey = Vec<String>;
pub type ActorKey = Vec<String>;
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import {
type ManagerDisplayInformation,
type ManagerDriver,
WS_PROTOCOL_ACTOR,
WS_PROTOCOL_CONN_ID,
WS_PROTOCOL_CONN_PARAMS,
WS_PROTOCOL_CONN_TOKEN,
WS_PROTOCOL_ENCODING,
WS_PROTOCOL_STANDARD,
WS_PROTOCOL_TARGET,
Expand Down Expand Up @@ -76,8 +74,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver {
actorId: string,
encoding: Encoding,
params: unknown,
connId?: string,
connToken?: string,
): Promise<UniversalWebSocket> {
const env = getCloudflareAmbientEnv();

Expand All @@ -101,12 +97,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver {
`${WS_PROTOCOL_CONN_PARAMS}${encodeURIComponent(JSON.stringify(params))}`,
);
}
if (connId) {
protocols.push(`${WS_PROTOCOL_CONN_ID}${connId}`);
}
if (connToken) {
protocols.push(`${WS_PROTOCOL_CONN_TOKEN}${connToken}`);
}

const headers: Record<string, string> = {
Upgrade: "websocket",
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
],
"scripts": {
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts",
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts",
"check-types": "tsc --noEmit",
"test": "vitest run",
"test:watch": "vitest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# MARK: Connection
# Connection associated with hibernatable WebSocket that should persist across lifecycles.
type PersistedHibernatableConn struct {
# Connection ID generated by RivetKit
id: str
parameters: data
state: data

# Request ID of the hibernatable WebSocket
hibernatableRequestId: data
# Last seen message from this WebSocket
lastSeenTimestamp: i64
# Last seem message index for this WebSocket
msgIndex: i64
}

# MARK: Schedule Event
type PersistedScheduleEvent struct {
eventId: str
timestamp: i64
action: str
args: optional<data>
}

# MARK: Actor
type PersistedActor struct {
# Input data passed to the actor on initialization
input: optional<data>
hasInitialized: bool
state: data
hibernatableConns: list<PersistedHibernatableConn>
scheduledEvents: list<PersistedScheduleEvent>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# MARK: Message To Client
type Init struct {
actorId: str
connectionId: str
}

type Error struct {
group: str
code: str
message: str
metadata: optional<data>
actionId: optional<uint>
}

type ActionResponse struct {
id: uint
output: data
}

type Event struct {
name: str
# CBOR array
args: data
}

type ToClientBody union {
Init |
Error |
ActionResponse |
Event
}

type ToClient struct {
body: ToClientBody
}

# MARK: Message To Server
type ActionRequest struct {
id: uint
name: str
# CBOR array
args: data
}

type SubscriptionRequest struct {
eventName: str
subscribe: bool
}

type ToServerBody union {
ActionRequest |
SubscriptionRequest
}

type ToServer struct {
body: ToServerBody
}

# MARK: HTTP Action
type HttpActionRequest struct {
# CBOR array
args: data
}

type HttpActionResponse struct {
output: data
}

# MARK: HTTP Error
type HttpResponseError struct {
group: str
code: str
message: str
metadata: optional<data>
}

# MARK: HTTP Resolve
type HttpResolveRequest void

type HttpResolveResponse struct {
actorId: str
}
45 changes: 0 additions & 45 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { SSEStreamingApi } from "hono/streaming";
import type { WSContext } from "hono/ws";
import type { WebSocket } from "ws";
import type { AnyConn } from "@/actor/conn";
Expand All @@ -11,7 +10,6 @@ import { assertUnreachable, type promiseWithResolvers } from "@/utils";

export enum ConnDriverKind {
WEBSOCKET = 0,
SSE = 1,
HTTP = 2,
}

Expand All @@ -29,16 +27,10 @@ export interface ConnDriverWebSocketState {
closePromise: ReturnType<typeof promiseWithResolvers<void>>;
}

export interface ConnDriverSseState {
encoding: Encoding;
stream: SSEStreamingApi;
}

export type ConnDriverHttpState = Record<never, never>;

export type ConnDriverState =
| { [ConnDriverKind.WEBSOCKET]: ConnDriverWebSocketState }
| { [ConnDriverKind.SSE]: ConnDriverSseState }
| { [ConnDriverKind.HTTP]: ConnDriverHttpState };

export interface ConnDriver<State> {
Expand Down Expand Up @@ -152,41 +144,6 @@ const WEBSOCKET_DRIVER: ConnDriver<ConnDriverWebSocketState> = {
},
};

// MARK: SSE
const SSE_DRIVER: ConnDriver<ConnDriverSseState> = {
sendMessage: (
_actor: AnyActorInstance,
_conn: AnyConn,
state: ConnDriverSseState,
message: CachedSerializer<protocol.ToClient>,
) => {
state.stream.writeSSE({
data: encodeDataToString(message.serialize(state.encoding)),
});
},

disconnect: async (
_actor: AnyActorInstance,
_conn: AnyConn,
state: ConnDriverSseState,
_reason?: string,
) => {
state.stream.close();
},

getConnectionReadyState: (
_actor: AnyActorInstance,
_conn: AnyConn,
state: ConnDriverSseState,
): ConnReadyState | undefined => {
if (state.stream.aborted || state.stream.closed) {
return ConnReadyState.CLOSED;
}

return ConnReadyState.OPEN;
},
};

// MARK: HTTP
const HTTP_DRIVER: ConnDriver<ConnDriverHttpState> = {
getConnectionReadyState(_actor, _conn) {
Expand All @@ -202,15 +159,13 @@ const HTTP_DRIVER: ConnDriver<ConnDriverHttpState> = {
/** List of all connection drivers. */
export const CONN_DRIVERS: Record<ConnDriverKind, ConnDriver<unknown>> = {
[ConnDriverKind.WEBSOCKET]: WEBSOCKET_DRIVER,
[ConnDriverKind.SSE]: SSE_DRIVER,
[ConnDriverKind.HTTP]: HTTP_DRIVER,
};

export function getConnDriverKindFromState(
state: ConnDriverState,
): ConnDriverKind {
if (ConnDriverKind.WEBSOCKET in state) return ConnDriverKind.WEBSOCKET;
else if (ConnDriverKind.SSE in state) return ConnDriverKind.SSE;
else if (ConnDriverKind.HTTP in state) return ConnDriverKind.HTTP;
else assertUnreachable(state);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ConnDriverState } from "./conn-drivers";

export interface ConnSocket {
/** This is the request ID provided by the given framework. If not provided this is a random UUID. */
requestId: string;
requestIdBuf?: ArrayBuffer;
hibernatable: boolean;
Expand Down
44 changes: 5 additions & 39 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import * as cbor from "cbor-x";
import invariant from "invariant";
import { PersistedHibernatableWebSocket } from "@/schemas/actor-persist/mod";
import type * as protocol from "@/schemas/client-protocol/mod";
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
import { arrayBuffersEqual, bufferToArrayBuffer } from "@/utils";
import {
CONN_DRIVERS,
ConnDriverKind,
type ConnDriverState,
ConnReadyState,
getConnDriverKindFromState,
} from "./conn-drivers";
import type { ConnSocket } from "./conn-socket";
Expand All @@ -17,15 +13,6 @@ import * as errors from "./errors";
import { type ActorInstance, PERSIST_SYMBOL } from "./instance";
import type { PersistedConn } from "./persisted";
import { CachedSerializer } from "./protocol/serde";
import { generateSecureToken } from "./utils";

export function generateConnId(): string {
return crypto.randomUUID();
}

export function generateConnToken(): string {
return generateSecureToken(32);
}

export function generateConnRequestId(): string {
return crypto.randomUUID();
Expand All @@ -35,8 +22,6 @@ export type ConnId = string;

export type AnyConn = Conn<any, any, any, any, any, any>;

export type ConnectionStatus = "connected" | "reconnecting";

/**
* Represents a client connection to a actor.
*
Expand All @@ -53,7 +38,11 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
/**
* The proxied state that notifies of changes automatically.
*
* Any data that should be stored indefinitely should be held within this object.
* Any data that should be stored indefinitely should be held within this
* object.
*
* This will only be persisted if using hibernatable WebSockets. If not,
* this is just used to hole state.
*/
__persist: PersistedConn<CP, CS>;

Expand All @@ -68,15 +57,6 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
*/
__socket?: ConnSocket;

get __status(): ConnectionStatus {
// TODO: isHibernatible might be true while the actual hibernatable websocket has disconnected
if (this.__socket || this.isHibernatable) {
return "connected";
} else {
return "reconnecting";
}
}

public get params(): CP {
return this.__persist.params;
}
Expand Down Expand Up @@ -113,20 +93,6 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
return this.__persist.connId;
}

/**
* Token used to authenticate this request.
*/
public get _token(): string {
return this.__persist.token;
}

/**
* Status of the connection.
*/
public get status(): ConnectionStatus {
return this.__status;
}

/**
* @experimental
*
Expand Down
Loading
Loading