Skip to content

Commit a58b6de

Browse files
Paul.masurel/add kv migration (#5920)
* add migration for in-metastore kv store * store cluster identity * test get-identity --------- Co-authored-by: trinity Pointard <trinity.pointard@datadoghq.com>
1 parent 73205fc commit a58b6de

File tree

13 files changed

+446
-36
lines changed

13 files changed

+446
-36
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-metastore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ tokio-stream = { workspace = true }
3737
tower = { workspace = true }
3838
tracing = { workspace = true }
3939
ulid = { workspace = true, features = ["serde"] }
40+
uuid = { workspace = true }
4041
utoipa = { workspace = true }
4142

4243
quickwit-common = { workspace = true }
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE kv;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE TABLE IF NOT EXISTS kv (
2+
key VARCHAR(50) PRIMARY KEY,
3+
value TEXT NOT NULL
4+
);

quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ use quickwit_proto::metastore::{
2222
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
2323
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
2424
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse,
25-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest,
26-
GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest,
27-
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
28-
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest,
29-
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
30-
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
31-
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
32-
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
33-
PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
34-
ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
35-
UpdateSplitsDeleteOpstampResponse,
25+
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
26+
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
27+
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
28+
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
29+
ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse,
30+
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse,
31+
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
32+
MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream,
33+
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
34+
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
35+
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
3636
};
3737

3838
/// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can
@@ -274,4 +274,11 @@ impl MetastoreService for ControlPlaneMetastore {
274274
) -> MetastoreResult<EmptyResponse> {
275275
self.metastore.delete_index_templates(request).await
276276
}
277+
278+
async fn get_cluster_identity(
279+
&self,
280+
request: GetClusterIdentityRequest,
281+
) -> MetastoreResult<GetClusterIdentityResponse> {
282+
self.metastore.get_cluster_identity(request).await
283+
}
277284
}

quickwit/quickwit-metastore/src/metastore/file_backed/manifest.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use quickwit_proto::types::{DocMappingUid, IndexId};
2323
use quickwit_storage::{OwnedBytes, Storage, StorageError, StorageErrorKind, StorageResult};
2424
use serde::{Deserialize, Serialize};
2525
use tracing::error;
26+
use uuid::Uuid;
2627

2728
pub(super) const MANIFEST_FILE_NAME: &str = "manifest.json";
2829

@@ -40,6 +41,7 @@ impl LegacyManifest {
4041
Manifest {
4142
indexes: self.indexes,
4243
templates: HashMap::new(),
44+
identity: Uuid::nil(),
4345
}
4446
}
4547
}
@@ -64,6 +66,7 @@ pub(crate) struct Manifest {
6466
// The templates are serialized as a sorted `Vec<IndexTemplate>` so the btree map is
6567
// unnecessary here and we can pass the hash map as is to the `MetastoreState`
6668
pub templates: HashMap<IndexTemplateId, IndexTemplate>,
69+
pub identity: Uuid,
6770
}
6871

6972
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -103,6 +106,8 @@ impl From<VersionedManifest> for Manifest {
103106
struct ManifestV0_8 {
104107
indexes: BTreeMap<IndexId, IndexStatus>,
105108
templates: Vec<IndexTemplate>,
109+
#[serde(default, skip_serializing_if = "Uuid::is_nil")]
110+
identity: Uuid,
106111
}
107112

108113
impl From<Manifest> for ManifestV0_8 {
@@ -115,6 +120,7 @@ impl From<Manifest> for ManifestV0_8 {
115120
ManifestV0_8 {
116121
indexes: manifest.indexes,
117122
templates,
123+
identity: manifest.identity,
118124
}
119125
}
120126
}
@@ -127,7 +133,11 @@ impl From<ManifestV0_8> for Manifest {
127133
.into_iter()
128134
.map(|template| (template.template_id.clone(), template))
129135
.collect();
130-
Manifest { indexes, templates }
136+
Manifest {
137+
indexes,
138+
templates,
139+
identity: manifest.identity,
140+
}
131141
}
132142
}
133143

@@ -144,7 +154,11 @@ impl quickwit_config::TestableForRegression for Manifest {
144154
"test-template-1".to_string(),
145155
IndexTemplate::sample_for_regression(),
146156
);
147-
Manifest { indexes, templates }
157+
Manifest {
158+
indexes,
159+
templates,
160+
identity: Uuid::nil(),
161+
}
148162
}
149163

150164
fn assert_equality(&self, other: &Self) {
@@ -320,7 +334,11 @@ mod tests {
320334
IndexTemplate::for_test("test-template-2", &["test-index-bar*"], 200),
321335
),
322336
]);
323-
let manifest = Manifest { indexes, templates };
337+
let manifest = Manifest {
338+
indexes,
339+
templates,
340+
identity: Uuid::nil(),
341+
};
324342
let manifest_json = serde_json::to_string_pretty(&manifest).unwrap();
325343
let manifest_deserialized: Manifest = serde_json::from_str(&manifest_json).unwrap();
326344
assert_eq!(manifest, manifest_deserialized);

quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,26 @@ use quickwit_proto::metastore::{
4444
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
4545
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
4646
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
47-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest,
48-
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
49-
IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest,
50-
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
51-
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest,
52-
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
53-
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
54-
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
55-
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
56-
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
57-
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
58-
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils,
47+
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
48+
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
49+
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
50+
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
51+
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
52+
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
53+
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
54+
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
55+
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
56+
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
57+
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
58+
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
59+
serde_utils,
5960
};
6061
use quickwit_proto::types::{IndexId, IndexUid};
6162
use quickwit_storage::Storage;
6263
use time::OffsetDateTime;
6364
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
6465
use ulid::Ulid;
66+
use uuid::Uuid;
6567

6668
use self::file_backed_index::FileBackedIndex;
6769
pub use self::file_backed_metastore_factory::FileBackedMetastoreFactory;
@@ -1202,6 +1204,32 @@ impl MetastoreService for FileBackedMetastore {
12021204
}
12031205
Ok(EmptyResponse {})
12041206
}
1207+
1208+
// Get cluster identity api
1209+
1210+
// this returns a constant uuid. on first call, it generate said uuid if it doesn't already
1211+
// exists
1212+
async fn get_cluster_identity(
1213+
&self,
1214+
_: GetClusterIdentityRequest,
1215+
) -> MetastoreResult<GetClusterIdentityResponse> {
1216+
let mut state_wlock_guard = self.state.write().await;
1217+
1218+
if state_wlock_guard.identity.is_nil() {
1219+
state_wlock_guard.identity = Uuid::new_v4();
1220+
1221+
let manifest = state_wlock_guard.as_manifest();
1222+
1223+
if let Err(error) = save_manifest(&*self.storage, &manifest).await {
1224+
state_wlock_guard.identity = Uuid::nil();
1225+
return Err(error);
1226+
}
1227+
}
1228+
1229+
Ok(GetClusterIdentityResponse {
1230+
uuid: state_wlock_guard.identity.hyphenated().to_string(),
1231+
})
1232+
}
12051233
}
12061234

12071235
impl MetastoreServiceExt for FileBackedMetastore {}

quickwit/quickwit-metastore/src/metastore/file_backed/state.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use quickwit_config::{IndexTemplate, IndexTemplateId};
2020
use quickwit_proto::metastore::MetastoreResult;
2121
use quickwit_proto::types::IndexId;
2222
use quickwit_storage::Storage;
23+
use uuid::Uuid;
2324

2425
use super::LazyIndexStatus;
2526
use super::index_template_matcher::IndexTemplateMatcher;
@@ -31,6 +32,7 @@ pub(super) struct MetastoreState {
3132
pub indexes: HashMap<IndexId, LazyIndexStatus>,
3233
pub templates: HashMap<IndexTemplateId, IndexTemplate>,
3334
pub template_matcher: IndexTemplateMatcher,
35+
pub identity: Uuid,
3436
}
3537

3638
impl MetastoreState {
@@ -64,6 +66,7 @@ impl MetastoreState {
6466
indexes,
6567
templates: manifest.templates,
6668
template_matcher,
69+
identity: manifest.identity,
6770
};
6871
Ok(state)
6972
}
@@ -82,6 +85,10 @@ impl MetastoreState {
8285
})
8386
.collect();
8487
let templates = self.templates.clone();
85-
Manifest { indexes, templates }
88+
Manifest {
89+
indexes,
90+
templates,
91+
identity: self.identity,
92+
}
8693
}
8794
}

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ use quickwit_proto::metastore::{
3131
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
3232
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
3333
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
34-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetIndexTemplateRequest,
35-
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
36-
IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest,
37-
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
38-
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexTemplatesRequest,
39-
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
40-
ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest,
41-
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
42-
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
34+
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
35+
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
36+
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
37+
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
38+
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
39+
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
40+
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
41+
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
42+
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
4343
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
4444
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
4545
UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
@@ -51,6 +51,7 @@ use sea_query_binder::SqlxBinder;
5151
use sqlx::{Acquire, Executor, Postgres, Transaction};
5252
use time::OffsetDateTime;
5353
use tracing::{debug, info, instrument, warn};
54+
use uuid::Uuid;
5455

5556
use super::error::convert_sqlx_err;
5657
use super::migrator::run_migrations;
@@ -1664,6 +1665,26 @@ impl MetastoreService for PostgresqlMetastore {
16641665
.await?;
16651666
Ok(EmptyResponse {})
16661667
}
1668+
1669+
async fn get_cluster_identity(
1670+
&self,
1671+
_: GetClusterIdentityRequest,
1672+
) -> MetastoreResult<GetClusterIdentityResponse> {
1673+
let (uuid,) = sqlx::query_as(
1674+
r"
1675+
WITH insert AS (
1676+
INSERT INTO kv (key, value)
1677+
VALUES ('cluster_identity', $1)
1678+
ON CONFLICT (key) DO NOTHING
1679+
)
1680+
SELECT value FROM kv where key = 'cluster_identity';
1681+
",
1682+
)
1683+
.bind(Uuid::new_v4().hyphenated().to_string())
1684+
.fetch_one(&self.connection_pool)
1685+
.await?;
1686+
Ok(GetClusterIdentityResponse { uuid })
1687+
}
16671688
}
16681689

16691690
async fn open_or_fetch_shard<'e>(
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Index API tests
16+
//
17+
// - create_index
18+
// - index_exists
19+
// - index_metadata
20+
// - list_indexes
21+
// - delete_index
22+
23+
use quickwit_proto::metastore::{GetClusterIdentityRequest, MetastoreService};
24+
use uuid::Uuid;
25+
26+
use super::DefaultForTest;
27+
use crate::MetastoreServiceExt;
28+
29+
pub async fn test_metastore_get_identity<
30+
MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest,
31+
>() {
32+
let metastore = MetastoreToTest::default_for_test().await;
33+
34+
let identity_1 = metastore
35+
.get_cluster_identity(GetClusterIdentityRequest {})
36+
.await
37+
.unwrap()
38+
.uuid;
39+
40+
let identity_2 = metastore
41+
.get_cluster_identity(GetClusterIdentityRequest {})
42+
.await
43+
.unwrap()
44+
.uuid;
45+
46+
assert_eq!(identity_1, identity_2);
47+
assert_ne!(identity_1, Uuid::nil().hyphenated().to_string());
48+
}

0 commit comments

Comments
 (0)