Skip to content

Commit f77aa65

Browse files
diptanuJulio Martinezj3m7
authored
Region configuration to namespace blob stores (#1655)
* Added region to the remaining blob config and used it when initializing S3 blob stores --------- Co-authored-by: Julio Martinez <julio@tensorflow.ai> Co-authored-by: Julio Martínez <julio@tensorlake.ai>
1 parent 3f0c561 commit f77aa65

File tree

14 files changed

+45
-12
lines changed

14 files changed

+45
-12
lines changed

server/src/blob_store/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod registry;
2828
#[derive(Debug, Clone, Serialize, Deserialize)]
2929
pub struct BlobStorageConfig {
3030
pub path: String,
31+
pub region: String,
3132
}
3233

3334
impl Default for BlobStorageConfig {
@@ -42,6 +43,7 @@ impl Default for BlobStorageConfig {
4243
);
4344
BlobStorageConfig {
4445
path: blob_store_path,
46+
region: "".to_string(),
4547
}
4648
}
4749
}
@@ -65,7 +67,7 @@ impl BlobStorage {
6567
pub fn new(config: BlobStorageConfig) -> Result<Self> {
6668
let url = &config.path.clone();
6769
debug!("using blob store path: {}", url);
68-
let (object_store, path) = Self::build_object_store(url)?;
70+
let (object_store, path) = Self::build_object_store(url, &config.region)?;
6971
Ok(Self {
7072
object_store: Arc::new(object_store),
7173
url_scheme: url.parse::<Url>()?.scheme().to_string(),
@@ -75,7 +77,7 @@ impl BlobStorage {
7577
})
7678
}
7779

78-
pub fn build_object_store(url_str: &str) -> Result<(Box<dyn ObjectStore>, Path)> {
80+
pub fn build_object_store(url_str: &str, region: &str) -> Result<(Box<dyn ObjectStore>, Path)> {
7981
let url = &url_str.parse::<Url>()?;
8082
let (scheme, _) = ObjectStoreScheme::parse(url)?;
8183
match scheme {
@@ -86,6 +88,7 @@ impl BlobStorage {
8688
.with_url(url_str)
8789
.with_allow_http(true)
8890
.with_conditional_put(S3ConditionalPut::ETagMatch)
91+
.with_region(region)
8992
.build()
9093
.expect("failed to create object store");
9194
let (_, path) = parse_url(url)?;

server/src/blob_store/registry.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,21 @@ pub struct BlobStorageRegistry {
1313
}
1414

1515
impl BlobStorageRegistry {
16-
pub fn new(default_blob_storage_path: &str) -> Result<Self> {
16+
pub fn new(default_blob_storage_path: &str, default_blob_storage_region: &str) -> Result<Self> {
1717
let default_blob_storage = BlobStorage::new(BlobStorageConfig {
1818
path: default_blob_storage_path.to_string(),
19+
region: default_blob_storage_region.to_string(),
1920
})?;
2021
Ok(Self {
2122
blob_storage_buckets: Mutex::new(HashMap::new()),
2223
default_blob_storage: Arc::new(default_blob_storage),
2324
})
2425
}
2526

26-
pub fn create_new_blob_store(&self, namespace: &str, path: &str) -> Result<()> {
27+
pub fn create_new_blob_store(&self, namespace: &str, path: &str, region: &str) -> Result<()> {
2728
let blob_storage = BlobStorage::new(BlobStorageConfig {
2829
path: path.to_string(),
30+
region: region.to_string(),
2931
})?;
3032
self.blob_storage_buckets
3133
.lock()

server/src/data_model/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2162,6 +2162,7 @@ pub struct Namespace {
21622162
pub name: String,
21632163
pub created_at: u64,
21642164
pub blob_storage_bucket: Option<String>,
2165+
pub blob_storage_region: Option<String>,
21652166
}
21662167

21672168
#[cfg(test)]

server/src/http_objects.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ impl From<data_model::ComputeGraph> for ComputeGraph {
633633
pub struct CreateNamespace {
634634
pub name: String,
635635
pub blob_storage_bucket: Option<String>,
636+
pub blob_storage_region: Option<String>,
636637
}
637638

638639
#[derive(Debug, Serialize, Deserialize, ToSchema)]

server/src/routes_internal.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ async fn namespace_middleware(
282282
payload: RequestPayload::CreateNameSpace(NamespaceRequest {
283283
name: namespace.to_string(),
284284
blob_storage_bucket: None,
285+
blob_storage_region: None,
285286
}),
286287
processed_state_changes: vec![],
287288
})
@@ -311,10 +312,16 @@ async fn create_namespace(
311312
Json(namespace): Json<CreateNamespace>,
312313
) -> Result<(), IndexifyAPIError> {
313314
if let Some(blob_storage_bucket) = &namespace.blob_storage_bucket {
314-
if let Err(e) = state
315-
.blob_storage
316-
.create_new_blob_store(&namespace.name, blob_storage_bucket)
317-
{
315+
let Some(blob_storage_region) = &namespace.blob_storage_region else {
316+
return Err(IndexifyAPIError::bad_request(
317+
"blob storage region is required",
318+
));
319+
};
320+
if let Err(e) = state.blob_storage.create_new_blob_store(
321+
&namespace.name,
322+
blob_storage_bucket,
323+
&blob_storage_region,
324+
) {
318325
error!("failed to create blob storage bucket: {:?}", e);
319326
return Err(IndexifyAPIError::internal_error(e));
320327
}
@@ -323,6 +330,7 @@ async fn create_namespace(
323330
payload: RequestPayload::CreateNameSpace(NamespaceRequest {
324331
name: namespace.name.clone(),
325332
blob_storage_bucket: namespace.blob_storage_bucket,
333+
blob_storage_region: namespace.blob_storage_region,
326334
}),
327335
processed_state_changes: vec![],
328336
};

server/src/routes_v1.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ async fn namespace_middleware(
196196
payload: RequestPayload::CreateNameSpace(NamespaceRequest {
197197
name: namespace.to_string(),
198198
blob_storage_bucket: None,
199+
blob_storage_region: None,
199200
}),
200201
processed_state_changes: vec![],
201202
})

server/src/service.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,23 @@ impl Service {
6464

6565
let indexify_state = IndexifyState::new(config.state_store_path.parse()?).await?;
6666

67-
let blob_storage_registry =
68-
Arc::new(BlobStorageRegistry::new(config.blob_storage.path.as_str())?);
67+
let blob_storage_registry = Arc::new(BlobStorageRegistry::new(
68+
config.blob_storage.path.as_str(),
69+
config.blob_storage.region.as_str(),
70+
)?);
6971

7072
let namespaces = indexify_state.reader().get_all_namespaces()?;
7173
for namespace in namespaces {
7274
if let Some(blob_storage_bucket) = namespace.blob_storage_bucket {
73-
blob_storage_registry
74-
.create_new_blob_store(&namespace.name, &blob_storage_bucket)?;
75+
let blob_storage_region = namespace.blob_storage_region.unwrap_or_else(|| {
76+
std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string())
77+
});
78+
79+
blob_storage_registry.create_new_blob_store(
80+
&namespace.name,
81+
&blob_storage_bucket,
82+
&blob_storage_region,
83+
)?;
7584
}
7685
}
7786

server/src/state_store/in_memory_state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ impl InMemoryState {
592592
name: req.name.clone(),
593593
created_at,
594594
blob_storage_bucket: req.blob_storage_bucket.clone(),
595+
blob_storage_region: req.blob_storage_region.clone(),
595596
}),
596597
);
597598
}

server/src/state_store/kv.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ mod tests {
9191
"file://{}",
9292
temp_dir.path().join("blob_store").to_str().unwrap()
9393
),
94+
region: "local".to_string(),
9495
})?),
9596
"test",
9697
)

server/src/state_store/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ mod tests {
442442
payload: RequestPayload::CreateNameSpace(NamespaceRequest {
443443
name: "namespace1".to_string(),
444444
blob_storage_bucket: None,
445+
blob_storage_region: None,
445446
}),
446447
processed_state_changes: vec![],
447448
})
@@ -451,6 +452,7 @@ mod tests {
451452
payload: RequestPayload::CreateNameSpace(NamespaceRequest {
452453
name: "namespace2".to_string(),
453454
blob_storage_bucket: Some("bucket2".to_string()),
455+
blob_storage_region: Some("local".to_string()),
454456
}),
455457
processed_state_changes: vec![],
456458
})

0 commit comments

Comments
 (0)