From 728f761bdb4b42d0953942c726b578520e85f337 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 7 Nov 2025 18:17:23 -0800 Subject: [PATCH] feat(gas): implement smart pulling for horizontally scaled workers --- .gitattributes | 5 + Cargo.lock | 43 ++ Cargo.toml | 1 + engine/artifacts/openapi.json | 2 +- .../dev-host/grafana/dashboards/gasoline.json | 8 +- .../core/grafana/dashboards/gasoline.json | 8 +- .../core/grafana/dashboards/gasoline.json | 8 +- .../grafana/dashboards/gasoline.json | 8 +- .../dev/grafana/dashboards/gasoline.json | 8 +- .../template/grafana-dashboards/gasoline.json | 8 +- engine/packages/gasoline/Cargo.toml | 3 +- .../src/builder/workflow/sub_workflow.rs | 9 +- engine/packages/gasoline/src/ctx/common.rs | 6 +- engine/packages/gasoline/src/ctx/workflow.rs | 30 +- engine/packages/gasoline/src/db/kv/debug.rs | 33 +- .../packages/gasoline/src/db/kv/keys/mod.rs | 2 +- .../gasoline/src/db/kv/keys/worker.rs | 187 +++++++ .../src/db/kv/keys/worker_instance.rs | 92 ---- .../gasoline/src/db/kv/keys/workflow.rs | 24 +- engine/packages/gasoline/src/db/kv/mod.rs | 483 +++++++++++------- .../packages/gasoline/src/db/kv/subjects.rs | 13 + engine/packages/gasoline/src/db/kv/system.rs | 33 ++ engine/packages/gasoline/src/db/mod.rs | 28 +- engine/packages/gasoline/src/metrics.rs | 16 +- engine/packages/gasoline/src/worker.rs | 190 +++---- engine/packages/universaldb/src/utils/keys.rs | 4 +- engine/packages/util-id/src/lib.rs | 2 +- 27 files changed, 797 insertions(+), 457 deletions(-) create mode 100644 engine/packages/gasoline/src/db/kv/keys/worker.rs delete mode 100644 engine/packages/gasoline/src/db/kv/keys/worker_instance.rs create mode 100644 engine/packages/gasoline/src/db/kv/subjects.rs create mode 100644 engine/packages/gasoline/src/db/kv/system.rs diff --git a/.gitattributes b/.gitattributes index 1ade211256..37e61d9e31 100644 --- a/.gitattributes +++ b/.gitattributes @@ -15,6 +15,11 @@ engine/sdks/** linguist-generated=true engine/sdks/typescript/runner/** linguist-generated=false engine/sdks/typescript/test-runner/** linguist-generated=false engine/sdks/schema/** linguist-generated=false +engine/docker/dev/** linguist-generated=true +engine/docker/dev-host/** linguist-generated=true +engine/docker/dev-multidc/** linguist-generated=true +engine/docker/dev-multidc-multinode/** linguist-generated=true +engine/docker/dev-multinode/** linguist-generated=true website/public/llms.txt linguist-generated=true website/public/llms-full.txt linguist-generated=true diff --git a/Cargo.lock b/Cargo.lock index 4431806248..da8c1dc40e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1828,6 +1828,7 @@ dependencies = [ "serde_json", "statrs", "strum", + "sysinfo", "thiserror 1.0.69", "tokio", "tokio-util", @@ -3012,6 +3013,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.1" @@ -3115,6 +3125,25 @@ dependencies = [ "libc", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags 2.10.0", +] + +[[package]] +name = "objc2-io-kit" 
+version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "object" version = "0.36.7" @@ -5914,6 +5943,20 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "sysinfo" +version = "0.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.61.3", +] + [[package]] name = "system-configuration" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 7630afc160..0f353940a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ slog = "2.7" slog-async = "2.8" slog-term = "2.9" statrs = "0.18" +sysinfo = "0.37.2" tabled = "0.17.0" tempfile = "3.13.0" testcontainers = "0.24" diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index e13da9b43f..0e59ac1fc0 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "2.0.23" + "version": "2.0.24-rc.1" }, "paths": { "/actors": { diff --git a/engine/docker/dev-host/grafana/dashboards/gasoline.json b/engine/docker/dev-host/grafana/dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/dev-host/grafana/dashboards/gasoline.json +++ b/engine/docker/dev-host/grafana/dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as 
value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/docker/dev-multidc-multinode/core/grafana/dashboards/gasoline.json b/engine/docker/dev-multidc-multinode/core/grafana/dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/dev-multidc-multinode/core/grafana/dashboards/gasoline.json +++ b/engine/docker/dev-multidc-multinode/core/grafana/dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": 
"WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/docker/dev-multidc/core/grafana/dashboards/gasoline.json b/engine/docker/dev-multidc/core/grafana/dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/dev-multidc/core/grafana/dashboards/gasoline.json +++ b/engine/docker/dev-multidc/core/grafana/dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 
'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/docker/dev-multinode/grafana/dashboards/gasoline.json b/engine/docker/dev-multinode/grafana/dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/dev-multinode/grafana/dashboards/gasoline.json +++ b/engine/docker/dev-multinode/grafana/dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/docker/dev/grafana/dashboards/gasoline.json 
b/engine/docker/dev/grafana/dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/dev/grafana/dashboards/gasoline.json +++ b/engine/docker/dev/grafana/dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/docker/template/grafana-dashboards/gasoline.json b/engine/docker/template/grafana-dashboards/gasoline.json index 2b0bffca01..16a9f37025 100644 --- a/engine/docker/template/grafana-dashboards/gasoline.json +++ b/engine/docker/template/grafana-dashboards/gasoline.json @@ -1087,12 +1087,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": 
"4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], @@ -1211,12 +1211,12 @@ "editorType": "sql", "format": 1, "instant": false, - "legendFormat": "{{worker_instance_id}}", + "legendFormat": "{{worker_id}}", "meta": {}, "pluginVersion": "4.11.2", "queryType": "table", "range": true, - "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_instance_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", + "rawSql": "WITH 1 AS smoothness\nSELECT\n time,\n label,\n avg(value) OVER (PARTITION BY label ORDER BY time ROWS BETWEEN smoothness - 1 PRECEDING AND CURRENT ROW) as value\nFROM (\n\tSELECT\n $__timeInterval(TimeUnix) as time,\n Attributes['worker_id'] as label,\n max(Value) as value\nFROM otel.otel_metrics_gauge\nWHERE MetricName = 'rivet_gasoline_last_pull_workflows_history_duration'\n AND ResourceAttributes['rivet.project'] IN array($project)\n AND ResourceAttributes['rivet.datacenter'] IN array($datacenter)\n AND $__timeFilter(TimeUnix)\nGROUP BY time, label\n)\nORDER BY label", "refId": "A" } ], diff --git a/engine/packages/gasoline/Cargo.toml b/engine/packages/gasoline/Cargo.toml index 98f9bfef06..6221326339 100644 --- a/engine/packages/gasoline/Cargo.toml +++ b/engine/packages/gasoline/Cargo.toml @@ -34,14 +34,15 @@ sentry.workspace = true serde_json.workspace = true serde.workspace = true strum.workspace = true +sysinfo.workspace = true thiserror.workspace = true tokio-util.workspace = true tokio.workspace = true tracing-logfmt.workspace = true tracing-opentelemetry.workspace = true -universaldb.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing.workspace = true +universaldb.workspace = true universalpubsub.workspace = true url.workspace = true uuid.workspace = true diff --git a/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs 
b/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs index 6c1180ae2f..cca32f4319 100644 --- a/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs +++ b/engine/packages/gasoline/src/builder/workflow/sub_workflow.rs @@ -10,6 +10,7 @@ use tracing::Instrument; use crate::{ builder::{BuilderError, WorkflowRepr}, ctx::WorkflowCtx, + db::BumpSubSubject, error::{WorkflowError, WorkflowResult}, history::cursor::HistoryResult, metrics, @@ -266,7 +267,13 @@ where tracing::debug!("waiting for sub workflow"); - let mut bump_sub = self.ctx.db().bump_sub().await?; + let mut bump_sub = self + .ctx + .db() + .bump_sub(BumpSubSubject::WorkflowComplete { + workflow_id: sub_workflow_id, + }) + .await?; let mut retries = self.ctx.db().max_sub_workflow_poll_retries(); let mut interval = tokio::time::interval(self.ctx.db().sub_workflow_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); diff --git a/engine/packages/gasoline/src/ctx/common.rs b/engine/packages/gasoline/src/ctx/common.rs index 49a3a34efe..e66387334b 100644 --- a/engine/packages/gasoline/src/ctx/common.rs +++ b/engine/packages/gasoline/src/ctx/common.rs @@ -11,7 +11,7 @@ pub const WORKFLOW_TIMEOUT: Duration = Duration::from_secs(60); use crate::{ ctx::OperationCtx, - db::{DatabaseHandle, WorkflowData}, + db::{BumpSubSubject, DatabaseHandle, WorkflowData}, error::WorkflowError, operation::{Operation, OperationInput}, utils::tags::AsTags, @@ -26,7 +26,9 @@ pub async fn wait_for_workflow_output( ) -> Result { tracing::debug!(?workflow_id, "waiting for workflow"); - let mut bump_sub = db.bump_sub().await?; + let mut bump_sub = db + .bump_sub(BumpSubSubject::WorkflowComplete { workflow_id }) + .await?; let mut interval = tokio::time::interval(db.sub_workflow_poll_interval()); // Skip first tick, we wait after the db call instead of before diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index 8d184036a4..236ee2e7d9 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -19,7 +19,7 @@ use crate::{ activity::{Activity, ActivityInput}, builder::{WorkflowRepr, workflow as builder}, ctx::{ActivityCtx, ListenCtx, MessageCtx, VersionedWorkflowCtx}, - db::{DatabaseHandle, PulledWorkflowData}, + db::{BumpSubSubject, DatabaseHandle, PulledWorkflowData}, error::{WorkflowError, WorkflowResult}, executable::{AsyncResult, Executable}, history::{ @@ -716,7 +716,12 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal"); - let mut bump_sub = self.db.bump_sub().await?; + let mut bump_sub = self + .db + .bump_sub(BumpSubSubject::SignalPublish { + to_workflow_id: self.workflow_id, + }) + .await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -779,7 +784,12 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal"); - let mut bump_sub = self.db.bump_sub().await?; + let mut bump_sub = self + .db + .bump_sub(BumpSubSubject::SignalPublish { + to_workflow_id: self.workflow_id, + }) + .await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -1186,7 +1196,12 @@ impl WorkflowCtx { (async { tracing::debug!("listening for signal with timeout"); - let mut bump_sub = 
self.db.bump_sub().await?; + let mut bump_sub = self + .db + .bump_sub(BumpSubSubject::SignalPublish { + to_workflow_id: self.workflow_id, + }) + .await?; let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -1229,7 +1244,12 @@ impl WorkflowCtx { else { tracing::debug!("listening for signal with timeout"); - let mut bump_sub = self.db.bump_sub().await?; + let mut bump_sub = self + .db + .bump_sub(BumpSubSubject::SignalPublish { + to_workflow_id: self.workflow_id, + }) + .await?; let mut retries = self.db.max_signal_poll_retries(); let mut interval = tokio::time::interval(self.db.signal_poll_interval()); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index aff9967940..a2e0b68420 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -18,10 +18,13 @@ use universaldb::{ use super::{DatabaseKv, keys, update_metric}; use crate::{ - db::debug::{ - ActivityError, ActivityEvent, DatabaseDebug, Event, EventData, HistoryData, LoopEvent, - MessageSendEvent, SignalData, SignalEvent, SignalSendEvent, SignalState, SubWorkflowEvent, - WorkflowData, WorkflowState, + db::{ + BumpSubSubject, + debug::{ + ActivityError, ActivityEvent, DatabaseDebug, Event, EventData, HistoryData, LoopEvent, + MessageSendEvent, SignalData, SignalEvent, SignalSendEvent, SignalState, + SubWorkflowEvent, WorkflowData, WorkflowState, + }, }, error::{WorkflowError, WorkflowResult}, history::{ @@ -54,7 +57,7 @@ impl DatabaseKv { let output_subspace = self.subspace.subspace(&output_key); let error_key = keys::workflow::ErrorKey::new(workflow_id); let has_wake_condition_key = keys::workflow::HasWakeConditionKey::new(workflow_id); - let worker_instance_id_key = keys::workflow::WorkerInstanceIdKey::new(workflow_id); + let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id); let silence_ts_key = keys::workflow::SilenceTsKey::new(workflow_id); let ( @@ -66,7 +69,7 @@ impl DatabaseKv { output_chunks, error_entry, has_wake_condition_entry, - worker_instance_id_entry, + worker_id_entry, silence_ts_entry, ) = tokio::try_join!( tx.get_ranges_keyvalues( @@ -111,7 +114,7 @@ impl DatabaseKv { .try_collect::>(), tx.get(&self.subspace.pack(&error_key), Snapshot), tx.get(&self.subspace.pack(&has_wake_condition_key), Snapshot), - tx.get(&self.subspace.pack(&worker_instance_id_key), Snapshot), + tx.get(&self.subspace.pack(&worker_id_key), Snapshot), tx.get(&self.subspace.pack(&silence_ts_key), Snapshot), )?; @@ -148,7 +151,7 @@ impl DatabaseKv { WorkflowState::Silenced } else if output.is_some() { WorkflowState::Complete - } else if worker_instance_id_entry.is_some() { + } else if worker_id_entry.is_some() { WorkflowState::Running } else if has_wake_condition_entry.is_some() { WorkflowState::Sleeping @@ -362,7 +365,7 @@ impl DatabaseDebug for DatabaseKv { } } else if let Ok(_) = self .subspace - .unpack::(entry.key()) + .unpack::(entry.key()) { match state { Some(WorkflowState::Running) => state_matches = true, @@ -425,8 +428,7 @@ impl DatabaseDebug for DatabaseKv { .subspace .subspace(&keys::workflow::TagKey::subspace(workflow_id)); let name_key = keys::workflow::NameKey::new(workflow_id); - let worker_instance_id_key = - keys::workflow::WorkerInstanceIdKey::new(workflow_id); + let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id); let output_key = 
keys::workflow::OutputKey::new(workflow_id); let output_subspace = self.subspace.subspace(&output_key); let has_wake_condition_key = @@ -509,7 +511,7 @@ impl DatabaseDebug for DatabaseKv { }) .try_collect::>(), async { - tx.get(&self.subspace.pack(&worker_instance_id_key), Serializable) + tx.get(&self.subspace.pack(&worker_id_key), Serializable) .await .map(|x| x.is_some()) }, @@ -645,8 +647,7 @@ impl DatabaseDebug for DatabaseKv { for workflow_id in workflow_ids { let name_key = keys::workflow::NameKey::new(workflow_id); - let worker_instance_id_key = - keys::workflow::WorkerInstanceIdKey::new(workflow_id); + let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id); let has_wake_condition_key = keys::workflow::HasWakeConditionKey::new(workflow_id); let error_key = keys::workflow::ErrorKey::new(workflow_id); @@ -663,7 +664,7 @@ impl DatabaseDebug for DatabaseKv { error, ) = tokio::try_join!( tx.read(&name_key, Serializable), - tx.exists(&worker_instance_id_key, Serializable), + tx.exists(&worker_id_key, Serializable), tx.exists(&has_wake_condition_key, Serializable), tx.exists(&silence_ts_key, Serializable), async { @@ -718,7 +719,7 @@ impl DatabaseDebug for DatabaseKv { .instrument(tracing::info_span!("wake_workflows_tx")) .await?; - self.bump_workers(); + self.bump(BumpSubSubject::Worker); Ok(()) } diff --git a/engine/packages/gasoline/src/db/kv/keys/mod.rs b/engine/packages/gasoline/src/db/kv/keys/mod.rs index c14fed814f..4c794ec07a 100644 --- a/engine/packages/gasoline/src/db/kv/keys/mod.rs +++ b/engine/packages/gasoline/src/db/kv/keys/mod.rs @@ -2,5 +2,5 @@ pub mod history; pub mod metric; pub mod signal; pub mod wake; -pub mod worker_instance; +pub mod worker; pub mod workflow; diff --git a/engine/packages/gasoline/src/db/kv/keys/worker.rs b/engine/packages/gasoline/src/db/kv/keys/worker.rs new file mode 100644 index 0000000000..376e6b1d27 --- /dev/null +++ b/engine/packages/gasoline/src/db/kv/keys/worker.rs @@ -0,0 +1,187 @@ +use std::result::Result::Ok; + +use anyhow::*; +use rivet_util::Id; +use universaldb::prelude::*; + +#[derive(Debug)] +pub struct LastPingTsKey { + worker_id: Id, +} + +impl LastPingTsKey { + pub fn new(worker_id: Id) -> Self { + LastPingTsKey { worker_id } + } +} + +impl FormalKey for LastPingTsKey { + // Timestamp. 
+ type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for LastPingTsKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (WORKER, DATA, self.worker_id, LAST_PING_TS); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for LastPingTsKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, worker_id, _)) = + <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?; + let v = LastPingTsKey { worker_id }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct ActiveWorkerIdxKey { + last_ping_ts: i64, + pub worker_id: Id, +} + +impl ActiveWorkerIdxKey { + pub fn new(last_ping_ts: i64, worker_id: Id) -> Self { + ActiveWorkerIdxKey { + last_ping_ts, + worker_id, + } + } + + pub fn subspace(last_ping_ts: i64) -> ActiveWorkerIdxSubspaceKey { + ActiveWorkerIdxSubspaceKey::new(last_ping_ts) + } + + pub fn entire_subspace() -> ActiveWorkerIdxSubspaceKey { + ActiveWorkerIdxSubspaceKey::entire() + } +} + +impl FormalKey for ActiveWorkerIdxKey { + type Value = (); + + fn deserialize(&self, _raw: &[u8]) -> Result { + Ok(()) + } + + fn serialize(&self, _value: Self::Value) -> Result> { + Ok(Vec::new()) + } +} + +impl TuplePack for ActiveWorkerIdxKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (WORKER, ACTIVE, self.last_ping_ts, self.worker_id); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ActiveWorkerIdxKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, last_ping_ts, worker_id)) = + <(usize, usize, i64, Id)>::unpack(input, tuple_depth)?; + let v = ActiveWorkerIdxKey { + last_ping_ts, + worker_id, + }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct ActiveWorkerIdxSubspaceKey { + last_ping_ts: Option, +} + +impl ActiveWorkerIdxSubspaceKey { + pub fn new(last_ping_ts: i64) -> Self { + ActiveWorkerIdxSubspaceKey { + last_ping_ts: Some(last_ping_ts), + } + } + + pub fn entire() -> Self { + ActiveWorkerIdxSubspaceKey { last_ping_ts: None } + } +} + +impl TuplePack for ActiveWorkerIdxSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (WORKER, ACTIVE); + offset += t.pack(w, tuple_depth)?; + + if let Some(last_ping_ts) = self.last_ping_ts { + offset += last_ping_ts.pack(w, tuple_depth)?; + } + + Ok(offset) + } +} + +#[derive(Debug)] +pub struct MetricsLockKey {} + +impl MetricsLockKey { + pub fn new() -> Self { + MetricsLockKey {} + } +} + +impl FormalKey for MetricsLockKey { + // Timestamp. 
+ type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for MetricsLockKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (WORKER, METRICS_LOCK); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for MetricsLockKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _)) = <(usize, usize)>::unpack(input, tuple_depth)?; + let v = MetricsLockKey {}; + + Ok((input, v)) + } +} diff --git a/engine/packages/gasoline/src/db/kv/keys/worker_instance.rs b/engine/packages/gasoline/src/db/kv/keys/worker_instance.rs deleted file mode 100644 index 47b4a1103a..0000000000 --- a/engine/packages/gasoline/src/db/kv/keys/worker_instance.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::result::Result::Ok; - -use anyhow::*; -use rivet_util::Id; -use universaldb::prelude::*; - -#[derive(Debug)] -pub struct LastPingTsKey { - worker_instance_id: Id, -} - -impl LastPingTsKey { - pub fn new(worker_instance_id: Id) -> Self { - LastPingTsKey { worker_instance_id } - } -} - -impl FormalKey for LastPingTsKey { - // Timestamp. - type Value = i64; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(i64::from_be_bytes(raw.try_into()?)) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.to_be_bytes().to_vec()) - } -} - -impl TuplePack for LastPingTsKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (WORKER_INSTANCE, DATA, self.worker_instance_id, LAST_PING_TS); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for LastPingTsKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, worker_instance_id, _)) = - <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?; - let v = LastPingTsKey { worker_instance_id }; - - Ok((input, v)) - } -} - -#[derive(Debug)] -pub struct MetricsLockKey {} - -impl MetricsLockKey { - pub fn new() -> Self { - MetricsLockKey {} - } -} - -impl FormalKey for MetricsLockKey { - // Timestamp. - type Value = i64; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(i64::from_be_bytes(raw.try_into()?)) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.to_be_bytes().to_vec()) - } -} - -impl TuplePack for MetricsLockKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (WORKER_INSTANCE, METRICS_LOCK); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for MetricsLockKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _)) = <(usize, usize)>::unpack(input, tuple_depth)?; - let v = MetricsLockKey {}; - - Ok((input, v)) - } -} diff --git a/engine/packages/gasoline/src/db/kv/keys/workflow.rs b/engine/packages/gasoline/src/db/kv/keys/workflow.rs index 5ef01bb389..494e26ff52 100644 --- a/engine/packages/gasoline/src/db/kv/keys/workflow.rs +++ b/engine/packages/gasoline/src/db/kv/keys/workflow.rs @@ -20,7 +20,7 @@ impl LeaseKey { } impl FormalKey for LeaseKey { - /// Workflow name, worker instance id. + /// Workflow name, worker id. 
type Value = (String, Id); fn deserialize(&self, raw: &[u8]) -> Result { @@ -1059,17 +1059,17 @@ impl<'de> TupleUnpack<'de> for HasWakeConditionKey { } #[derive(Debug)] -pub struct WorkerInstanceIdKey { +pub struct WorkerIdKey { pub workflow_id: Id, } -impl WorkerInstanceIdKey { +impl WorkerIdKey { pub fn new(workflow_id: Id) -> Self { - WorkerInstanceIdKey { workflow_id } + WorkerIdKey { workflow_id } } } -impl FormalKey for WorkerInstanceIdKey { +impl FormalKey for WorkerIdKey { type Value = Id; fn deserialize(&self, raw: &[u8]) -> Result { @@ -1081,28 +1081,26 @@ impl FormalKey for WorkerInstanceIdKey { } } -impl TuplePack for WorkerInstanceIdKey { +impl TuplePack for WorkerIdKey { fn pack( &self, w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - let t = (WORKFLOW, DATA, self.workflow_id, WORKER_INSTANCE_ID); + let t = (WORKFLOW, DATA, self.workflow_id, WORKER_ID); t.pack(w, tuple_depth) } } -impl<'de> TupleUnpack<'de> for WorkerInstanceIdKey { +impl<'de> TupleUnpack<'de> for WorkerIdKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { let (input, (_, _, workflow_id, data)) = <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?; - if data != WORKER_INSTANCE_ID { - return Err(PackError::Message( - "expected WORKER_INSTANCE_ID data".into(), - )); + if data != WORKER_ID { + return Err(PackError::Message("expected WORKER_ID data".into())); } - let v = WorkerInstanceIdKey { workflow_id }; + let v = WorkerIdKey { workflow_id }; Ok((input, v)) } diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index a53fdb85ad..43992653e2 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -3,6 +3,7 @@ use std::{ collections::{HashMap, HashSet}, + hash::{DefaultHasher, Hash, Hasher}, sync::Arc, time::Instant, }; @@ -12,6 +13,7 @@ use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use rivet_util::Id; use rivet_util::future::CustomInstrumentExt; use serde_json::json; +use tokio::sync::Mutex; use tracing::Instrument; use universaldb::utils::{ FormalChunkedKey, FormalKey, IsolationLevel::*, end_of_key_range, keys::*, @@ -23,7 +25,7 @@ use universaldb::{ use rivet_metrics::KeyValue; -use super::{Database, PulledWorkflowData, SignalData, WorkflowData}; +use super::{BumpSubSubject, Database, PulledWorkflowData, SignalData, WorkflowData}; use crate::{ error::{WorkflowError, WorkflowResult}, history::{ @@ -34,26 +36,28 @@ use crate::{ location::Location, }, metrics, + worker::PING_INTERVAL, }; mod debug; mod keys; +mod subjects; +mod system; -/// How long before considering the leases of a given worker instance expired. -const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30); +/// How long before considering the leases of a given worker expired. +const WORKER_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30); /// How long before overwriting an existing metrics lock. const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30); -/// For pubsub bump mechanism. -const WORKER_BUMP_SUBJECT: &str = "gasoline.worker.bump"; pub struct DatabaseKv { pools: rivet_pools::Pools, subspace: universaldb::utils::Subspace, + system: Mutex, } impl DatabaseKv { - /// Spawns a new thread and publishes a worker bump message to pubsub. - fn bump_workers(&self) { + /// Spawns a new thread and gracefully publishes a bump message to pubsub. 
+ fn bump(&self, subject: BumpSubSubject) { let Ok(pubsub) = self.pools.ups() else { tracing::debug!("failed to acquire pubsub pool"); return; @@ -64,7 +68,7 @@ impl DatabaseKv { // Fail gracefully if let Err(err) = pubsub .publish( - WORKER_BUMP_SUBJECT, + &subjects::convert(subject), &Vec::new(), universalpubsub::PublishOpts::broadcast(), ) @@ -420,16 +424,20 @@ impl Database for DatabaseKv { Ok(Arc::new(DatabaseKv { pools, subspace: universaldb::utils::Subspace::new(&(RIVET, GASOLINE, KV)), + system: Mutex::new(system::SystemInfo::new()), })) } #[tracing::instrument(skip_all)] - async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult> { + async fn bump_sub<'a, 'b>( + &'a self, + subject: BumpSubSubject, + ) -> WorkflowResult> { let mut subscriber = self .pools .ups() .map_err(WorkflowError::PoolsGeneric)? - .subscribe(WORKER_BUMP_SUBJECT) + .subscribe(&subjects::convert(subject)) .await .map_err(|x| WorkflowError::CreateSubscription(x.into()))?; @@ -451,8 +459,8 @@ impl Database for DatabaseKv { } #[tracing::instrument(skip_all)] - async fn clear_expired_leases(&self, _worker_instance_id: Id) -> WorkflowResult<()> { - let (lost_worker_instance_ids, expired_workflow_count) = self + async fn clear_expired_leases(&self, _worker_id: Id) -> WorkflowResult<()> { + let (lost_worker_ids, expired_workflow_count) = self .pools .udb() .map_err(WorkflowError::PoolsGeneric)? @@ -461,7 +469,7 @@ impl Database for DatabaseKv { let now = rivet_util::timestamp::now(); let mut last_ping_cache: Vec<(Id, i64)> = Vec::new(); - let mut lost_worker_instance_ids = HashSet::new(); + let mut lost_worker_ids = HashSet::new(); let mut expired_workflow_count = 0; let lease_subspace = self @@ -483,15 +491,13 @@ impl Database for DatabaseKv { let lease_key = self .subspace .unpack::(lease_key_entry.key())?; - let (workflow_name, worker_instance_id) = + let (workflow_name, worker_id) = lease_key.deserialize(lease_key_entry.value())?; - let last_ping_ts_key = - keys::worker_instance::LastPingTsKey::new(worker_instance_id); + let last_ping_ts_key = keys::worker::LastPingTsKey::new(worker_id); - // Get last ping of worker instance for this lease - let last_ping_ts = if let Some((_, last_ping_ts)) = last_ping_cache - .iter() - .find(|(k, _)| k == &worker_instance_id) + // Get last ping of worker for this lease + let last_ping_ts = if let Some((_, last_ping_ts)) = + last_ping_cache.iter().find(|(k, _)| k == &worker_id) { *last_ping_ts } else if let Some(last_ping_entry) = tx @@ -506,18 +512,18 @@ impl Database for DatabaseKv { let last_ping_ts = last_ping_ts_key.deserialize(&last_ping_entry)?; // Update cache - last_ping_cache.push((worker_instance_id, last_ping_ts)); + last_ping_cache.push((worker_id, last_ping_ts)); last_ping_ts } else { // Update cache - last_ping_cache.push((worker_instance_id, 0)); + last_ping_cache.push((worker_id, 0)); 0 }; // Worker has not pinged within the threshold, meaning the lease is expired - if last_ping_ts < now - WORKER_INSTANCE_LOST_THRESHOLD_MS { + if last_ping_ts < now - WORKER_LOST_THRESHOLD_MS { // Check if the workflow is silenced and ignore let silence_ts_key = keys::workflow::SilenceTsKey::new(lease_key.workflow_id); @@ -540,9 +546,9 @@ impl Database for DatabaseKv { // Clear lease tx.clear(lease_key_entry.key()); - let worker_instance_id_key = - keys::workflow::WorkerInstanceIdKey::new(lease_key.workflow_id); - tx.clear(&self.subspace.pack(&worker_instance_id_key)); + let worker_id_key = + keys::workflow::WorkerIdKey::new(lease_key.workflow_id); + 
tx.clear(&self.subspace.pack(&worker_id_key)); // Add immediate wake for workflow let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new( @@ -566,13 +572,13 @@ impl Database for DatabaseKv { ); expired_workflow_count += 1; - lost_worker_instance_ids.insert(worker_instance_id); + lost_worker_ids.insert(worker_id); tracing::debug!(?lease_key.workflow_id, "failed over wf"); } } - Ok((lost_worker_instance_ids, expired_workflow_count)) + Ok((lost_worker_ids, expired_workflow_count)) } }) .custom_instrument(tracing::info_span!("clear_expired_leases_tx")) @@ -581,19 +587,19 @@ impl Database for DatabaseKv { if expired_workflow_count != 0 { tracing::info!( - worker_instance_ids=?lost_worker_instance_ids, + worker_ids=?lost_worker_ids, total_workflows=%expired_workflow_count, "handled failover", ); - self.bump_workers(); + self.bump(BumpSubSubject::Worker); } Ok(()) } #[tracing::instrument(skip_all)] - async fn publish_metrics(&self, _worker_instance_id: Id) -> WorkflowResult<()> { + async fn publish_metrics(&self, _worker_id: Id) -> WorkflowResult<()> { // Attempt to be the only worker publishing metrics by writing to the lock key let acquired_lock = self .pools @@ -605,7 +611,7 @@ impl Database for DatabaseKv { // Read existing lock let lock_expired = if let Some(lock_ts) = tx - .read_opt(&keys::worker_instance::MetricsLockKey::new(), Serializable) + .read_opt(&keys::worker::MetricsLockKey::new(), Serializable) .await? { lock_ts < rivet_util::timestamp::now() - METRICS_LOCK_TIMEOUT_MS @@ -617,7 +623,7 @@ impl Database for DatabaseKv { // Write to lock key. UDB transactions guarantee that if multiple workers are running this // query at the same time only one will succeed which means only one will have the lock. tx.write( - &keys::worker_instance::MetricsLockKey::new(), + &keys::worker::MetricsLockKey::new(), rivet_util::timestamp::now(), )?; } @@ -739,7 +745,7 @@ impl Database for DatabaseKv { .udb() .map_err(WorkflowError::PoolsGeneric)? .run(|tx| async move { - let metrics_lock_key = keys::worker_instance::MetricsLockKey::new(); + let metrics_lock_key = keys::worker::MetricsLockKey::new(); tx.clear(&self.subspace.pack(&metrics_lock_key)); Ok(()) @@ -753,30 +759,36 @@ impl Database for DatabaseKv { } #[tracing::instrument(skip_all)] - async fn update_worker_ping(&self, worker_instance_id: Id) -> WorkflowResult<()> { + async fn update_worker_ping(&self, worker_id: Id) -> WorkflowResult<()> { metrics::WORKER_LAST_PING.record( rivet_util::timestamp::now() as u64, - &[KeyValue::new( - "worker_instance_id", - worker_instance_id.to_string(), - )], + &[KeyValue::new("worker_id", worker_id.to_string())], ); self.pools .udb() .map_err(WorkflowError::PoolsGeneric)? - .run(|tx| { - async move { - // Update worker instance ping - let last_ping_ts_key = - keys::worker_instance::LastPingTsKey::new(worker_instance_id); - tx.set( - &self.subspace.pack(&last_ping_ts_key), - &last_ping_ts_key.serialize(rivet_util::timestamp::now())?, - ); + .run(|tx| async move { + let tx = tx.with_subspace(self.subspace.clone()); - Ok(()) + let last_ping_ts = rivet_util::timestamp::now(); + let last_ping_ts_key = keys::worker::LastPingTsKey::new(worker_id); + + if let Some(last_last_ping_ts) = + tx.read_opt(&last_ping_ts_key, Serializable).await? 
+ { + let active_worker_idx_key = + keys::worker::ActiveWorkerIdxKey::new(last_last_ping_ts, worker_id); + tx.delete(&active_worker_idx_key); } + + tx.write(&last_ping_ts_key, last_ping_ts)?; + + let active_worker_idx_key = + keys::worker::ActiveWorkerIdxKey::new(last_ping_ts, worker_id); + tx.write(&active_worker_idx_key, ())?; + + Ok(()) }) .custom_instrument(tracing::info_span!("update_worker_ping_tx")) .await @@ -785,6 +797,31 @@ impl Database for DatabaseKv { Ok(()) } + #[tracing::instrument(skip_all)] + async fn mark_worker_inactive(&self, worker_id: Id) -> WorkflowResult<()> { + self.pools + .udb() + .map_err(WorkflowError::PoolsGeneric)? + .run(|tx| async move { + let last_ping_ts_key = keys::worker::LastPingTsKey::new(worker_id); + + if let Some(last_last_ping_ts) = + tx.read_opt(&last_ping_ts_key, Serializable).await? + { + let active_worker_idx_key = + keys::worker::ActiveWorkerIdxKey::new(last_last_ping_ts, worker_id); + tx.delete(&active_worker_idx_key); + } + + Ok(()) + }) + .custom_instrument(tracing::info_span!("mark_worker_inactive_tx")) + .await + .map_err(WorkflowError::Udb)?; + + Ok(()) + } + #[tracing::instrument(skip_all, fields(%workflow_id, %workflow_name, unique))] async fn dispatch_workflow( &self, @@ -815,7 +852,7 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.bump_workers(); + self.bump(BumpSubSubject::Worker); Ok(workflow_id) } @@ -949,7 +986,7 @@ impl Database for DatabaseKv { #[tracing::instrument(skip_all)] async fn pull_workflows( &self, - worker_instance_id: Id, + worker_id: Id, filter: &[&str], ) -> WorkflowResult> { let start_instant = Instant::now(); @@ -966,125 +1003,186 @@ impl Database for DatabaseKv { let owned_filter = owned_filter.clone(); async move { + let tx = tx.with_subspace(self.subspace.clone()); let now = rivet_util::timestamp::now(); // All wake conditions with a timestamp after this timestamp will be pulled let pull_before = now + i64::try_from(self.worker_poll_interval().as_millis())?; + // Only consider workers that have pinged within 2 ping intervals ago + let active_workers_after = now - i64::try_from(PING_INTERVAL.as_millis() * 2)?; + + // Determine load shedding ratio based on linear mapping on cpu usage. 
We will gradually + // pull fewer workflows as cpu usage increases + let cpu_usage = { self.system.lock().await.cpu_usage() }; + let load_shed_ratio_x1000 = + calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100); + + let active_worker_subspace_start = tx.pack( + &keys::worker::ActiveWorkerIdxKey::subspace(active_workers_after), + ); + let active_worker_subspace_end = self + .subspace + .subspace(&keys::worker::ActiveWorkerIdxKey::entire_subspace()) + .range() + .1; // Pull all available wake conditions from all registered wf names - let entries = futures_util::stream::iter(owned_filter) - .map(|wf_name| { - let wake_subspace_start = self - .subspace - .subspace( + let (mut active_worker_ids, wake_keys) = tokio::try_join!( + // Fetch ids of recently active workers + tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(active_worker_subspace_start, active_worker_subspace_end).into() + }, + // This is Snapshot to reduce contention, and exact timestamps are not important + Snapshot, + ) + .map(|res| { + let key = tx.unpack::(res?.key())?; + Ok(key.worker_id) + }) + .try_collect::>(), + futures_util::stream::iter(owned_filter) + .map(|wf_name| { + let wake_subspace_start = end_of_key_range(&tx.pack( + &keys::wake::WorkflowWakeConditionKey::subspace_without_ts( + wf_name.clone(), + ), + )); + let wake_subspace_end = + tx.pack(&keys::wake::WorkflowWakeConditionKey::subspace( + wf_name, + pull_before, + )); + + tx.read_range( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(wake_subspace_start, wake_subspace_end).into() + }, + // This is Snapshot to reduce contention with any new wake conditions + // being inserted. Conflicts are handled by workflow leases. + Snapshot, + ) - }) - .flatten() - .map(|res| { - let entry = res?; - - anyhow::Ok(( - entry.key().to_vec(), - self.subspace - .unpack::(entry.key())?, - )) - }) - .try_collect::>() - .await?; + }) + .flatten() + .map(|res| tx.unpack::(res?.key())) + .try_collect::>(), + )?; + + // Sort for consistency across all workers + active_worker_ids.sort(); + + // Get a globally unique idx for the current worker relative to all active workers + let current_worker_idx = if let Some(current_worker_idx) = active_worker_ids + .iter() + .enumerate() + .find_map(|(i, other_worker_id)| { + (&worker_id == other_worker_id).then_some(i) + }) { + current_worker_idx as u64 + } else { + tracing::error!( + ?worker_id, + "current worker should have valid ping, defaulting to worker index 0" + ); + + 0 + }; + let active_worker_count = active_worker_ids.len() as u64; // Collect name and deadline ts for each wf id - let mut dedup_workflows: Vec<(Id, String, Option)> = Vec::new(); - for (_, key) in &entries { - if let Some((_, _, last_wake_deadline_ts)) = dedup_workflows + let mut dedup_workflows: Vec = Vec::new(); + for wake_key in &wake_keys { + if let Some(wf) = dedup_workflows .iter_mut() - .find(|(wf_id, _, _)| wf_id == &key.workflow_id) + .find(|wf| wf.workflow_id == 
wake_key.workflow_id) { - let wake_deadline_ts = key.condition.deadline_ts(); + let key_wake_deadline_ts = wake_key.condition.deadline_ts(); + + // Update wake condition ts if earlier + if wake_key.ts < wf.earliest_wake_condition_ts { + wf.earliest_wake_condition_ts = wake_key.ts; + } // Update wake deadline ts if earlier - if last_wake_deadline_ts.is_none() - || wake_deadline_ts < *last_wake_deadline_ts + if wf.wake_deadline_ts.is_none() + || key_wake_deadline_ts < wf.wake_deadline_ts { - *last_wake_deadline_ts = wake_deadline_ts; + wf.wake_deadline_ts = key_wake_deadline_ts; } continue; } - dedup_workflows.push(( - key.workflow_id, - key.workflow_name.clone(), - key.condition.deadline_ts(), - )); + dedup_workflows.push(MinimalPulledWorkflow { + workflow_id: wake_key.workflow_id, + workflow_name: wake_key.workflow_name.clone(), + wake_deadline_ts: wake_key.condition.deadline_ts(), + earliest_wake_condition_ts: wake_key.ts, + }); } + // Filter workflows in a way that spreads all current pending workflows across all active + // workers evenly + let assigned_workflows = dedup_workflows.into_iter().filter(|wf| { + let mut hasher = DefaultHasher::new(); + + // Earliest wake condition ts is consistent for hashing purposes because when it + // changes it means a worker has leased it + wf.earliest_wake_condition_ts.hash(&mut hasher); + let wf_hash = hasher.finish(); + + let pseudorandom_value_x1000 = { + // Add a little pizazz to the hash so it's a different number than wf_hash but + // still consistent + 1234i32.hash(&mut hasher); + hasher.finish() % 1000 // 0-999 + }; + + if pseudorandom_value_x1000 < load_shed_ratio_x1000 { + return false; + } + + let wf_worker_idx = wf_hash % active_worker_count; + + // Every worker pulls workflows that hash to the current worker as well as the next + // worker for redundancy. This results in increased txn conflicts but less chance of + // orphaned workflows + let next_worker_idx = (current_worker_idx + 1) % active_worker_count; + + wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx + }); // Check leases - let leased_workflows = futures_util::stream::iter(dedup_workflows) - .map(|(workflow_id, workflow_name, wake_deadline_ts)| { + let leased_workflows = futures_util::stream::iter(assigned_workflows) + .map(|wf| { let tx = tx.clone(); async move { - let lease_key = keys::workflow::LeaseKey::new(workflow_id); - let lease_key_buf = self.subspace.pack(&lease_key); + let lease_key = keys::workflow::LeaseKey::new(wf.workflow_id); // Check lease - if tx.get(&lease_key_buf, Serializable).await?.is_some() { + if tx.exists(&lease_key, Serializable).await? 
 				// Check leases
-				let leased_workflows = futures_util::stream::iter(dedup_workflows)
-					.map(|(workflow_id, workflow_name, wake_deadline_ts)| {
+				let leased_workflows = futures_util::stream::iter(assigned_workflows)
+					.map(|wf| {
 						let tx = tx.clone();
 						async move {
-							let lease_key = keys::workflow::LeaseKey::new(workflow_id);
-							let lease_key_buf = self.subspace.pack(&lease_key);
+							let lease_key = keys::workflow::LeaseKey::new(wf.workflow_id);

 							// Check lease
-							if tx.get(&lease_key_buf, Serializable).await?.is_some() {
+							if tx.exists(&lease_key, Serializable).await? {
 								Result::<_>::Ok(None)
 							} else {
-								// Write lease
-								tx.set(
-									&lease_key_buf,
-									&lease_key.serialize((
-										workflow_name.clone(),
-										worker_instance_id,
-									))?,
-								);
+								tx.write(&lease_key, (wf.workflow_name.clone(), worker_id))?;

-								// Write worker instance id
-								let worker_instance_id_key =
-									keys::workflow::WorkerInstanceIdKey::new(workflow_id);
-								tx.set(
-									&self.subspace.pack(&worker_instance_id_key),
-									&worker_instance_id_key.serialize(worker_instance_id)?,
-								);
+								tx.write(
+									&keys::workflow::WorkerIdKey::new(wf.workflow_id),
+									worker_id,
+								)?;

 								update_metric(
-									&tx.with_subspace(self.subspace.clone()),
+									&tx,
 									Some(keys::metric::GaugeMetric::WorkflowSleeping(
-										workflow_name.clone(),
+										wf.workflow_name.clone(),
 									)),
 									Some(keys::metric::GaugeMetric::WorkflowActive(
-										workflow_name.clone(),
+										wf.workflow_name.clone(),
 									)),
 								);

-								Ok(Some((workflow_id, workflow_name, wake_deadline_ts)))
+								Ok(Some(wf))
 							}
 						}
 					})
@@ -1095,22 +1193,21 @@
 					.instrument(tracing::trace_span!("map_to_leased_workflows"))
 					.await?;

-				for (raw_key, key) in &entries {
-					// Filter unleased entries
+				// Clear all wake conditions from workflows that we have leased
+				for wake_key in &wake_keys {
 					if !leased_workflows
 						.iter()
-						.any(|(wf_id, _, _)| wf_id == &key.workflow_id)
+						.any(|wf| wf.workflow_id == wake_key.workflow_id)
 					{
 						continue;
 					}

-					// Clear fetched wake conditions
-					tx.clear(raw_key);
+					tx.delete(wake_key);
 				}

 				let leased_workflow_ids = leased_workflows
 					.iter()
-					.map(|(workflow_id, _, _)| *workflow_id)
+					.map(|wf| wf.workflow_id)
 					.collect::<Vec<_>>();

 				// Clear secondary indexes so that we don't get any new wake conditions inserted while
@@ -1160,22 +1257,12 @@
 			.await
 			.map_err(WorkflowError::Udb)?;

-		let worker_instance_id_str = worker_instance_id.to_string();
+		let worker_id_str = worker_id.to_string();
 		let dt = start_instant.elapsed().as_secs_f64();
-		metrics::LAST_PULL_WORKFLOWS_DURATION.record(
-			dt,
-			&[KeyValue::new(
-				"worker_instance_id",
-				worker_instance_id_str.clone(),
-			)],
-		);
-		metrics::PULL_WORKFLOWS_DURATION.record(
-			dt,
-			&[KeyValue::new(
-				"worker_instance_id",
-				worker_instance_id_str.clone(),
-			)],
-		);
+		metrics::LAST_PULL_WORKFLOWS_DURATION
+			.record(dt, &[KeyValue::new("worker_id", worker_id_str.clone())]);
+		metrics::PULL_WORKFLOWS_DURATION
+			.record(dt, &[KeyValue::new("worker_id", worker_id_str.clone())]);

 		if leased_workflows.is_empty() {
 			return Ok(Vec::new());
 		}
@@ -1193,18 +1280,19 @@
 					async move {
 						// Read required wf data for each leased wf
 						futures_util::stream::iter(leased_workflows)
-							.map(|(workflow_id, workflow_name, wake_deadline_ts)| {
+							.map(|wf| {
 								let tx = tx.clone();
 								async move {
-									let create_ts_key = keys::workflow::CreateTsKey::new(workflow_id);
-									let ray_id_key = keys::workflow::RayIdKey::new(workflow_id);
-									let input_key = keys::workflow::InputKey::new(workflow_id);
-									let state_key = keys::workflow::StateKey::new(workflow_id);
+									let create_ts_key =
+										keys::workflow::CreateTsKey::new(wf.workflow_id);
+									let ray_id_key = keys::workflow::RayIdKey::new(wf.workflow_id);
+									let input_key = keys::workflow::InputKey::new(wf.workflow_id);
+									let state_key = keys::workflow::StateKey::new(wf.workflow_id);
 									let input_subspace = self.subspace.subspace(&input_key);
 									let state_subspace = self.subspace.subspace(&state_key);
 									let active_history_subspace = self.subspace.subspace(
 										&keys::history::HistorySubspaceKey::new(
-											workflow_id,
+											wf.workflow_id,
 											keys::history::HistorySubspaceVariant::Active,
 										),
 									);
@@ -1417,13 +1505,13 @@
impl Database for DatabaseKv { }; Result::<_>::Ok(PulledWorkflowData { - workflow_id, - workflow_name, + workflow_id: wf.workflow_id, + workflow_name: wf.workflow_name, create_ts, ray_id, input, state, - wake_deadline_ts, + wake_deadline_ts: wf.wake_deadline_ts, events, }) } @@ -1441,34 +1529,16 @@ impl Database for DatabaseKv { let dt2 = start_instant2.elapsed().as_secs_f64(); let dt = start_instant.elapsed().as_secs_f64(); - metrics::LAST_PULL_WORKFLOWS_FULL_DURATION.record( - dt, - &[KeyValue::new( - "worker_instance_id", - worker_instance_id_str.clone(), - )], - ); - metrics::PULL_WORKFLOWS_FULL_DURATION.record( - dt, - &[KeyValue::new( - "worker_instance_id", - worker_instance_id_str.clone(), - )], - ); + metrics::LAST_PULL_WORKFLOWS_FULL_DURATION + .record(dt, &[KeyValue::new("worker_id", worker_id_str.clone())]); + metrics::PULL_WORKFLOWS_FULL_DURATION + .record(dt, &[KeyValue::new("worker_id", worker_id_str.clone())]); metrics::LAST_PULL_WORKFLOWS_HISTORY_DURATION.record( dt2 as u64, - &[KeyValue::new( - "worker_instance_id", - worker_instance_id_str.clone(), - )], - ); - metrics::PULL_WORKFLOWS_HISTORY_DURATION.record( - dt2, - &[KeyValue::new( - "worker_instance_id", - worker_instance_id_str.clone(), - )], + &[KeyValue::new("worker_id", worker_id_str.clone())], ); + metrics::PULL_WORKFLOWS_HISTORY_DURATION + .record(dt2, &[KeyValue::new("worker_id", worker_id_str.clone())]); Ok(pulled_workflows) } @@ -1610,9 +1680,8 @@ impl Database for DatabaseKv { // Clear lease let lease_key = keys::workflow::LeaseKey::new(workflow_id); tx.clear(&self.subspace.pack(&lease_key)); - let worker_instance_id_key = - keys::workflow::WorkerInstanceIdKey::new(workflow_id); - tx.clear(&self.subspace.pack(&worker_instance_id_key)); + let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id); + tx.clear(&self.subspace.pack(&worker_id_key)); update_metric( &tx.with_subspace(self.subspace.clone()), @@ -1633,7 +1702,8 @@ impl Database for DatabaseKv { // Wake worker again in case some other workflow was waiting for this one to complete if wrote_to_wake_idx { - self.bump_workers(); + self.bump(BumpSubSubject::WorkflowComplete { workflow_id }); + self.bump(BumpSubSubject::Worker); } let dt = start_instant.elapsed().as_secs_f64(); @@ -1759,9 +1829,8 @@ impl Database for DatabaseKv { // Clear lease let lease_key = keys::workflow::LeaseKey::new(workflow_id); tx.clear(&self.subspace.pack(&lease_key)); - let worker_instance_id_key = - keys::workflow::WorkerInstanceIdKey::new(workflow_id); - tx.clear(&self.subspace.pack(&worker_instance_id_key)); + let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id); + tx.clear(&self.subspace.pack(&worker_id_key)); update_metric( &tx.with_subspace(self.subspace.clone()), @@ -1802,7 +1871,7 @@ impl Database for DatabaseKv { // - would involve informing the worker to restart the workflow in memory instead of the usual // workflow lifecycle // - the worker is already designed to pull wake conditions frequently - self.bump_workers(); + self.bump(BumpSubSubject::Worker); let dt = start_instant.elapsed().as_secs_f64(); metrics::COMMIT_WORKFLOW_DURATION.record( @@ -2111,7 +2180,10 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.bump_workers(); + self.bump(BumpSubSubject::SignalPublish { + to_workflow_id: workflow_id, + }); + self.bump(BumpSubSubject::Worker); Ok(()) } @@ -2163,7 +2235,8 @@ impl Database for DatabaseKv { .await .map_err(WorkflowError::Udb)?; - self.bump_workers(); + self.bump(BumpSubSubject::SignalPublish { 
to_workflow_id });
+		self.bump(BumpSubSubject::Worker);

 		Ok(())
 	}
@@ -2219,7 +2292,7 @@ impl Database for DatabaseKv {
 			.await
 			.map_err(WorkflowError::Udb)?;

-		self.bump_workers();
+		self.bump(BumpSubSubject::Worker);

 		Ok(sub_workflow_id)
 	}
@@ -2696,6 +2769,14 @@ impl Database for DatabaseKv {
 	}
 }

+#[derive(Debug, Clone)]
+struct MinimalPulledWorkflow {
+	workflow_id: Id,
+	workflow_name: String,
+	wake_deadline_ts: Option<i64>,
+	earliest_wake_condition_ts: i64,
+}
+
 fn update_metric(
 	tx: &universaldb::Transaction,
 	previous: Option<keys::metric::GaugeMetric>,
@@ -2949,3 +3030,15 @@ fn value_to_str(v: &serde_json::Value) -> WorkflowResult<String> {
 		_ => cjson::to_string(&v).map_err(WorkflowError::CjsonSerializeTags),
 	}
 }
+
+fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
+	// Line through (ax, ay) and (bx, by); must have a negative slope (inversely
+	// proportional)
+	assert!(ax < bx);
+	assert!(ay > by);
+
+	let neg_dy = ay - by;
+	let dx = bx - ax;
+	// y-intercept of y = b - x * neg_dy / dx, chosen so the line passes through
+	// both anchor points
+	let b = ay + ax * neg_dy / dx;
+
+	b.saturating_sub(x * neg_dy / dx)
+}
diff --git a/engine/packages/gasoline/src/db/kv/subjects.rs b/engine/packages/gasoline/src/db/kv/subjects.rs
new file mode 100644
index 0000000000..420d79c4aa
--- /dev/null
+++ b/engine/packages/gasoline/src/db/kv/subjects.rs
@@ -0,0 +1,13 @@
+use crate::db::BumpSubSubject;
+
+pub fn convert(subject: BumpSubSubject) -> String {
+	match subject {
+		BumpSubSubject::Worker => "gasoline.worker.bump".into(),
+		BumpSubSubject::WorkflowComplete { workflow_id } => {
+			format!("gasoline.workflow.complete.{workflow_id}")
+		}
+		BumpSubSubject::SignalPublish { to_workflow_id } => {
+			format!("gasoline.signal.for-workflow.{to_workflow_id}")
+		}
+	}
+}
diff --git a/engine/packages/gasoline/src/db/kv/system.rs b/engine/packages/gasoline/src/db/kv/system.rs
new file mode 100644
index 0000000000..fc09dee12c
--- /dev/null
+++ b/engine/packages/gasoline/src/db/kv/system.rs
@@ -0,0 +1,33 @@
+use std::time::Instant;
+
+use sysinfo::{CpuRefreshKind, MINIMUM_CPU_UPDATE_INTERVAL, RefreshKind, System};
+
+pub struct SystemInfo {
+	system: System,
+	last_cpu_usage_read: Instant,
+}
+
+impl SystemInfo {
+	pub fn new() -> Self {
+		SystemInfo {
+			system: System::new_with_specifics(
+				RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing().with_cpu_usage()),
+			),
+			last_cpu_usage_read: Instant::now(),
+		}
+	}
+
+	/// Returns the average cpu usage across the entire system as a float in 0.0-100.0.
+	pub fn cpu_usage(&mut self) -> f32 {
+		if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
+			self.system.refresh_cpu_usage();
+			self.last_cpu_usage_read = Instant::now();
+		}
+
+		self.system
+			.cpus()
+			.iter()
+			.fold(0.0, |s, cpu| s + cpu.cpu_usage())
+			/ self.system.cpus().len() as f32
+	}
+}
diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs
index 7b5d2a11cb..a4e94020ca 100644
--- a/engine/packages/gasoline/src/db/mod.rs
+++ b/engine/packages/gasoline/src/db/mod.rs
@@ -57,19 +57,25 @@ pub trait Database: Send {

 	// MARK: Worker fns

-	/// This function returns a subscription which should resolve once the worker should fetch the database
-	/// again.
-	async fn bump_sub<'a, 'b>(&'a self) -> WorkflowResult<BoxFuture<'b, ()>>;
+	/// Returns a subscription that resolves once the event described by the given subject
+	/// occurs.
+	async fn bump_sub<'a, 'b>(
+		&'a self,
+		subject: BumpSubSubject,
+	) -> WorkflowResult<BoxFuture<'b, ()>>;

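To make the channel layout concrete, here is a standalone mirror of `subjects::convert` above, with placeholder string ids in place of real `Id` values:

```rust
// Standalone mirror of subjects::convert from this patch.
enum BumpSubSubject {
    Worker,
    WorkflowComplete { workflow_id: String },
    SignalPublish { to_workflow_id: String },
}

fn convert(subject: BumpSubSubject) -> String {
    match subject {
        BumpSubSubject::Worker => "gasoline.worker.bump".into(),
        BumpSubSubject::WorkflowComplete { workflow_id } => {
            format!("gasoline.workflow.complete.{workflow_id}")
        }
        BumpSubSubject::SignalPublish { to_workflow_id } => {
            format!("gasoline.signal.for-workflow.{to_workflow_id}")
        }
    }
}

fn main() {
    assert_eq!(convert(BumpSubSubject::Worker), "gasoline.worker.bump");
    assert_eq!(
        convert(BumpSubSubject::SignalPublish { to_workflow_id: "wf-123".into() }),
        "gasoline.signal.for-workflow.wf-123"
    );
}
```

Scoping subjects per workflow is what lets a waiter subscribe only to the completion or signal it cares about instead of the global worker bump.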
 	/// Updates the last ping ts for this worker.
-	async fn update_worker_ping(&self, worker_instance_id: Id) -> WorkflowResult<()>;
+	async fn update_worker_ping(&self, worker_id: Id) -> WorkflowResult<()>;
+
+	/// Removes the worker from consideration for `pull_workflows` delegation.
+	async fn mark_worker_inactive(&self, worker_id: Id) -> WorkflowResult<()>;

 	/// Releases workflows that were leased by workers that have since expired (their last ping has passed
 	/// the expired threshold), making them eligible to be run again. Called periodically.
-	async fn clear_expired_leases(&self, worker_instance_id: Id) -> WorkflowResult<()>;
+	async fn clear_expired_leases(&self, worker_id: Id) -> WorkflowResult<()>;

 	/// Function to publish metrics. Called periodically.
-	async fn publish_metrics(&self, worker_instance_id: Id) -> WorkflowResult<()>;
+	async fn publish_metrics(&self, worker_id: Id) -> WorkflowResult<()>;

 	// MARK: Workflows/signals

@@ -96,10 +102,10 @@ pub trait Database: Send {
 	) -> WorkflowResult>;

 	/// Pulls workflows for processing by the worker. Will only pull workflows with names matching the filter.
-	/// Should also update the ping of this worker instance.
+	/// Should also update the ping of this worker.
 	async fn pull_workflows(
 		&self,
-		worker_instance_id: Id,
+		worker_id: Id,
 		filter: &[&str],
 	) -> WorkflowResult<Vec<PulledWorkflowData>>;

@@ -330,3 +336,9 @@ pub struct SignalData {
 	pub body: Box<serde_json::value::RawValue>,
 	pub create_ts: i64,
 }
+
+pub enum BumpSubSubject {
+	Worker,
+	WorkflowComplete { workflow_id: Id },
+	SignalPublish { to_workflow_id: Id },
+}
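A toy model of how a consumer of this trait combines the subject-scoped subscription with a periodic poll (mirroring the `start` loop in worker.rs below); a `tokio::sync::watch` channel stands in for the real `bump_sub` subscription:

```rust
use std::time::Duration;

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Stand-in for `Database::bump_sub`: resolves when another node bumps us
    let (bump_tx, mut bump_rx) = watch::channel(());
    let mut tick_interval = tokio::time::interval(Duration::from_millis(200));
    tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    // Simulate a bump arriving from elsewhere
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = bump_tx.send(());
    });

    for _ in 0..3 {
        tokio::select! {
            // Periodic fallback poll
            _ = tick_interval.tick() => println!("tick: pulling workflows"),
            // Immediate wake on bump
            res = bump_rx.changed() => if res.is_ok() {
                println!("bump: pulling workflows immediately");
            },
        }
    }
}
```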
diff --git a/engine/packages/gasoline/src/metrics.rs b/engine/packages/gasoline/src/metrics.rs
index 71afdd524b..5687c8d2a8 100644
--- a/engine/packages/gasoline/src/metrics.rs
+++ b/engine/packages/gasoline/src/metrics.rs
@@ -5,33 +5,33 @@ use rivet_metrics::{

 lazy_static::lazy_static! {
 	static ref METER: Meter = meter("rivet-gasoline");

-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref WORKER_LAST_PING: Gauge<u64> = METER.u64_gauge("rivet_gasoline_worker_last_ping")
-		.with_description("Last ping of a worker instance as a unix ts.")
+		.with_description("Last ping of a worker as a unix ts.")
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref LAST_PULL_WORKFLOWS_DURATION: Gauge<f64> = METER.f64_gauge("rivet_gasoline_last_pull_workflows_duration")
 		.with_description("Last duration of pulling workflow data.")
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref LAST_PULL_WORKFLOWS_HISTORY_DURATION: Gauge<u64> = METER.u64_gauge("rivet_gasoline_last_pull_workflows_history_duration")
 		.with_description("Last duration of pulling workflow histories.")
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref LAST_PULL_WORKFLOWS_FULL_DURATION: Gauge<f64> = METER.f64_gauge("rivet_gasoline_last_pull_workflows_full_duration")
 		.with_description("Last duration of pulling workflow data and history.")
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref PULL_WORKFLOWS_DURATION: Histogram<f64> = METER.f64_histogram("rivet_gasoline_pull_workflows_duration")
 		.with_description("Duration of pulling workflow data.")
 		.with_boundaries(BUCKETS.to_vec())
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref PULL_WORKFLOWS_HISTORY_DURATION: Histogram<f64> = METER.f64_histogram("rivet_gasoline_pull_workflows_history_duration")
 		.with_description("Duration of pulling workflow histories.")
 		.with_boundaries(BUCKETS.to_vec())
 		.build();
-	/// Expected attributes: "worker_instance_id"
+	/// Expected attributes: "worker_id"
 	pub static ref PULL_WORKFLOWS_FULL_DURATION: Histogram<f64> = METER.f64_histogram("rivet_gasoline_pull_workflows_full_duration")
 		.with_description("Duration of pulling workflow data and history.")
 		.with_boundaries(BUCKETS.to_vec())
diff --git a/engine/packages/gasoline/src/worker.rs b/engine/packages/gasoline/src/worker.rs
index 5321656a28..1b3bde06fd 100644
--- a/engine/packages/gasoline/src/worker.rs
+++ b/engine/packages/gasoline/src/worker.rs
@@ -11,10 +11,15 @@ use tokio::{signal::ctrl_c, sync::watch, task::JoinHandle};
 use tracing::Instrument;
 use tracing_opentelemetry::OpenTelemetrySpanExt;

-use crate::{ctx::WorkflowCtx, db::DatabaseHandle, error::WorkflowError, registry::RegistryHandle};
+use crate::{
+	ctx::WorkflowCtx,
+	db::{BumpSubSubject, DatabaseHandle},
+	error::WorkflowError,
+	registry::RegistryHandle,
+};

 /// How often to run gc and update ping.
-const PING_INTERVAL: Duration = Duration::from_secs(10);
+pub(crate) const PING_INTERVAL: Duration = Duration::from_secs(10);
 /// How often to publish metrics.
 const METRICS_INTERVAL: Duration = Duration::from_secs(20);
 /// Time to allow running workflows to shut down after receiving a SIGINT or SIGTERM.
@@ -26,7 +31,7 @@ const PULL_WORKFLOWS_TIMEOUT: Duration = Duration::from_secs(10);
 /// that are registered in its registry. After pulling, the workflows are run and their state is written to
 /// the database.
 pub struct Worker {
-	worker_instance_id: Id,
+	worker_id: Id,

 	registry: RegistryHandle,
 	db: DatabaseHandle,
@@ -45,7 +50,7 @@ impl Worker {
 		pools: rivet_pools::Pools,
 	) -> Self {
 		Worker {
-			worker_instance_id: Id::new_v1(config.dc_label()),
+			worker_id: Id::new_v1(config.dc_label()),

 			registry,
 			db,
@@ -58,23 +63,30 @@ impl Worker {
 	}

 	/// Polls the database periodically or wakes immediately when `Database::bump_sub` finishes
-	#[tracing::instrument(skip_all, fields(worker_instance_id=%self.worker_instance_id))]
+	#[tracing::instrument(skip_all, fields(worker_id=%self.worker_id))]
 	pub async fn start(mut self, mut shutdown_rx: Option<watch::Receiver<()>>) -> Result<()> {
 		tracing::debug!(
 			registered_workflows = ?self.registry.size(),
-			"started worker instance",
+			"started worker",
 		);

 		let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?;

-		let mut bump_sub = { self.db.bump_sub().await? };
+		let mut bump_sub = { self.db.bump_sub(BumpSubSubject::Worker).await? };

 		let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
 		tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

 		let mut term_signal =
-			TermSignal::new().expect("failed to setup termination signal handler");
+			TermSignal::new().context("failed to setup termination signal handler")?;
+
+		// Update ping at least once before doing anything else
+		self.db
+			.update_worker_ping(self.worker_id)
+			.await
+			.context("failed updating worker ping")?;

+		// Create handles for bg tasks
 		let mut gc_handle = self.gc();
 		let mut metrics_handle = self.publish_metrics();
@@ -139,79 +151,6 @@ impl Worker {
 		res
 	}

-	#[tracing::instrument(skip_all)]
-	async fn shutdown(mut self, mut term_signal: TermSignal) {
-		// Shutdown sequence
-		tracing::info!(
-			duration=?SHUTDOWN_DURATION,
-			remaining_workflows=?self.running_workflows.len(),
-			"starting worker shutdown"
-		);
-
-		let shutdown_start = Instant::now();
-
-		for (workflow_id, wf) in &self.running_workflows {
-			if wf.stop.send(()).is_err() {
-				tracing::warn!(
-					?workflow_id,
-					"stop channel closed, workflow likely already stopped"
-				);
-			}
-		}
-
-		let mut second_sigterm = false;
-		loop {
-			self.running_workflows
-				.retain(|_, wf| !wf.handle.is_finished());
-
-			// Shutdown complete
-			if self.running_workflows.is_empty() {
-				break;
-			}
-
-			if shutdown_start.elapsed() > SHUTDOWN_DURATION {
-				tracing::debug!("shutdown timed out");
-				break;
-			}
-
-			tokio::select! {
-				_ = ctrl_c() => {
-					if second_sigterm {
-						tracing::warn!("received third SIGTERM, aborting shutdown");
-						break;
-					}
-
-					tracing::warn!("received second SIGTERM");
-					second_sigterm = true;
-
-					continue;
-				}
-				_ = term_signal.recv() => {
-					if second_sigterm {
-						tracing::warn!("received third SIGTERM, aborting shutdown");
-						break;
-					}
-
-					tracing::warn!("received second SIGTERM");
-					second_sigterm = true;
-
-					continue;
-				}
-				_ = tokio::time::sleep(Duration::from_secs(2)) => {}
-			}
-		}
-
-		if self.running_workflows.is_empty() {
-			tracing::info!("all workflows evicted");
-		} else {
-			tracing::warn!(remaining_workflows=?self.running_workflows.len(), "not all workflows evicted");
-		}
-
-		tracing::info!("shutdown complete");
-
-		rivet_runtime::shutdown().await;
-	}
-
 	/// Query the database for new workflows and run them.
#[tracing::instrument(skip_all)] async fn tick(&mut self, cache: &rivet_cache::Cache) -> Result<()> { @@ -226,7 +165,7 @@ impl Worker { // Query awake workflows let workflows = tokio::time::timeout( PULL_WORKFLOWS_TIMEOUT, - self.db.pull_workflows(self.worker_instance_id, &filter), + self.db.pull_workflows(self.worker_id, &filter), ) .await .context("took too long pulling workflows, worker cannot continue")??; @@ -285,7 +224,7 @@ impl Worker { fn gc(&self) -> JoinHandle<()> { let db = self.db.clone(); - let worker_instance_id = self.worker_instance_id; + let worker_id = self.worker_id; tokio::task::spawn( async move { @@ -295,11 +234,11 @@ impl Worker { loop { ping_interval.tick().await; - if let Err(err) = db.update_worker_ping(worker_instance_id).await { + if let Err(err) = db.update_worker_ping(worker_id).await { tracing::error!(?err, "unhandled update ping error"); } - if let Err(err) = db.clear_expired_leases(worker_instance_id).await { + if let Err(err) = db.clear_expired_leases(worker_id).await { tracing::error!(?err, "unhandled gc error"); } } @@ -310,7 +249,7 @@ impl Worker { fn publish_metrics(&self) -> JoinHandle<()> { let db = self.db.clone(); - let worker_instance_id = self.worker_instance_id; + let worker_id = self.worker_id; tokio::task::spawn( async move { @@ -320,7 +259,7 @@ impl Worker { loop { metrics_interval.tick().await; - if let Err(err) = db.publish_metrics(worker_instance_id).await { + if let Err(err) = db.publish_metrics(worker_id).await { tracing::error!(?err, "unhandled metrics error"); } } @@ -328,6 +267,83 @@ impl Worker { .instrument(tracing::info_span!("worker_metrics_task")), ) } + + #[tracing::instrument(skip_all)] + async fn shutdown(mut self, mut term_signal: TermSignal) { + // Shutdown sequence + tracing::info!( + duration=?SHUTDOWN_DURATION, + remaining_workflows=?self.running_workflows.len(), + "starting worker shutdown" + ); + + let shutdown_start = Instant::now(); + + if let Err(err) = self.db.mark_worker_inactive(self.worker_id).await { + tracing::error!(?err, worker_id=?self.worker_id, "failed to mark worker as inactive"); + } + + for (workflow_id, wf) in &self.running_workflows { + if wf.stop.send(()).is_err() { + tracing::warn!( + ?workflow_id, + "stop channel closed, workflow likely already stopped" + ); + } + } + + let mut second_sigterm = false; + loop { + self.running_workflows + .retain(|_, wf| !wf.handle.is_finished()); + + // Shutdown complete + if self.running_workflows.is_empty() { + break; + } + + if shutdown_start.elapsed() > SHUTDOWN_DURATION { + tracing::debug!("shutdown timed out"); + break; + } + + tokio::select! 
{ + _ = ctrl_c() => { + if second_sigterm { + tracing::warn!("received third SIGTERM, aborting shutdown"); + break; + } + + tracing::warn!("received second SIGTERM"); + second_sigterm = true; + + continue; + } + _ = term_signal.recv() => { + if second_sigterm { + tracing::warn!("received third SIGTERM, aborting shutdown"); + break; + } + + tracing::warn!("received second SIGTERM"); + second_sigterm = true; + + continue; + } + _ = tokio::time::sleep(Duration::from_secs(2)) => {} + } + } + + if self.running_workflows.is_empty() { + tracing::info!("all workflows evicted"); + } else { + tracing::warn!(remaining_workflows=?self.running_workflows.len(), "not all workflows evicted"); + } + + tracing::info!("shutdown complete"); + + rivet_runtime::shutdown().await; + } } struct WorkflowHandle { diff --git a/engine/packages/universaldb/src/utils/keys.rs b/engine/packages/universaldb/src/utils/keys.rs index 73e40ec8c9..9257733a92 100644 --- a/engine/packages/universaldb/src/utils/keys.rs +++ b/engine/packages/universaldb/src/utils/keys.rs @@ -48,14 +48,14 @@ define_keys! { (20, WORKFLOW_ID, "workflow_id"), (21, WAKE, "wake"), (22, SUB_WORKFLOW, "sub_workflow"), - (23, WORKER_INSTANCE, "worker_instance"), + (23, WORKER, "worker"), (24, LAST_PING_TS, "last_ping_ts"), (25, METRICS_LOCK, "metrics_lock"), (26, ERROR, "error"), (27, WAKE_SUB_WORKFLOW_ID, "wake_sub_workflow_id"), (28, BY_NAME_AND_TAG, "by_name_and_tag"), (29, HAS_WAKE_CONDITION, "has_wake_condition"), - (30, WORKER_INSTANCE_ID, "worker_instance_id"), + (30, WORKER_ID, "worker_id"), (31, DBS, "dbs"), (32, ACTOR, "actor"), (33, BY_NAME, "by_name"), diff --git a/engine/packages/util-id/src/lib.rs b/engine/packages/util-id/src/lib.rs index dd86754a8e..e3dffd7986 100644 --- a/engine/packages/util-id/src/lib.rs +++ b/engine/packages/util-id/src/lib.rs @@ -30,7 +30,7 @@ pub enum IdError { UnsupportedVersion(u8), } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum Id { V1([u8; 18]), }
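The `PartialOrd`/`Ord` derivation above is what makes `active_worker_ids.sort()` in `pull_workflows` well-defined: any total order works, as long as every worker applies the same one. A toy illustration, with a struct standing in for `Id::V1`:

```rust
// Toy stand-in for Id::V1([u8; 18]): deriving Ord gives lexicographic byte
// order, so every worker sorting the same set of ids computes the same index
// for each worker.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
struct FakeId([u8; 18]);

fn main() {
    let a = FakeId([1; 18]);
    let b = FakeId([2; 18]);
    let c = FakeId([3; 18]);

    // Two workers that read the ids in different orders...
    let mut seen_by_worker_1 = vec![c, a, b];
    let mut seen_by_worker_2 = vec![b, c, a];
    seen_by_worker_1.sort();
    seen_by_worker_2.sort();

    // ...still agree on every worker's index
    assert_eq!(seen_by_worker_1, seen_by_worker_2);
    assert_eq!(seen_by_worker_1.iter().position(|id| *id == b), Some(1));
}
```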
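And a toy model of the deadline-bounded drain in `Worker::shutdown` above, with spawned sleeps standing in for running workflows:

```rust
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Stand-in for SHUTDOWN_DURATION
    let shutdown_duration = Duration::from_secs(2);
    let shutdown_start = Instant::now();

    let mut running = vec![
        tokio::spawn(tokio::time::sleep(Duration::from_millis(100))),
        tokio::spawn(tokio::time::sleep(Duration::from_millis(300))),
    ];

    loop {
        // Drop handles for workflows that have finished
        running.retain(|handle| !handle.is_finished());

        if running.is_empty() {
            println!("all workflows evicted");
            break;
        }
        if shutdown_start.elapsed() > shutdown_duration {
            println!("shutdown timed out; {} workflows remaining", running.len());
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
```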