@@ -5162,6 +5162,126 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
51625162 );
51635163 }
51645164
5165+ Y_UNIT_TEST (WriteLockUncommittedConflictFailure) {
5166+ TPortManager pm;
5167+ TServerSettings serverSettings (pm.GetPort (2134 ));
5168+ serverSettings.SetDomainName (" Root" )
5169+ .SetNodeCount (1 )
5170+ .SetUseRealThreads (false );
5171+
5172+ // The bug requires restoring lock from persistent storage
5173+ serverSettings.FeatureFlags .SetEnableDataShardInMemoryStateMigration (false );
5174+
5175+ TServer::TPtr server = new TServer (serverSettings);
5176+
5177+ auto & runtime = *server->GetRuntime ();
5178+ auto sender = runtime.AllocateEdgeActor ();
5179+
5180+ runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
5181+
5182+ InitRoot (server, sender);
5183+
5184+ TDisableDataShardLogBatching disableDataShardLogBatching;
5185+
5186+ UNIT_ASSERT_VALUES_EQUAL (
5187+ KqpSchemeExec (runtime, R"(
5188+ CREATE TABLE `/Root/table` (key int, value int, PRIMARY KEY (key));
5189+ )" ),
5190+ " SUCCESS"
5191+ );
5192+
5193+ const auto shards = GetTableShards (server, sender, " /Root/table" );
5194+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 1u );
5195+
5196+ // Populate tables with initial values
5197+ ExecSQL (server, sender, R"(
5198+ UPSERT INTO `/Root/table` (key, value) VALUES
5199+ (1, 1001),
5200+ (2, 1002);
5201+ )" );
5202+
5203+ // Block reads so read locks are not established yet
5204+ TBlockEvents<TEvDataShard::TEvRead> blockedReads (runtime);
5205+
5206+ // Begin the 1st transaction upserting to and reading from table
5207+ TString session1, tx1;
5208+ auto future1 = KqpSimpleBeginSend (runtime, session1, R"(
5209+ UPSERT INTO `/Root/table` (key, value) VALUES (3, 3003);
5210+ SELECT key, value FROM `/Root/table` ORDER BY key;
5211+ )" );
5212+
5213+ // Begin the 2nd transaction upserting to and reading from table
5214+ TString session2, tx2;
5215+ auto future2 = KqpSimpleBeginSend (runtime, session2, R"(
5216+ UPSERT INTO `/Root/table` (key, value) VALUES (4, 4004);
5217+ SELECT key, value FROM `/Root/table` ORDER BY key;
5218+ )" );
5219+
5220+ // Wait until both transactions are stuck at their read phases
5221+ runtime.WaitFor (" blocked TEvRead x2" , [&]{ return blockedReads.size () >= 2 ; });
5222+
5223+ // Force read iterators to read rows one-by-one
5224+ auto changeMaxRowsObserver = runtime.AddObserver <TEvDataShard::TEvRead>(
5225+ [&](auto & ev) {
5226+ auto * msg = ev->Get ();
5227+ msg->Record .SetMaxRowsInResult (1 );
5228+ });
5229+
5230+ // Block TEvReadContinue so it stops after the first result
5231+ TBlockEvents<TEvDataShard::TEvReadContinue> blockedReadContinue (runtime);
5232+
5233+ // Unblock reads and wait for corresponding TEvReadContinue
5234+ blockedReads.Stop ().Unblock ();
5235+
5236+ runtime.WaitFor (" blocked TEvReadContinue x2" , [&]{ return blockedReadContinue.size () >= 2 ; });
5237+ runtime.SimulateSleep (TDuration::Seconds (1 ));
5238+
5239+ // Block further commits
5240+ TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits (runtime, [&](auto & ev) {
5241+ auto * msg = ev->Get ();
5242+ if (msg->Id .Channel () == 0 && msg->Id .TabletID () == shards.at (0 )) {
5243+ Cerr << " ... blocking commit " << msg->Id << Endl;
5244+ return true ;
5245+ }
5246+ return false ;
5247+ });
5248+
5249+ // Unblock TEvReadContinue and let kqp potentially receive results
5250+ blockedReadContinue.Stop ().Unblock ();
5251+ runtime.SimulateSleep (TDuration::Seconds (1 ));
5252+
5253+ // Stop blocking commits and make them fail (will cause shards to restart)
5254+ blockedCommits.Stop ();
5255+ for (auto & ev : blockedCommits) {
5256+ auto proxy = ev->Recipient ;
5257+ ui32 groupId = GroupIDFromBlobStorageProxyID (proxy);
5258+ auto response = ev->Get ()->MakeErrorResponse (NKikimrProto::ERROR, " Something went wrong" , TGroupId::FromValue (groupId));
5259+ runtime.Send (new IEventHandle (ev->Sender , proxy, response.release ()), 0 , true );
5260+ }
5261+ runtime.SimulateSleep (TDuration::Seconds (1 ));
5262+
5263+ UNIT_ASSERT_VALUES_EQUAL (
5264+ KqpSimpleBeginWait (runtime, tx1, std::move (future1)),
5265+ " { items { int32_value: 1 } items { int32_value: 1001 } }, "
5266+ " { items { int32_value: 2 } items { int32_value: 1002 } }, "
5267+ " { items { int32_value: 3 } items { int32_value: 3003 } }" );
5268+ UNIT_ASSERT_VALUES_EQUAL (
5269+ KqpSimpleBeginWait (runtime, tx2, std::move (future2)),
5270+ " { items { int32_value: 1 } items { int32_value: 1001 } }, "
5271+ " { items { int32_value: 2 } items { int32_value: 1002 } }, "
5272+ " { items { int32_value: 4 } items { int32_value: 4004 } }" );
5273+
5274+ // Commit the 1st transaction, it should succeed
5275+ UNIT_ASSERT_VALUES_EQUAL (
5276+ KqpSimpleCommit (runtime, session1, tx1, R"( SELECT 1)" ),
5277+ " { items { int32_value: 1 } }" );
5278+
5279+ // Try to commit the 2nd transaction, it must fail
5280+ UNIT_ASSERT_VALUES_EQUAL (
5281+ KqpSimpleCommit (runtime, session2, tx2, R"( SELECT 1)" ),
5282+ " ERROR: ABORTED" );
5283+ }
5284+
51655285}
51665286
51675287Y_UNIT_TEST_SUITE (DataShardReadIteratorLatency) {
0 commit comments