Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 3 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[workspace]
resolver = "2"
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/internal","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pegboard-serverless","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]

[workspace.package]
version = "2.0.24-rc.1"
Expand Down Expand Up @@ -334,9 +334,6 @@ path = "engine/packages/guard"
[workspace.dependencies.rivet-guard-core]
path = "engine/packages/guard-core"

[workspace.dependencies.internal]
path = "engine/packages/internal"

[workspace.dependencies.rivet-logs]
path = "engine/packages/logs"

Expand All @@ -355,9 +352,6 @@ path = "engine/packages/pegboard-gateway"
[workspace.dependencies.pegboard-runner]
path = "engine/packages/pegboard-runner"

[workspace.dependencies.pegboard-serverless]
path = "engine/packages/pegboard-serverless"

[workspace.dependencies.rivet-pools]
path = "engine/packages/pools"

Expand Down
5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.bad_request.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.forbidden.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.internal_error.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.not_found.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.rate_limited.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/api.unauthorized.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/kv.leader_forwarding_failed.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/kv.no_leader_elected.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/kv.not_leader.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/kv.response_channel_closed.json

This file was deleted.

5 changes: 0 additions & 5 deletions engine/artifacts/errors/namespace.invalid_name.json

This file was deleted.

5 changes: 5 additions & 0 deletions engine/artifacts/errors/serverless_runner_pool.not_found.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions engine/packages/api-peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ serde.workspace = true
serde_json.workspace = true
indexmap.workspace = true

tokio.workspace = true
tracing.workspace = true
namespace.workspace = true
pegboard.workspace = true
tokio.workspace = true
tracing.workspace = true
universalpubsub.workspace = true
uuid.workspace = true
utoipa.workspace = true
26 changes: 23 additions & 3 deletions engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ pub async fn cache_purge(
Ok(CachePurgeResponse {})
}

#[derive(Serialize, Deserialize)]
pub struct BumpServerlessAutoscalerRequest {
pub namespace_id: Id,
pub runner_name: String,
}

#[derive(Serialize)]
#[serde(deny_unknown_fields)]
pub struct BumpServerlessAutoscalerResponse {}
Expand All @@ -37,11 +43,25 @@ pub async fn bump_serverless_autoscaler(
ctx: ApiCtx,
_path: (),
_query: (),
_body: (),
body: BumpServerlessAutoscalerRequest,
) -> Result<BumpServerlessAutoscalerResponse> {
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
let res = ctx
.signal(pegboard::workflows::serverless::pool::Bump {})
.to_workflow::<pegboard::workflows::serverless::pool::Workflow>()
.tag("namespace_id", body.namespace_id)
.tag("runner_name", body.runner_name.clone())
.send()
.await?;
.await;

if let Some(WorkflowError::WorkflowNotFound) = res
.as_ref()
.err()
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
{
return Err(pegboard::errors::ServerlessRunnerPool::NotFound.build());
} else {
res?;
}

Ok(BumpServerlessAutoscalerResponse {})
}
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List

if let Some(runner_names) = query.runner_names {
let runner_configs = ctx
.op(namespace::ops::runner_config::get::Input {
.op(pegboard::ops::runner_config::get::Input {
runners: runner_names
.split(',')
.map(|name| (namespace.namespace_id, name.to_string()))
Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
};

let runner_configs = ctx
.op(namespace::ops::runner_config::list::Input {
.op(pegboard::ops::runner_config::list::Input {
namespace_id: namespace.namespace_id,
variant,
after_name,
Expand Down Expand Up @@ -112,7 +112,7 @@ pub async fn upsert(
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let endpoint_config_changed = ctx
.op(namespace::ops::runner_config::upsert::Input {
.op(pegboard::ops::runner_config::upsert::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
config: body.0.into(),
Expand Down Expand Up @@ -150,7 +150,7 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

ctx.op(namespace::ops::runner_config::delete::Input {
ctx.op(pegboard::ops::runner_config::delete::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn refresh_metadata_inner(
.collect();

let runner_configs = ctx
.op(namespace::ops::runner_config::get::Input {
.op(pegboard::ops::runner_config::get::Input {
runners,
bypass_cache: true,
})
Expand Down
1 change: 0 additions & 1 deletion engine/packages/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ include_dir.workspace = true
indoc.workspace = true
lz4_flex.workspace = true
pegboard-runner.workspace = true
pegboard-serverless.workspace = true
reqwest.workspace = true
rivet-api-peer.workspace = true
rivet-bootstrap.workspace = true
Expand Down
6 changes: 0 additions & 6 deletions engine/packages/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| {
Box::pin(rivet_bootstrap::start(config, pools))
}),
Service::new(
"pegboard_serverless",
// There should only be one of these, since it's auto-scaling requests
ServiceKind::Singleton,
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
),
// Core services
Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| {
Box::pin(rivet_tracing_reconfigure::start(config, pools))
Expand Down
33 changes: 31 additions & 2 deletions engine/packages/gasoline/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use tokio::sync::Mutex;
use tracing::Instrument;

use crate::{
builder::common as builder,
ctx::{
common,
message::{MessageCtx, SubscriptionHandle},
},
db::DatabaseHandle,
db::{DatabaseHandle, WorkflowData},
error::{WorkflowError, WorkflowResult},
message::Message,
operation::{Operation, OperationInput},
signal::Signal,
utils::tags::AsTags,
workflow::StateGuard,
workflow::{StateGuard, Workflow},
};

pub struct ActivityCtx {
Expand Down Expand Up @@ -77,6 +79,33 @@ impl ActivityCtx {
}

impl ActivityCtx {
/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all, ret(Debug), fields(workflow_name=W::NAME))]
pub async fn find_workflow<W: Workflow>(&self, tags: impl AsTags) -> Result<Option<Id>> {
common::find_workflow::<W>(&self.db, tags)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
common::get_workflows(&self.db, workflow_ids)
.in_current_span()
.await
}

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&self, body: T) -> builder::signal::SignalBuilder<T> {
builder::signal::SignalBuilder::new(
self.db.clone(),
self.config.clone(),
self.ray_id,
body,
true,
)
}

#[tracing::instrument(skip_all)]
pub fn state<T: Serialize + DeserializeOwned>(&self) -> Result<StateGuard<'_, T>> {
if self.parallelized {
Expand Down
1 change: 0 additions & 1 deletion engine/packages/gasoline/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl OperationCtx {

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&self, body: T) -> builder::signal::SignalBuilder<T> {
// TODO: Add check for from_workflow so you cant dispatch a signal
builder::signal::SignalBuilder::new(
self.db.clone(),
self.config.clone(),
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ impl WorkflowData {
.transpose()
.map_err(WorkflowError::DeserializeWorkflowOutput)
}

pub fn has_output(&self) -> bool {
self.output.is_some()
}
}

#[derive(Debug)]
Expand Down
Loading
Loading