Skip to content

Commit 4869f88

Browse files
authored
25-3: Fix successful reads not always waiting for read conflicts to persist (#28060)
2 parents d5d2006 + 44e1aad commit 4869f88

File tree

3 files changed

+132
-3
lines changed

3 files changed

+132
-3
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2953,7 +2953,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
29532953

29542954
ApplyLocks(ctx);
29552955

2956-
if (Reader->NeedVolatileWaitForCommit() ||
2956+
if (txc.DB.HasChanges() ||
2957+
Reader->NeedVolatileWaitForCommit() ||
29572958
Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) ||
29582959
Self->GetVolatileTxManager().HasUnstableVolatileTxsAtSnapshot(state.ReadVersion))
29592960
{

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,13 @@ namespace NKqpHelpers {
211211
return KqpSimpleBeginWait(runtime, txId, KqpSimpleBeginSend(runtime, sessionId, query, database));
212212
}
213213

214-
inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query, const TString& database = {}) {
214+
inline auto KqpSimpleContinueSend(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query, const TString& database = {}) {
215215
Y_ENSURE(!txId.empty(), "continue on empty transaction");
216-
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */), database));
216+
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */), database);
217+
}
218+
219+
inline TString KqpSimpleContinueWait(TTestActorRuntime& runtime, const TString& txId, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> future) {
220+
auto response = AwaitResponse(runtime, std::move(future));
217221
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
218222
return TStringBuilder() << "ERROR: " << response.operation().status();
219223
}
@@ -223,6 +227,10 @@ namespace NKqpHelpers {
223227
return FormatResult(result);
224228
}
225229

230+
inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query, const TString& database = {}) {
231+
return KqpSimpleContinueWait(runtime, txId, KqpSimpleContinueSend(runtime, sessionId, txId, query, database));
232+
}
233+
226234
inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query, const TString& database = {}) {
227235
Y_ENSURE(!txId.empty(), "commit on empty transaction");
228236
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */), database);

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5162,6 +5162,126 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
51625162
);
51635163
}
51645164

5165+
Y_UNIT_TEST(WriteLockUncommittedConflictFailure) {
5166+
TPortManager pm;
5167+
TServerSettings serverSettings(pm.GetPort(2134));
5168+
serverSettings.SetDomainName("Root")
5169+
.SetNodeCount(1)
5170+
.SetUseRealThreads(false);
5171+
5172+
// The bug requires restoring lock from persistent storage
5173+
serverSettings.FeatureFlags.SetEnableDataShardInMemoryStateMigration(false);
5174+
5175+
TServer::TPtr server = new TServer(serverSettings);
5176+
5177+
auto& runtime = *server->GetRuntime();
5178+
auto sender = runtime.AllocateEdgeActor();
5179+
5180+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
5181+
5182+
InitRoot(server, sender);
5183+
5184+
TDisableDataShardLogBatching disableDataShardLogBatching;
5185+
5186+
UNIT_ASSERT_VALUES_EQUAL(
5187+
KqpSchemeExec(runtime, R"(
5188+
CREATE TABLE `/Root/table` (key int, value int, PRIMARY KEY (key));
5189+
)"),
5190+
"SUCCESS"
5191+
);
5192+
5193+
const auto shards = GetTableShards(server, sender, "/Root/table");
5194+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
5195+
5196+
// Populate tables with initial values
5197+
ExecSQL(server, sender, R"(
5198+
UPSERT INTO `/Root/table` (key, value) VALUES
5199+
(1, 1001),
5200+
(2, 1002);
5201+
)");
5202+
5203+
// Block reads so read locks are not established yet
5204+
TBlockEvents<TEvDataShard::TEvRead> blockedReads(runtime);
5205+
5206+
// Begin the 1st transaction upserting to and reading from table
5207+
TString session1, tx1;
5208+
auto future1 = KqpSimpleBeginSend(runtime, session1, R"(
5209+
UPSERT INTO `/Root/table` (key, value) VALUES (3, 3003);
5210+
SELECT key, value FROM `/Root/table` ORDER BY key;
5211+
)");
5212+
5213+
// Begin the 2nd transaction upserting to and reading from table
5214+
TString session2, tx2;
5215+
auto future2 = KqpSimpleBeginSend(runtime, session2, R"(
5216+
UPSERT INTO `/Root/table` (key, value) VALUES (4, 4004);
5217+
SELECT key, value FROM `/Root/table` ORDER BY key;
5218+
)");
5219+
5220+
// Wait until both transactions are stuck at their read phases
5221+
runtime.WaitFor("blocked TEvRead x2", [&]{ return blockedReads.size() >= 2; });
5222+
5223+
// Force read iterators to read rows one-by-one
5224+
auto changeMaxRowsObserver = runtime.AddObserver<TEvDataShard::TEvRead>(
5225+
[&](auto& ev) {
5226+
auto* msg = ev->Get();
5227+
msg->Record.SetMaxRowsInResult(1);
5228+
});
5229+
5230+
// Block TEvReadContinue so it stops after the first result
5231+
TBlockEvents<TEvDataShard::TEvReadContinue> blockedReadContinue(runtime);
5232+
5233+
// Unblock reads and wait for corresponding TEvReadContinue
5234+
blockedReads.Stop().Unblock();
5235+
5236+
runtime.WaitFor("blocked TEvReadContinue x2", [&]{ return blockedReadContinue.size() >= 2; });
5237+
runtime.SimulateSleep(TDuration::Seconds(1));
5238+
5239+
// Block further commits
5240+
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime, [&](auto& ev) {
5241+
auto* msg = ev->Get();
5242+
if (msg->Id.Channel() == 0 && msg->Id.TabletID() == shards.at(0)) {
5243+
Cerr << "... blocking commit " << msg->Id << Endl;
5244+
return true;
5245+
}
5246+
return false;
5247+
});
5248+
5249+
// Unblock TEvReadContinue and let kqp potentially receive results
5250+
blockedReadContinue.Stop().Unblock();
5251+
runtime.SimulateSleep(TDuration::Seconds(1));
5252+
5253+
// Stop blocking commits and make them fail (will cause shards to restart)
5254+
blockedCommits.Stop();
5255+
for (auto& ev : blockedCommits) {
5256+
auto proxy = ev->Recipient;
5257+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
5258+
auto response = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
5259+
runtime.Send(new IEventHandle(ev->Sender, proxy, response.release()), 0, true);
5260+
}
5261+
runtime.SimulateSleep(TDuration::Seconds(1));
5262+
5263+
UNIT_ASSERT_VALUES_EQUAL(
5264+
KqpSimpleBeginWait(runtime, tx1, std::move(future1)),
5265+
"{ items { int32_value: 1 } items { int32_value: 1001 } }, "
5266+
"{ items { int32_value: 2 } items { int32_value: 1002 } }, "
5267+
"{ items { int32_value: 3 } items { int32_value: 3003 } }");
5268+
UNIT_ASSERT_VALUES_EQUAL(
5269+
KqpSimpleBeginWait(runtime, tx2, std::move(future2)),
5270+
"{ items { int32_value: 1 } items { int32_value: 1001 } }, "
5271+
"{ items { int32_value: 2 } items { int32_value: 1002 } }, "
5272+
"{ items { int32_value: 4 } items { int32_value: 4004 } }");
5273+
5274+
// Commit the 1st transaction, it should succeed
5275+
UNIT_ASSERT_VALUES_EQUAL(
5276+
KqpSimpleCommit(runtime, session1, tx1, R"(SELECT 1)"),
5277+
"{ items { int32_value: 1 } }");
5278+
5279+
// Try to commit the 2nd transaction, it must fail
5280+
UNIT_ASSERT_VALUES_EQUAL(
5281+
KqpSimpleCommit(runtime, session2, tx2, R"(SELECT 1)"),
5282+
"ERROR: ABORTED");
5283+
}
5284+
51655285
}
51665286

51675287
Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) {

0 commit comments

Comments
 (0)