Skip to content

Commit a56d656

Browse files
authored
Correct distconf quorum for scenario when user adds new nodes to cluster (#28455)
1 parent 9a553f9 commit a56d656

File tree

17 files changed

+305
-153
lines changed

17 files changed

+305
-153
lines changed

ydb/core/blobstorage/base/blobstorage_events.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
namespace NKikimr {
55

66
TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
7-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
8-
TBridgeInfo::TPtr bridgeInfo)
7+
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo)
98
: Config(std::move(config))
10-
, ProposedConfig(std::move(proposedConfig))
119
, SelfManagementEnabled(selfManagementEnabled)
1210
, BridgeInfo(std::move(bridgeInfo))
1311
{}

ydb/core/blobstorage/base/blobstorage_events.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -592,13 +592,11 @@ namespace NKikimr {
592592
: TEventLocal<TEvNodeWardenStorageConfig, TEvBlobStorage::EvNodeWardenStorageConfig>
593593
{
594594
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> Config;
595-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> ProposedConfig;
596595
bool SelfManagementEnabled;
597596
TBridgeInfo::TPtr BridgeInfo;
598597

599598
TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
600-
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
601-
TBridgeInfo::TPtr bridgeInfo);
599+
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo);
602600
~TEvNodeWardenStorageConfig();
603601
};
604602

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ namespace NKikimr::NStorage {
130130
ProposedStorageConfig.reset();
131131
}
132132

133-
ReportStorageConfigToNodeWarden(0);
133+
ReportStorageConfigToNodeWarden();
134134

135135
if (IsSelfStatic) {
136136
PersistConfig({});
@@ -177,15 +177,7 @@ namespace NKikimr::NStorage {
177177
}
178178

179179
void TDistributedConfigKeeper::HandleConfigConfirm(STATEFN_SIG) {
180-
if (ev->Cookie) {
181-
STLOG(PRI_DEBUG, BS_NODE, NWDC46, "HandleConfigConfirm", (Cookie, ev->Cookie),
182-
(ProposedStorageConfigCookie, ProposedStorageConfigCookie),
183-
(ProposedStorageConfigCookieUsage, ProposedStorageConfigCookieUsage));
184-
if (ev->Cookie == ProposedStorageConfigCookie && ProposedStorageConfigCookieUsage) {
185-
--ProposedStorageConfigCookieUsage;
186-
}
187-
FinishAsyncOperation(ev->Cookie);
188-
}
180+
Y_UNUSED(ev);
189181
}
190182

191183
void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
@@ -233,13 +225,22 @@ namespace NKikimr::NStorage {
233225

234226
for (const auto& [cookie, task] : ScatterTasks) {
235227
for (const ui32 nodeId : task.PendingNodes) {
236-
const auto it = DirectBoundNodes.find(nodeId);
237-
Y_ABORT_UNLESS(it != DirectBoundNodes.end());
238-
TBoundNode& info = it->second;
239-
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
228+
if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
229+
TBoundNode& info = it->second;
230+
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
231+
} else {
232+
Y_ABORT_UNLESS(AddedNodesScatterTasks.contains({nodeId, cookie}));
233+
}
240234
}
241235
}
242236

237+
for (const auto& [nodeId, cookie] : AddedNodesScatterTasks) {
238+
const auto it = ScatterTasks.find(cookie);
239+
Y_ABORT_UNLESS(it != ScatterTasks.end());
240+
TScatterTask& task = it->second;
241+
Y_ABORT_UNLESS(task.PendingNodes.contains(nodeId));
242+
}
243+
243244
for (const auto& [nodeId, info] : DirectBoundNodes) {
244245
for (const ui64 cookie : info.ScatterTasks) {
245246
const auto it = ScatterTasks.find(cookie);
@@ -250,10 +251,12 @@ namespace NKikimr::NStorage {
250251
}
251252

252253
for (const auto& [cookie, task] : ScatterTasks) {
253-
if (task.Origin) {
254-
Y_ABORT_UNLESS(Binding);
255-
Y_ABORT_UNLESS(task.Origin == Binding);
256-
}
254+
std::visit(TOverloaded{
255+
[&](const TBinding& origin) { Y_ABORT_UNLESS(origin == Binding); },
256+
[&](const TActorId& /*actorId*/) { Y_ABORT_UNLESS(!Binding); },
257+
[&](const TScatterTaskOriginFsm&) {},
258+
[&](const TScatterTaskOriginTargeted&) {}
259+
}, task.Origin);
257260
}
258261

259262
for (const auto& [nodeId, subs] : SubscribedSessions) {
@@ -277,6 +280,10 @@ namespace NKikimr::NStorage {
277280
if (UnsubscribeQueue.contains(nodeId)) {
278281
okay = true;
279282
}
283+
if (!okay) {
284+
const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0});
285+
okay = it != AddedNodesScatterTasks.end() && std::get<0>(*it) == nodeId;
286+
}
280287
Y_ABORT_UNLESS(okay);
281288
if (subs.SubscriptionCookie) {
282289
const auto it = SubscriptionCookieMap.find(subs.SubscriptionCookie);
@@ -377,16 +384,12 @@ namespace NKikimr::NStorage {
377384
}
378385
}
379386

380-
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden(ui64 cookie) {
387+
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden() {
381388
Y_ABORT_UNLESS(StorageConfig);
382389
const TActorId wardenId = MakeBlobStorageNodeWardenID(SelfId().NodeId());
383390
const auto& config = SelfManagementEnabled ? StorageConfig : BaseConfig;
384-
auto proposedConfig = ProposedStorageConfig && SelfManagementEnabled
385-
? std::make_shared<NKikimrBlobStorage::TStorageConfig>(*ProposedStorageConfig)
386-
: nullptr;
387-
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, std::move(proposedConfig), SelfManagementEnabled,
388-
BridgeInfo);
389-
Send(wardenId, ev.release(), 0, cookie);
391+
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, SelfManagementEnabled, BridgeInfo);
392+
Send(wardenId, ev.release());
390393
}
391394

392395
STFUNC(TDistributedConfigKeeper::StateFunc) {

ydb/core/blobstorage/nodewarden/distconf.h

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,22 +172,39 @@ namespace NKikimr::NStorage {
172172
ui64 Id = RandomNumber<ui64>(); // unique id
173173
};
174174

175+
struct TScatterTaskOriginFsm {
176+
TString ToString() const { return "fsm"; }
177+
};
178+
179+
struct TScatterTaskOriginTargeted {
180+
TActorId Sender;
181+
ui64 Cookie;
182+
TActorId InterconnectSessionId;
183+
184+
TString ToString() const { return TStringBuilder() << "{Sender# " << Sender << " Cookie# " << Cookie
185+
<< " InterconnectSessionId# " << InterconnectSessionId << '}'; }
186+
};
187+
188+
using TScatterTaskOrigin = std::variant<
189+
TBinding, // when scatter is received from root
190+
TActorId, // locally generated by invoke processor
191+
TScatterTaskOriginFsm, // locally generated by configuration change FSM
192+
TScatterTaskOriginTargeted // when targeted scatter is issued by cluster leader to newly added nodes
193+
>;
194+
175195
struct TScatterTask {
176-
const std::optional<TBinding> Origin;
196+
const TScatterTaskOrigin Origin;
177197
const ui64 ScepterCounter;
178-
const TActorId ActorId;
179198

180199
THashSet<ui32> PendingNodes;
181200
ui32 AsyncOperationsPending = 0;
182201
TEvScatter Request;
183202
TEvGather Response;
184203
std::vector<TEvGather> CollectedResponses; // from bound nodes
185204

186-
TScatterTask(const std::optional<TBinding>& origin, TEvScatter&& request,
187-
ui64 scepterCounter, TActorId actorId)
188-
: Origin(origin)
205+
TScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, ui64 scepterCounter)
206+
: Origin(std::move(origin))
189207
, ScepterCounter(scepterCounter)
190-
, ActorId(actorId)
191208
{
192209
Request.Swap(&request);
193210
if (Request.HasCookie()) {
@@ -218,8 +235,7 @@ namespace NKikimr::NStorage {
218235

219236
// proposed storage configuration of the cluster
220237
std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig; // proposed one
221-
ui64 ProposedStorageConfigCookie; // if set, then this configuration is being written right now
222-
ui32 ProposedStorageConfigCookieUsage = 0;
238+
std::optional<ui64> ProposedStorageConfigCookie; // if set, then this configuration is being written right now
223239

224240
// most relevant proposed config
225241
using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>;
@@ -266,6 +282,7 @@ namespace NKikimr::NStorage {
266282
ui64 NextScatterCookie = RandomNumber<ui64>();
267283
using TScatterTasks = THashMap<ui64, TScatterTask>;
268284
TScatterTasks ScatterTasks;
285+
std::set<std::tuple<ui32, ui64>> AddedNodesScatterTasks;
269286

270287
std::optional<TActorId> StateStorageSelfHealActor;
271288

@@ -284,6 +301,7 @@ namespace NKikimr::NStorage {
284301
NKikimrBlobStorage::TStorageConfig StorageConfig; // storage config being proposed
285302
TActorId ActorId; // actor id waiting for this operation to complete
286303
bool MindPrev; // mind previous configuration quorum
304+
std::vector<TNodeIdentifier> AddedNodes; // a list of nodes being added in this configuration change
287305
};
288306
std::optional<TProposition> CurrentProposition;
289307

@@ -348,7 +366,7 @@ namespace NKikimr::NStorage {
348366
void Halt(); // cease any distconf activity, unbind and reject any bindings
349367
bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config, bool fromBinding = false);
350368
void HandleConfigConfirm(STATEFN_SIG);
351-
void ReportStorageConfigToNodeWarden(ui64 cookie);
369+
void ReportStorageConfigToNodeWarden();
352370

353371
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
354372
// PDisk configuration retrieval and storing
@@ -411,6 +429,7 @@ namespace NKikimr::NStorage {
411429
void BecomeRoot();
412430
void UnbecomeRoot();
413431
void HandleErrorTimeout();
432+
void UndoCurrentPropositionNodeChange(TProposition& proposition);
414433
void ProcessGather(TEvGather *res);
415434
bool HasConnectedNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, bool local) const;
416435

@@ -487,7 +506,7 @@ namespace NKikimr::NStorage {
487506
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
488507
// Scatter/gather logic
489508

490-
void IssueScatterTask(std::optional<TActorId> actorId, TEvScatter&& request);
509+
void IssueScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, std::span<TNodeIdentifier> addedNodes = {});
491510
void CheckCompleteScatterTask(TScatterTasks::iterator it);
492511
void FinishAsyncOperation(ui64 cookie);
493512
void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task);

ydb/core/blobstorage/nodewarden/distconf_binding.cpp

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ namespace NKikimr::NStorage {
77
void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
88
STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");
99

10+
if (SelfManagementEnabled) {
11+
// we obtain node list only from current StorageConfig
12+
return;
13+
}
14+
1015
std::vector<std::tuple<TNodeIdentifier, TNodeLocation>> newNodeList;
1116
for (const auto& item : ev->Get()->Nodes) {
1217
if (item.IsStatic) {
@@ -18,6 +23,8 @@ namespace NKikimr::NStorage {
1823
}
1924

2025
void TDistributedConfigKeeper::ApplyNewNodeList(std::span<std::tuple<TNodeIdentifier, TNodeLocation>> newNodeList) {
26+
STLOG(PRI_DEBUG, BS_NODE, NWDC13, "ApplyNewNodeList", (NewNodeList, newNodeList));
27+
2128
// do not start configuration negotiation for dynamic nodes
2229
if (!IsSelfStatic) {
2330
std::optional<TString> expectedBridgePileName;
@@ -49,7 +56,8 @@ namespace NKikimr::NStorage {
4956
break;
5057
}
5158
}
52-
Y_ABORT_UNLESS(found);
59+
auto fn = [](const auto& x) { return TStringBuilder() << std::get<0>(x); };
60+
Y_VERIFY_S(found, "SelfNodeId# " << selfNodeId << " NewNodeList# " << FormatList(newNodeList | std::views::transform(fn)));
5361

5462
// process all other nodes, find bindable ones (from our current pile) and build list of all nodes
5563
AllNodeIds.clear();
@@ -61,7 +69,7 @@ namespace NKikimr::NStorage {
6169

6270
for (const auto& [item, location] : newNodeList) {
6371
const ui32 nodeId = item.NodeId();
64-
AllNodeIds.insert(item.NodeId());
72+
AllNodeIds.insert(nodeId);
6573

6674
// check if node is from the same pile (as this one)
6775
if (location.GetBridgePileName() == SelfNodeBridgePileName) {
@@ -297,6 +305,13 @@ namespace NKikimr::NStorage {
297305
if (Binding && Binding->NodeId == nodeId) {
298306
AbortBinding("disconnection", false);
299307
}
308+
309+
// abort scatter tasks issued to newly added nodes
310+
for (auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
311+
std::get<0>(*it) == nodeId; it = AddedNodesScatterTasks.erase(it)) {
312+
const auto& [nodeId, cookie] = *it;
313+
AbortScatterTask(cookie, nodeId);
314+
}
300315
}
301316

302317
void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) {
@@ -309,6 +324,10 @@ namespace NKikimr::NStorage {
309324
if (ConnectedDynamicNodes.contains(nodeId)) {
310325
return;
311326
}
327+
if (const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
328+
std::get<0>(*it) == nodeId) {
329+
return;
330+
}
312331
if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end()) {
313332
TSessionSubscription& subs = it->second;
314333
STLOG(PRI_DEBUG, BS_NODE, NWDC55, "UnsubscribeInterconnect", (NodeId, nodeId), (Subscription, subs));
@@ -319,11 +338,17 @@ namespace NKikimr::NStorage {
319338
Y_ABORT_UNLESS(jt != SubscriptionCookieMap.end());
320339
Y_ABORT_UNLESS(jt->second == nodeId);
321340
SubscriptionCookieMap.erase(jt);
341+
if (!AllNodeIds.contains(nodeId)) {
342+
TActivationContext::Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0,
343+
TActivationContext::InterconnectProxy(nodeId), {}, nullptr, 0));
344+
}
322345
} else {
323346
// we already had TEvNodeConnected, so we have to unsubscribe
324347
Y_ABORT_UNLESS(subs.SessionId);
325-
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, subs.SessionId, SelfId(),
326-
nullptr, 0));
348+
ui32 event = AllNodeIds.contains(nodeId)
349+
? TEvents::TSystem::Unsubscribe
350+
: TEvents::TSystem::Poison;
351+
TActivationContext::Send(new IEventHandle(event, 0, subs.SessionId, SelfId(), nullptr, 0));
327352
}
328353
SubscribedSessions.erase(it);
329354
}
@@ -403,7 +428,8 @@ namespace NKikimr::NStorage {
403428
return; // possible race with unbinding
404429
}
405430

406-
Y_ABORT_UNLESS(Binding->RootNodeId || ScatterTasks.empty());
431+
auto isTargeted = [](const TScatterTaskOrigin& origin) { return std::holds_alternative<TScatterTaskOriginTargeted>(origin); };
432+
Y_ABORT_UNLESS(Binding->RootNodeId || std::ranges::all_of(ScatterTasks | std::views::values, isTargeted, &TScatterTask::Origin));
407433

408434
// check if this binding was accepted and if it is acceptable from our point of view
409435
bool bindingUpdate = false;
@@ -546,13 +572,10 @@ namespace NKikimr::NStorage {
546572
Y_ABORT_UNLESS(senderNodeId != SelfId().NodeId());
547573
auto& record = ev->Get()->Record;
548574

549-
STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
550-
(SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record),
551-
(RootNodeId, GetRootNodeId()));
552-
553575
// check if we have to send our current config to the peer
554576
const NKikimrBlobStorage::TStorageConfig *configToPeer = nullptr;
555577
std::optional<ui64> requestStorageConfigGeneration;
578+
const bool knownNode = AllNodeIds.contains(senderNodeId);
556579
if (StorageConfig) {
557580
for (const auto& item : record.GetBoundNodes()) {
558581
if (item.GetNodeId().GetNodeId() == senderNodeId) {
@@ -566,9 +589,19 @@ namespace NKikimr::NStorage {
566589
}
567590
}
568591

569-
if (!AllNodeIds.contains(senderNodeId)) {
592+
STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush",
593+
(NodeId, senderNodeId),
594+
(Cookie, ev->Cookie),
595+
(SessionId, ev->InterconnectSession),
596+
(Binding, Binding),
597+
(Record, record),
598+
(RootNodeId, GetRootNodeId()),
599+
(StorageConfigGeneration, StorageConfig ? (i64)StorageConfig->GetGeneration() : -1),
600+
(KnownNode, knownNode));
601+
602+
if (!knownNode) {
570603
// node has been already deleted from the config, but new subscription is coming through -- ignoring it
571-
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer));
604+
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr));
572605
return;
573606
}
574607

@@ -577,7 +610,7 @@ namespace NKikimr::NStorage {
577610
STLOG(PRI_DEBUG, BS_NODE, NWDC28, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
578611
(Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
579612
(Record, record));
580-
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer));
613+
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr));
581614
return;
582615
}
583616

@@ -589,7 +622,7 @@ namespace NKikimr::NStorage {
589622
// nodes AND this is the root one
590623
} else {
591624
// this is either not the root node, or no quorum for connection
592-
auto response = TEvNodeConfigReversePush::MakeRejected(configToPeer);
625+
auto response = TEvNodeConfigReversePush::MakeRejected(nullptr);
593626
if (Binding && Binding->RootNodeId) {
594627
// command peer to join this specific node
595628
response->Record.SetRootNodeId(Binding->RootNodeId);

0 commit comments

Comments
 (0)