Skip to content

Commit 0c8b35a

Browse files
storage named collection in metastore (#1022)
Co-authored-by: Alan Yu <alan.yu@timeplus.io>
1 parent 9e9152a commit 0c8b35a

File tree

68 files changed

+2777
-1035
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2777
-1035
lines changed

programs/server/Server.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -833,8 +833,6 @@ try
833833
server_settings.max_io_thread_pool_free_size,
834834
server_settings.io_thread_pool_queue_size);
835835

836-
NamedCollectionFactory::instance().loadIfNot();
837-
838836
/// Initialize global local cache for remote filesystem.
839837
if (config().has("local_cache_for_remote_fs"))
840838
{
@@ -1206,6 +1204,10 @@ try
12061204

12071205
auto server_descriptor = global_context->getServerDescriptor();
12081206

1207+
/// Start named collection after bootstrap
1208+
NamedCollectionFactory::instance().loadIfNot();
1209+
/// proton: ends.
1210+
12091211
for (auto & server : servers_to_start_before_tables)
12101212
{
12111213
server.start();

src/Cluster/Common/EncodeMetaKey.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,29 @@ std::string encodeMetaTaskKey(const std::string & ns, const std::string & name)
224224
return key;
225225
}
226226

227+
/// Builds the key prefix for named collection entries: just the
/// MetaKeySpace::NamedCollection tag in var-uint ascending encoding, with no name appended.
std::string encodeMetaNamedCollectionKey()
228+
{
229+
std::string encoded_key;
230+
DB::PrefixTreeEncode::encodeVarUIntAscending(std::to_underlying(MetaKeySpace::NamedCollection), encoded_key);
231+
return encoded_key;
232+
}
233+
234+
/// Builds the full key of one named collection: the MetaKeySpace::NamedCollection tag
/// followed by `name` in string ascending encoding.
/// @param name  Name of the named collection.
/// @return The encoded key.
std::string encodeMetaNamedCollectionKey(const std::string & name)
235+
{
236+
std::string encoded_key;
237+
/// Capacity hint only: one byte for the key-space tag plus the raw name length.
/// NOTE(review): the encoded name may be longer than name.size() if the encoder escapes or terminates the string - confirm.
encoded_key.reserve(1 + name.size());
238+
DB::PrefixTreeEncode::encodeVarUIntAscending(std::to_underlying(MetaKeySpace::NamedCollection), encoded_key);
239+
DB::PrefixTreeEncode::encodeStringAscending(name, encoded_key);
240+
return encoded_key;
241+
}
242+
243+
/// Decodes a key produced by encodeMetaNamedCollectionKey(name): reads the var-uint
/// key-space tag, then the ascending-encoded string, and returns a copy of what
/// decodeStringAscending() returned.
/// @param data  Encoded key bytes.
/// NOTE(review): the return value of decodeVarUIntAscending() is discarded, so this relies on
/// that helper advancing `data` past the tag (i.e. taking it by reference) - confirm.
/// NOTE(review): the function returns `result`, not `decoded`; this assumes decodeStringAscending()
/// returns a view of the decoded string (possibly backed by `decoded`) rather than the
/// remaining input - confirm against PrefixTreeEncode.
std::string decodeMetaNamedCollectionKey(std::string_view data)
244+
{
245+
DB::PrefixTreeEncode::decodeVarUIntAscending(data);
246+
std::string decoded;
247+
auto result = DB::PrefixTreeEncode::decodeStringAscending(data, decoded);
248+
return std::string{result.data(), result.size()};
249+
}
227250

228251
namespace
229252
{

src/Cluster/Common/EncodeMetaKey.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ std::string encodeMetaTaskKey(const std::string & ns, const std::string & name);
4848

4949
std::string encodeMetaAppliedSequenceKey(const StreamIDShard & stream_shard);
5050

51+
std::string encodeMetaNamedCollectionKey();
52+
std::string encodeMetaNamedCollectionKey(const std::string & name);
53+
std::string decodeMetaNamedCollectionKey(std::string_view data);
54+
5155
std::string encodeMetaPendingRequestKey(uint64_t sequence_number);
5256

5357
}

src/Cluster/Common/MetaKeySpace.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ enum class MetaKeySpace : uint8_t
1212
FormatSchema = 0xfa,
1313
Alert = 0xf9,
1414
Task = 0xf8,
15+
NamedCollection = 0xf7,
1516
Database = 0xf4,
1617
Disk = 0xf3,
1718
StoragePolicy = 0xf2,

src/Cluster/MetaStore/MetaDB.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,78 @@ CallResultV<protocol::TaskDescriptorPtrs> MetaDB::listTasks(std::vector<std::str
471471
return getValues<protocol::TaskDescriptor>(encodeMetaTaskKey(), MetaKeySpace::Task, corrupted_keys);
472472
}
473473

474+
/// Stores a named collection descriptor under the key encoded from `name`.
/// The descriptor is serialized with version 1 and handed to mergeKeyValue() together with
/// `applied_sn` and the label "named_collection".
/// @param name              Name of the named collection; used to build the key.
/// @param named_collection  Descriptor to serialize and store.
/// @param applied_sn        Applied sequence forwarded to mergeKeyValue().
/// @return The Error returned by mergeKeyValue().
Error MetaDB::saveNamedCollection(
475+
const std::string & name, const protocol::NamedCollectionDescriptor & named_collection, AppliedSequence applied_sn)
476+
{
477+
return mergeKeyValue(
478+
encodeMetaNamedCollectionKey(name),
479+
cluster::serialize<std::string>(named_collection, /*version=*/1),
480+
applied_sn,
481+
"named_collection");
482+
}
483+
484+
/// Deletes the key of the named collection `name`, forwarding `applied_sn` to deleteKey().
/// @return The Error returned by deleteKey().
Error MetaDB::deleteNamedCollection(const std::string & name, AppliedSequence applied_sn)
485+
{
486+
return deleteKey(encodeMetaNamedCollectionKey(name), applied_sn, "named_collection");
487+
}
488+
489+
/// Deletes the key of the named collection `name` without an applied sequence
/// (std::nullopt is passed to deleteKey()).
/// @return The Error returned by deleteKey().
Error MetaDB::deleteNamedCollection(const std::string & name)
490+
{
491+
return deleteKey(encodeMetaNamedCollectionKey(name), std::nullopt, "named_collection");
492+
}
493+
494+
/// Looks up the descriptor stored under the key encoded from `name`.
/// @return Whatever getValue<NamedCollectionDescriptor>() yields for that key in the
///         NamedCollection key space (descriptor pointer or error).
CallResultV<protocol::NamedCollectionDescriptorPtr> MetaDB::getNamedCollection(const std::string & name) const
495+
{
496+
return getValue<protocol::NamedCollectionDescriptor>(encodeMetaNamedCollectionKey(name), MetaKeySpace::NamedCollection);
497+
}
498+
499+
/// Multi-version variant of getNamedCollection(): forwards `versions_requested` to getValue()
/// and returns a list of descriptors.
/// @param name                Name of the named collection.
/// @param versions_requested  Passed through to getValue(); presumably the maximum number of
///                            stored versions to return - confirm against getValue().
CallResultV<protocol::NamedCollectionDescriptorPtrs> MetaDB::getNamedCollection(const std::string & name, size_t versions_requested) const
500+
{
501+
return getValue<protocol::NamedCollectionDescriptor>(
502+
encodeMetaNamedCollectionKey(name), MetaKeySpace::NamedCollection, versions_requested);
503+
}
504+
505+
/// Lists the names of all named collections by iterating the keys that start with the
/// NamedCollection key-space prefix and decoding each key back into a name.
/// Keys that fail to decode are logged and skipped. The last decode error is returned only
/// when no name at all could be decoded; otherwise the partial list is returned without error.
/// @return The decoded names, or the last decode error if the list is empty because of failures.
CallResultV<std::vector<std::string>> MetaDB::listNamedCollections() const
506+
{
507+
/// Scan range is [prefix_start, prefix_end): prefix_end is derived from the prefix by keyPrefixEnd().
std::string prefix_start = encodeMetaNamedCollectionKey();
508+
std::string prefix_end = prefix_start;
509+
DB::PrefixTreeEncode::keyPrefixEnd(prefix_end);
510+
511+
rocksdb::Slice prefix_start_slice = prefix_start;
512+
rocksdb::Slice prefix_end_slice = prefix_end;
513+
514+
rocksdb::ReadOptions options;
515+
options.auto_prefix_mode = true;
516+
/// The options keep a pointer to prefix_end_slice, which stays alive for the whole function.
options.iterate_upper_bound = &prefix_end_slice;
517+
518+
std::unique_ptr<rocksdb::Iterator> iter{meta_db->NewIterator(options, default_cf_handle)};
519+
520+
CallResultV<std::vector<std::string>> result;
521+
522+
Error last_err;
523+
524+
iter->Seek(prefix_start_slice);
525+
while (iter->Valid())
526+
{
527+
try
528+
{
529+
result.result.push_back(decodeMetaNamedCollectionKey(iter->key().ToStringView()));
530+
}
531+
catch (const Poco::Exception & e)
532+
{
533+
/// NOTE(review): the message says "decode value" although it is the key that is being decoded here.
auto text = fmt::format("Failed to decode value: key={} error={}", iter->key().ToStringView(), e.displayText());
534+
last_err = Error(e.code(), text);
535+
LOG_ERROR(logger, "{}, will skip it", text);
536+
}
537+
iter->Next();
538+
}
539+
/// NOTE(review): iter->status() is not inspected after the loop, so an iterator failure would
/// look the same as reaching the end of the range - confirm whether that is intended.
540+
/// Surface the decode error only when nothing could be listed.
if (result.result.empty() && last_err.hasError())
541+
result.err.swap(last_err);
542+
543+
return result;
544+
}
545+
474546
Error MetaDB::deleteMaterializedViewAssignment(const cluster::Stream & mv)
475547
{
476548
return deleteKey(encodeMetaMaterializedViewAssignmentKey(mv.ns, mv.name), {}, "MaterializedViewAssignment");

src/Cluster/MetaStore/MetaDB.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <Cluster/Protocol/DiskDescriptor.h>
1010
#include <Cluster/Protocol/FormatSchemaDescriptor.h>
1111
#include <Cluster/Protocol/MaterializedViewAssignment.h>
12+
#include <Cluster/Protocol/NamedCollectionDescriptor.h>
1213
#include <Cluster/Protocol/StoragePolicyDescriptor.h>
1314
#include <Cluster/Protocol/StreamDescriptor.h>
1415
#include <Cluster/Protocol/TaskDescriptor.h>
@@ -31,10 +32,7 @@ class MetaDB final
3132
{
3233
public:
3334
MetaDB(
34-
const std::filesystem::path & meta_dir,
35-
const StreamIDShard & meta_stream_shard_,
36-
size_t metadata_keep_versions,
37-
LoggerPtr logger_);
35+
const std::filesystem::path & meta_dir, const StreamIDShard & meta_stream_shard_, size_t metadata_keep_versions, LoggerPtr logger_);
3836

3937
~MetaDB();
4038

@@ -125,6 +123,15 @@ class MetaDB final
125123
CallResultV<protocol::TaskDescriptorPtrs> listTasks(const std::string & ns) const;
126124
CallResultV<protocol::TaskDescriptorPtrs> listTasks(std::vector<std::string> * corrupted_keys = nullptr) const;
127125

126+
/// Named Collection CRUD
127+
Error
128+
saveNamedCollection(const std::string & name, const protocol::NamedCollectionDescriptor & named_collection, AppliedSequence applied_sn);
129+
Error deleteNamedCollection(const std::string & name, AppliedSequence applied_sn);
130+
Error deleteNamedCollection(const std::string & name);
131+
CallResultV<protocol::NamedCollectionDescriptorPtr> getNamedCollection(const std::string & name) const;
132+
CallResultV<protocol::NamedCollectionDescriptorPtrs> getNamedCollection(const std::string & name, size_t versions_requested) const;
133+
CallResultV<std::vector<std::string>> listNamedCollections() const;
134+
128135
/// Internal API, save / delete mv assignment from local metadb
129136
/// Materialized View Assignments
130137
Error saveMaterializedViewAssignment(const protocol::MaterializedViewAssignment & assignment, AppliedSequence applied_sn);
@@ -142,13 +149,13 @@ class MetaDB final
142149
/// Pending request management
143150
/// Save a pending metadata request with sync write
144151
Error savePendingRequest(uint64_t sequence_number, const std::string & serialized_data);
145-
152+
146153
/// Delete a pending request after successful apply
147154
Error deletePendingRequest(uint64_t sequence_number);
148-
155+
149156
/// Iterate pending requests after given sequence number for recovery
150157
CallResultV<std::vector<std::pair<uint64_t, std::string>>> iteratePendingRequestsAfter(uint64_t after_sn) const;
151-
158+
152159
/// Cleanup old pending requests up to and including the given applied sequence number
153160
Error cleanupOldPendingRequests(uint64_t applied_sn);
154161

src/Cluster/MetaStore/MetaStore.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,37 @@ GetTaskResponsePtr MetaStore::getTask(GetTaskRequestPtr req) const
507507
return std::make_shared<GetTaskResponse>(std::move(result.err), /*data_version_=*/req->version());
508508
}
509509

510+
/// Handles a create-named-collection request by passing it to processMetaRequestSync()
/// with resp_desc=false and returning the response it produces.
CreateNamedCollectionResponsePtr MetaStore::createNamedCollection(CreateNamedCollectionRequestPtr req)
512+
{
513+
return processMetaRequestSync<CreateNamedCollectionRequest, CreateNamedCollectionResponse, /*resp_desc=*/false>(std::move(req));
514+
}
514+
515+
/// Handles a delete-named-collection request by passing it to processMetaRequestSync()
/// with resp_desc=false and returning the response it produces.
DeleteNamedCollectionResponsePtr MetaStore::deleteNamedCollection(DeleteNamedCollectionRequestPtr req)
517+
{
518+
return processMetaRequestSync<DeleteNamedCollectionRequest, DeleteNamedCollectionResponse, /*resp_desc=*/false>(std::move(req));
519+
}
519+
520+
/// Answers a list request directly from the local MetaDB.
/// On success the response carries the collection names, this node's ID, sequence number 0 and
/// the request's version; on failure it carries the MetaDB error and the request's version.
/// Only req->version() is read from the request.
ListNamedCollectionsResponsePtr MetaStore::listNamedCollections(ListNamedCollectionsRequestPtr req) const
521+
{
522+
auto result = meta_db->listNamedCollections();
523+
if (!result.hasError())
524+
return std::make_shared<ListNamedCollectionsResponse>(
525+
std::move(result.result), nodeID(), /*sn_=*/0, /*data_version_=*/req->version());
526+
527+
return std::make_shared<ListNamedCollectionsResponse>(std::move(result.err), /*data_version_=*/req->version());
528+
}
529+
530+
/// Answers a get request directly from the local MetaDB, using the collection name and
/// versions_requested from the request data.
/// On success the response carries the descriptors, this node's ID, sequence number 0 and the
/// request's version; on failure it carries the MetaDB error and the request's version.
GetNamedCollectionResponsePtr MetaStore::getNamedCollection(GetNamedCollectionRequestPtr req) const
531+
{
532+
const auto & req_data = req->data();
533+
auto result = meta_db->getNamedCollection(req_data.collection_name, req_data.versions_requested);
534+
if (!result.hasError())
535+
return std::make_shared<GetNamedCollectionResponse>(
536+
std::move(result.result), nodeID(), /*sn_=*/0, /*data_version_=*/req->version());
537+
else
538+
return std::make_shared<GetNamedCollectionResponse>(std::move(result.err), /*data_version_=*/req->version());
539+
}
540+
510541
Error MetaStore::proposeMetaRequest(cluster::RequestPtr req, int64_t timeout_ms)
511542
{
512543
/// Always use local mode

src/Cluster/MetaStore/MetaStore.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <Cluster/Requests/CreateDiskResponse.h>
2424
#include <Cluster/Requests/CreateFormatSchemaRequest.h>
2525
#include <Cluster/Requests/CreateFormatSchemaResponse.h>
26+
#include <Cluster/Requests/CreateNamedCollectionRequest.h>
27+
#include <Cluster/Requests/CreateNamedCollectionResponse.h>
2628
#include <Cluster/Requests/CreateStoragePolicyRequest.h>
2729
#include <Cluster/Requests/CreateStoragePolicyResponse.h>
2830
#include <Cluster/Requests/CreateStreamRequest.h>
@@ -41,6 +43,8 @@
4143
#include <Cluster/Requests/DeleteDiskResponse.h>
4244
#include <Cluster/Requests/DeleteFormatSchemaRequest.h>
4345
#include <Cluster/Requests/DeleteFormatSchemaResponse.h>
46+
#include <Cluster/Requests/DeleteNamedCollectionRequest.h>
47+
#include <Cluster/Requests/DeleteNamedCollectionResponse.h>
4448
#include <Cluster/Requests/DeleteStoragePolicyRequest.h>
4549
#include <Cluster/Requests/DeleteStoragePolicyResponse.h>
4650
#include <Cluster/Requests/DeleteStreamRequest.h>
@@ -57,6 +61,8 @@
5761
#include <Cluster/Requests/GetDatabaseResponse.h>
5862
#include <Cluster/Requests/GetFormatSchemaRequest.h>
5963
#include <Cluster/Requests/GetFormatSchemaResponse.h>
64+
#include <Cluster/Requests/GetNamedCollectionRequest.h>
65+
#include <Cluster/Requests/GetNamedCollectionResponse.h>
6066
#include <Cluster/Requests/GetStreamRequest.h>
6167
#include <Cluster/Requests/GetStreamResponse.h>
6268
#include <Cluster/Requests/GetTaskRequest.h>
@@ -73,6 +79,8 @@
7379
#include <Cluster/Requests/ListDisksResponse.h>
7480
#include <Cluster/Requests/ListFormatSchemasRequest.h>
7581
#include <Cluster/Requests/ListFormatSchemasResponse.h>
82+
#include <Cluster/Requests/ListNamedCollectionsRequest.h>
83+
#include <Cluster/Requests/ListNamedCollectionsResponse.h>
7684
#include <Cluster/Requests/ListStoragePoliciesRequest.h>
7785
#include <Cluster/Requests/ListStoragePoliciesResponse.h>
7886
#include <Cluster/Requests/ListStreamsRequest.h>
@@ -209,6 +217,11 @@ class MetaStore
209217
ListTasksResponsePtr listTasks(ListTasksRequestPtr req) const;
210218
GetTaskResponsePtr getTask(GetTaskRequestPtr req) const;
211219

220+
/// Named Collection
221+
CreateNamedCollectionResponsePtr createNamedCollection(CreateNamedCollectionRequestPtr req);
222+
DeleteNamedCollectionResponsePtr deleteNamedCollection(DeleteNamedCollectionRequestPtr req);
223+
ListNamedCollectionsResponsePtr listNamedCollections(ListNamedCollectionsRequestPtr req) const;
224+
GetNamedCollectionResponsePtr getNamedCollection(GetNamedCollectionRequestPtr req) const;
212225

213226
bool isLocalNode(const std::string & host, uint16_t tcp_port, const std::string & fqdn_name) const;
214227

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#include <Cluster/Protocol/CreateNamedCollectionRequestData.h>
2+
3+
#include <Cluster/Common/serde.h>
4+
#include <IO/ReadHelpers.h>
5+
#include <IO/WriteHelpers.h>
6+
7+
8+
namespace cluster::protocol
9+
{
10+
/// Writes the request to `wb` in this fixed field order: collection_name, the collection
/// descriptor, initiator, timeout_ms, exists_op, requested_by, requested_ts.
/// doDeserialize() reads the fields back in the same order, so the two must be changed together.
/// @param wb       Destination buffer.
/// @param version  Passed through to the descriptor's serialize().
void CreateNamedCollectionRequestData::serialize(DB::WriteBuffer & wb, uint16_t version) const
11+
{
12+
DB::writeStringBinary(collection_name, wb);
13+
/// The descriptor is dereferenced on the next line, so it must be set before serializing.
chassert(collection);
14+
collection->serialize(wb, version);
15+
16+
DB::writeVarUInt(initiator, wb);
17+
DB::writeVarInt(timeout_ms, wb);
18+
19+
serializeEnum(exists_op, wb);
20+
DB::writeStringBinary(requested_by, wb);
21+
DB::writeVarInt(requested_ts, wb);
22+
}
23+
24+
/// Reads the request from `rb` in the same field order serialize() writes: collection_name,
/// the collection descriptor, initiator, timeout_ms, exists_op, requested_by, requested_ts.
/// A fresh NamedCollectionDescriptor is allocated and replaces any previously held one.
/// @param rb       Source buffer.
/// @param version  Passed through to the descriptor's deserialize().
void CreateNamedCollectionRequestData::doDeserialize(DB::ReadBuffer & rb, uint16_t version)
25+
{
26+
DB::readStringBinary(collection_name, rb);
27+
collection = std::make_shared<NamedCollectionDescriptor>();
28+
collection->deserialize(rb, version);
29+
30+
DB::readVarUInt(initiator, rb);
31+
DB::readVarInt(timeout_ms, rb);
32+
33+
exists_op = deserializeEnum<ExistsOperation>(rb);
34+
DB::readStringBinary(requested_by, rb);
35+
DB::readVarInt(requested_ts, rb);
36+
}
37+
38+
/// Renders the request as a single-line, human-readable summary:
/// name, the descriptor's own string() wrapped in literal braces, initiator in hex,
/// timeout_ms, the exists_op enumerator name, requested_by and requested_ts.
/// `collection` is dereferenced unconditionally, so it must be set when this is called.
/// @return The formatted summary string.
std::string CreateNamedCollectionRequestData::doString() const
39+
{
40+
return fmt::format(
41+
/// `{{{}}}` prints the descriptor between literal braces. The initiator field must be a real
/// `{:x}` placeholder: written as `{{:x}}` it is an escaped literal, which prints "{:x}" verbatim
/// and shifts every following argument into the wrong field.
"name={} collection={{{}}} initiator=0x{:x} timeout_ms={} exists_op={} requested_by={} requested_ts={}",
42+
collection_name,
43+
collection->string(),
44+
initiator,
45+
timeout_ms,
46+
magic_enum::enum_name(exists_op),
47+
requested_by,
48+
requested_ts);
49+
}
50+
}

0 commit comments

Comments
 (0)