Skip to content

Commit 5c2b9e2

Browse files
authored
Blob store separation (#1644)
* Add support for JSON and SSE responses to invoke endpoint
* blob store separation
* fix gc url
* fix gc bug
* fix gc test
* initialize blob storage for each namespace
1 parent a952c3d commit 5c2b9e2

23 files changed

+307
-338
lines changed

server/src/blob_store/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use url::Url;
2323

2424
use crate::metrics::{blob_storage, Timer};
2525

26+
pub mod registry;
27+
2628
#[derive(Debug, Clone, Serialize, Deserialize)]
2729
pub struct BlobStorageConfig {
2830
pub path: String,

server/src/blob_store/registry.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use std::{
2+
collections::HashMap,
3+
sync::{Arc, Mutex},
4+
};
5+
6+
use anyhow::Result;
7+
8+
use super::{BlobStorage, BlobStorageConfig};
9+
10+
pub struct BlobStorageRegistry {
11+
pub blob_storage_buckets: Mutex<HashMap<String, Arc<BlobStorage>>>,
12+
pub default_blob_storage: Arc<BlobStorage>,
13+
}
14+
15+
impl BlobStorageRegistry {
16+
pub fn new(default_blob_storage_path: &str) -> Result<Self> {
17+
let default_blob_storage = BlobStorage::new(BlobStorageConfig {
18+
path: default_blob_storage_path.to_string(),
19+
})?;
20+
Ok(Self {
21+
blob_storage_buckets: Mutex::new(HashMap::new()),
22+
default_blob_storage: Arc::new(default_blob_storage),
23+
})
24+
}
25+
26+
pub fn create_new_blob_store(&self, namespace: &str, path: &str) -> Result<()> {
27+
let blob_storage = BlobStorage::new(BlobStorageConfig {
28+
path: path.to_string(),
29+
})?;
30+
self.blob_storage_buckets
31+
.lock()
32+
.unwrap()
33+
.insert(namespace.to_string(), Arc::new(blob_storage));
34+
Ok(())
35+
}
36+
37+
pub fn get_blob_store(&self, namespace: &str) -> Arc<BlobStorage> {
38+
self.blob_storage_buckets
39+
.lock()
40+
.unwrap()
41+
.get(namespace)
42+
.cloned()
43+
.unwrap_or_else(|| self.default_blob_storage.clone())
44+
}
45+
}

server/src/data_model/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1772,6 +1772,19 @@ pub struct FunctionExecutorResources {
17721772
pub gpu: Option<GPUResources>, // None if no GPU.
17731773
}
17741774

1775+
#[derive(Debug, Clone, Serialize, Deserialize, Builder)]
1776+
#[builder(build_fn(skip))]
1777+
pub struct GcUrl {
1778+
pub url: String,
1779+
pub namespace: String,
1780+
}
1781+
1782+
impl GcUrl {
1783+
pub fn key(&self) -> String {
1784+
format!("{}|{}", self.namespace, self.url)
1785+
}
1786+
}
1787+
17751788
#[derive(Debug, Clone, Serialize, Deserialize, Builder)]
17761789
#[builder(build_fn(skip))]
17771790
pub struct FunctionExecutor {
@@ -2176,6 +2189,7 @@ impl Display for StateChange {
21762189
pub struct Namespace {
21772190
pub name: String,
21782191
pub created_at: u64,
2192+
pub blob_storage_bucket: Option<String>,
21792193
}
21802194

21812195
#[cfg(test)]

server/src/executor_api.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tonic::{Request, Response, Status};
2828
use tracing::{debug, error, info, instrument, trace, warn};
2929

3030
use crate::{
31-
blob_store::{self, BlobStorage},
31+
blob_store::{self, registry::BlobStorageRegistry},
3232
data_model::{
3333
self,
3434
Allocation,
@@ -415,7 +415,7 @@ impl TryFrom<FunctionExecutorState> for data_model::FunctionExecutor {
415415

416416
fn to_function_executor_diagnostics(
417417
function_executor_update: &executor_api_pb::FunctionExecutorUpdate,
418-
blob_storage: &BlobStorage,
418+
blob_storage_registry: &BlobStorageRegistry,
419419
) -> Result<FunctionExecutorDiagnostics> {
420420
let description = function_executor_update
421421
.description
@@ -441,15 +441,19 @@ fn to_function_executor_diagnostics(
441441
.graph_version
442442
.clone()
443443
.ok_or(anyhow::anyhow!("graph_version is required"))?;
444+
let blob_storage_url_schema = blob_storage_registry
445+
.get_blob_store(&namespace)
446+
.get_url_scheme();
447+
let blob_storage_url = blob_storage_registry.get_blob_store(&namespace).get_url();
444448
let startup_stdout = prepare_data_payload(
445449
function_executor_update.startup_stdout.clone(),
446-
&blob_storage.get_url_scheme(),
447-
&blob_storage.get_url(),
450+
&blob_storage_url_schema,
451+
&blob_storage_url,
448452
);
449453
let startup_stderr = prepare_data_payload(
450454
function_executor_update.startup_stderr.clone(),
451-
&blob_storage.get_url_scheme(),
452-
&blob_storage.get_url(),
455+
&blob_storage_url_schema,
456+
&blob_storage_url,
453457
);
454458

455459
Ok(data_model::FunctionExecutorDiagnostics {
@@ -465,12 +469,12 @@ fn to_function_executor_diagnostics(
465469

466470
fn to_function_executor_diagnostics_vector(
467471
function_executor_updates: &Vec<executor_api_pb::FunctionExecutorUpdate>,
468-
blob_storage: &BlobStorage,
472+
blob_storage_registry: &BlobStorageRegistry,
469473
) -> Result<Vec<FunctionExecutorDiagnostics>> {
470474
function_executor_updates
471475
.iter()
472476
.map(|function_executor_update| {
473-
to_function_executor_diagnostics(function_executor_update, blob_storage)
477+
to_function_executor_diagnostics(function_executor_update, blob_storage_registry)
474478
})
475479
.collect()
476480
}
@@ -479,21 +483,21 @@ pub struct ExecutorAPIService {
479483
indexify_state: Arc<IndexifyState>,
480484
executor_manager: Arc<ExecutorManager>,
481485
api_metrics: Arc<api_io_stats::Metrics>,
482-
blob_storage: Arc<blob_store::BlobStorage>,
486+
blob_storage_registry: Arc<blob_store::registry::BlobStorageRegistry>,
483487
}
484488

485489
impl ExecutorAPIService {
486490
pub fn new(
487491
indexify_state: Arc<IndexifyState>,
488492
executor_manager: Arc<ExecutorManager>,
489493
api_metrics: Arc<api_io_stats::Metrics>,
490-
blob_storage: Arc<blob_store::BlobStorage>,
494+
blob_storage_registry: Arc<blob_store::registry::BlobStorageRegistry>,
491495
) -> Self {
492496
Self {
493497
indexify_state,
494498
executor_manager,
495499
api_metrics,
496-
blob_storage,
500+
blob_storage_registry,
497501
}
498502
}
499503

@@ -577,16 +581,21 @@ impl ExecutorAPIService {
577581

578582
let mut payloads = Vec::new();
579583
let mut encoding_str = String::new();
584+
let blob_storage_url_schema = self
585+
.blob_storage_registry
586+
.get_blob_store(&namespace)
587+
.get_url_scheme();
588+
let blob_storage_url = self
589+
.blob_storage_registry
590+
.get_blob_store(&namespace)
591+
.get_url();
580592
for output in task_result.function_outputs.clone() {
581593
let url = output
582594
.uri
583595
.ok_or(Status::invalid_argument("uri is required"))?;
584596

585-
let path = blob_store_url_to_path(
586-
&url,
587-
&self.blob_storage.get_url_scheme(),
588-
&self.blob_storage.get_url(),
589-
);
597+
let path =
598+
blob_store_url_to_path(&url, &blob_storage_url_schema, &blob_storage_url);
590599
let size = output
591600
.size
592601
.ok_or(Status::invalid_argument("size is required"))?;
@@ -621,8 +630,8 @@ impl ExecutorAPIService {
621630
}
622631
let invocation_error_payload = prepare_data_payload(
623632
task_result.invocation_error_output.clone(),
624-
&self.blob_storage.get_url_scheme(),
625-
&self.blob_storage.get_url(),
633+
&blob_storage_url_schema,
634+
&blob_storage_url,
626635
);
627636

628637
let node_output = NodeOutputBuilder::default()
@@ -642,13 +651,13 @@ impl ExecutorAPIService {
642651
let task_diagnostic = TaskDiagnostics {
643652
stdout: prepare_data_payload(
644653
task_result.stdout.clone(),
645-
&self.blob_storage.get_url_scheme(),
646-
&self.blob_storage.get_url(),
654+
&blob_storage_url_schema,
655+
&blob_storage_url,
647656
),
648657
stderr: prepare_data_payload(
649658
task_result.stderr.clone(),
650-
&self.blob_storage.get_url_scheme(),
651-
&self.blob_storage.get_url(),
659+
&blob_storage_url_schema,
660+
&blob_storage_url,
652661
),
653662
};
654663
let allocation_key = Allocation::key_from(
@@ -849,7 +858,7 @@ impl ExecutorApi for ExecutorAPIService {
849858

850859
let function_executor_diagnostics = to_function_executor_diagnostics_vector(
851860
&executor_update.function_executor_updates,
852-
&self.blob_storage,
861+
&self.blob_storage_registry,
853862
)
854863
.map_err(|e| Status::invalid_argument(e.to_string()))?;
855864

server/src/executors.rs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tokio::{
1616
use tracing::{debug, error, trace};
1717

1818
use crate::{
19+
blob_store::registry::BlobStorageRegistry,
1920
data_model::{self, ExecutorId, ExecutorMetadata},
2021
executor_api::{
2122
blob_store_path_to_url,
@@ -118,20 +119,18 @@ impl ExecutorRuntimeData {
118119
}
119120

120121
pub struct ExecutorManager {
121-
blob_store_url_scheme: String,
122-
blob_store_url: String,
123122
heartbeat_deadline_queue: Mutex<PriorityQueue<ExecutorId, ReverseInstant>>,
124123
heartbeat_future: Arc<Mutex<DynamicSleepFuture>>,
125124
heartbeat_deadline_updater: watch::Sender<Instant>,
126125
indexify_state: Arc<IndexifyState>,
127126
runtime_data: RwLock<HashMap<ExecutorId, ExecutorRuntimeData>>,
127+
blob_store_registry: Arc<BlobStorageRegistry>,
128128
}
129129

130130
impl ExecutorManager {
131131
pub async fn new(
132132
indexify_state: Arc<IndexifyState>,
133-
blob_store_url_scheme: String,
134-
blob_store_url: String,
133+
blob_store_registry: Arc<BlobStorageRegistry>,
135134
) -> Arc<Self> {
136135
let (heartbeat_future, heartbeat_sender) = DynamicSleepFuture::new(
137136
far_future(),
@@ -149,8 +148,7 @@ impl ExecutorManager {
149148
heartbeat_deadline_queue: Mutex::new(PriorityQueue::new()),
150149
heartbeat_deadline_updater: heartbeat_sender,
151150
heartbeat_future,
152-
blob_store_url_scheme,
153-
blob_store_url,
151+
blob_store_registry,
154152
};
155153

156154
let em = Arc::new(em);
@@ -387,20 +385,37 @@ impl ExecutorManager {
387385
let mut function_executors_pb = vec![];
388386
let mut task_allocations = vec![];
389387
for desired_state_fe in desired_executor_state.function_executors.iter() {
388+
let blob_store_url_schema = self
389+
.blob_store_registry
390+
.get_blob_store(
391+
&desired_state_fe
392+
.function_executor
393+
.function_executor
394+
.namespace,
395+
)
396+
.get_url_scheme();
397+
let blob_store_url = self
398+
.blob_store_registry
399+
.get_blob_store(
400+
&desired_state_fe
401+
.function_executor
402+
.function_executor
403+
.namespace,
404+
)
405+
.get_url();
390406
let code_payload_pb = DataPayload {
391407
uri: Some(blob_store_path_to_url(
392408
&desired_state_fe.code_payload.path,
393-
&self.blob_store_url_scheme,
394-
&self.blob_store_url,
409+
&blob_store_url_schema,
410+
&blob_store_url,
395411
)),
396412
size: Some(desired_state_fe.code_payload.size),
397413
sha256_hash: Some(desired_state_fe.code_payload.sha256_hash.clone()),
398414
encoding: Some(DataPayloadEncoding::BinaryZip.into()),
399415
encoding_version: None,
400416
};
401417
let fe = &desired_state_fe.function_executor.function_executor;
402-
let fe_output_payload_uri_prefix =
403-
format!("{}/function_executors", self.blob_store_url,);
418+
let fe_output_payload_uri_prefix = format!("{}/function_executors", blob_store_url,);
404419
let fe_description_pb = FunctionExecutorDescription {
405420
id: Some(fe.id.get().to_string()),
406421
namespace: Some(fe.namespace.clone()),
@@ -466,11 +481,19 @@ impl ExecutorManager {
466481

467482
/// Extracts only the computed fields from a data_model::Task
468483
pub fn extract_computed_fields(&self, task: &data_model::Task) -> anyhow::Result<ComputedTask> {
484+
let blob_store_url_schema = self
485+
.blob_store_registry
486+
.get_blob_store(&task.namespace)
487+
.get_url_scheme();
488+
let blob_store_url = self
489+
.blob_store_registry
490+
.get_blob_store(&task.namespace)
491+
.get_url();
469492
let input_payload = DataPayload {
470493
uri: Some(blob_store_path_to_url(
471494
&task.input.path,
472-
&self.blob_store_url_scheme,
473-
&self.blob_store_url,
495+
&blob_store_url_schema,
496+
&blob_store_url,
474497
)),
475498
size: Some(task.input.size),
476499
sha256_hash: Some(task.input.sha256_hash.clone()),
@@ -483,8 +506,8 @@ impl ExecutorManager {
483506
let reducer_input = task.acc_input.clone().map(|input| DataPayload {
484507
uri: Some(blob_store_path_to_url(
485508
&input.path,
486-
&self.blob_store_url_scheme,
487-
&self.blob_store_url,
509+
&blob_store_url_schema,
510+
&blob_store_url,
488511
)),
489512
size: Some(input.size),
490513
sha256_hash: Some(input.sha256_hash.clone()),
@@ -499,7 +522,7 @@ impl ExecutorManager {
499522
// Create output payload URI prefix
500523
let output_payload_uri_prefix = format!(
501524
"{}/{}.{}.{}.{}",
502-
self.blob_store_url,
525+
blob_store_url,
503526
task.namespace,
504527
task.compute_graph_name,
505528
task.compute_fn_name,

0 commit comments

Comments
 (0)