Skip to content

Commit f75c208

Browse files
authored
stable-25-3-1: Positioning between zones if mirroring with gaps (#28116)
2 parents b7e95cb + 01c640c commit f75c208

File tree

3 files changed

+112
-37
lines changed

3 files changed

+112
-37
lines changed

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,12 @@ void TPartition::InitComplete(const TActorContext& ctx) {
700700
for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
701701
ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n";
702702
}
703+
for (const auto& h : CompactionBlobEncoder.DataKeysBody) {
704+
ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
705+
}
706+
for (const auto& h : CompactionBlobEncoder.HeadKeys) {
707+
ss << "SYNC INIT HEAD KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
708+
}
703709
for (const auto& h : BlobEncoder.DataKeysBody) {
704710
ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
705711
}
@@ -1121,6 +1127,10 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con
11211127

11221128
const TPartitionBlobEncoder& TPartition::GetBlobEncoder(ui64 offset) const
11231129
{
1130+
if ((offset >= CompactionBlobEncoder.EndOffset) && (offset < BlobEncoder.StartOffset)) {
1131+
offset = BlobEncoder.StartOffset;
1132+
}
1133+
11241134
if (BlobEncoder.DataKeysBody.empty()) {
11251135
return CompactionBlobEncoder;
11261136
}
@@ -1151,17 +1161,24 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
11511161
}
11521162

11531163
const TPartitionBlobEncoder& blobEncoder = GetBlobEncoder(offset);
1164+
offset = Max(offset, blobEncoder.StartOffset);
11541165
const std::deque<TDataKey>& container = GetContainer(blobEncoder, offset);
1155-
PQ_ENSURE(!container.empty());
1166+
PQ_ENSURE(!container.empty())
1167+
("offset", offset)
1168+
("cz.StartOffset", CompactionBlobEncoder.StartOffset)("cz.EndOffset", CompactionBlobEncoder.EndOffset)
1169+
("fwz.StartOffset", BlobEncoder.StartOffset)("fwz.EndOffset", BlobEncoder.EndOffset)
1170+
;
11561171

11571172
auto it = std::upper_bound(container.begin(), container.end(), offset,
11581173
[](const ui64 offset, const TDataKey& p) {
11591174
return offset < p.Key.GetOffset() ||
11601175
offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0;
11611176
});
11621177
// Always greater
1163-
PQ_ENSURE(it != container.begin())("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset)
1164-
("offset", offset)("containter size", container.size())("first-elem", container.front().Key.ToString())
1178+
PQ_ENSURE(it != container.begin())
1179+
("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset)
1180+
("offset", offset)
1181+
("containter size", container.size())("first-elem", container.front().Key.ToString())
11651182
("is-fast-write", blobEncoder.ForFastWrite);
11661183
PQ_ENSURE(it == container.end() ||
11671184
offset < it->Key.GetOffset() ||

ydb/core/persqueue/pqtablet/partition/partition_read.cpp

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -490,15 +490,6 @@ TMaybe<TReadAnswer> TReadInfo::AddBlobsFromBody(const TVector<NPQ::TRequestedBlo
490490
TClientBlob &res = batch.Blobs[i];
491491
VERIFY_RESULT_BLOB(res, i);
492492

493-
Y_ABORT_UNLESS(PartNo == res.GetPartNo(), "%s",
494-
(TStringBuilder() <<
495-
"\npos=" << pos <<
496-
"\ni=" << i <<
497-
"\nOffset=" << Offset <<
498-
"\nPartNo=" << PartNo <<
499-
"\noffset=" << offset <<
500-
"\npartNo=" << res.GetPartNo()
501-
).data());
502493
AFL_ENSURE(PartNo == res.GetPartNo())("pos", pos)("i", i)("Offset", Offset)("PartNo", PartNo)("offset", offset)("partNo", res.GetPartNo());
503494

504495
if (userInfo) {
@@ -641,29 +632,34 @@ TReadAnswer TReadInfo::FormAnswer(
641632

642633
AddResultBlob(readResult, writeBlob, Offset);
643634
if (writeBlob.IsLastPart()) {
635+
PartNo = 0;
644636
++Offset;
637+
} else {
638+
++PartNo;
645639
}
646640
if (updateUsage(writeBlob)) {
647641
break;
648642
}
649643
}
650644
}
651645

652-
readAnswer = AddBlobsFromBody(blobs,
653-
CompactedBlobsCount, blobs.size(),
654-
userInfo,
655-
startOffset,
656-
endOffset,
657-
sizeLag,
658-
tablet,
659-
realReadOffset,
660-
readResult,
661-
answer,
662-
needStop,
663-
cnt, size, lastBlobSize,
664-
ctx);
665-
if (readAnswer) {
666-
return std::move(*readAnswer);
646+
if (!needStop && cnt < Count && size < Size) { // body blobs are fully processed and need to take more data
647+
readAnswer = AddBlobsFromBody(blobs,
648+
CompactedBlobsCount, blobs.size(),
649+
userInfo,
650+
startOffset,
651+
endOffset,
652+
sizeLag,
653+
tablet,
654+
realReadOffset,
655+
readResult,
656+
answer,
657+
needStop,
658+
cnt, size, lastBlobSize,
659+
ctx);
660+
if (readAnswer) {
661+
return std::move(*readAnswer);
662+
}
667663
}
668664

669665
AFL_ENSURE(Offset <= (ui64)Max<i64>())("Offset is too big", Offset);
@@ -702,13 +698,14 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
702698
OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx);
703699
}
704700

705-
void CollectReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount,
701+
void CollectReadRequestFromBody(ui64 startOffset, const ui16 partNo, const ui32 maxCount,
706702
const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset,
707703
TBlobKeyTokens* blobKeyTokens,
708704
TPartitionBlobEncoder& zone,
709705
TVector<TRequestedBlob>& result)
710706
{
711707
AFL_ENSURE(rcount && rsize);
708+
startOffset = Max(startOffset, zone.DataKeysBody.empty() ? zone.StartOffset : zone.DataKeysBody.front().Key.GetOffset());
712709
auto blobs = zone.GetBlobsFromBody(startOffset,
713710
partNo,
714711
maxCount,
@@ -860,7 +857,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
860857

861858
LOG_D("read cookie " << cookie << " Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
862859
<< " user " << user
863-
<< " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset()
860+
<< " offset " << read->Offset << " partno " << read->PartNo << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset()
864861
<< " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);
865862

866863
if (offset == GetEndOffset()) {
@@ -1068,14 +1065,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10681065
GetReadRequestFromCompactedBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
10691066
&info.BlobKeyTokens, blobs);
10701067
info.CompactedBlobsCount = blobs.size();
1071-
GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
1072-
&info.BlobKeyTokens, blobs);
1073-
1074-
info.Blobs = blobs;
1075-
ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset();
1076-
1077-
LOG_D("read cookie " << cookie << " added " << info.Blobs.size()
1078-
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset());
10791068

10801069
if (blobs.empty() ||
10811070
((info.CompactedBlobsCount > 0) && (blobs[info.CompactedBlobsCount - 1].Key == CompactionBlobEncoder.DataKeysBody.back().Key))) { // read from head only when all blobs from body processed
@@ -1087,6 +1076,15 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10871076
info.CachedOffset = insideHeadOffset;
10881077
}
10891078

1079+
GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
1080+
&info.BlobKeyTokens, blobs);
1081+
1082+
info.Blobs = blobs;
1083+
ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset();
1084+
1085+
LOG_D("read cookie " << cookie << " added " << info.Blobs.size()
1086+
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset());
1087+
10901088
PQ_ENSURE(info.BlobKeyTokens.Size() == info.Blobs.size());
10911089
if (info.Destination != 0) {
10921090
++userInfo.ActiveReads;

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,66 @@ Y_UNIT_TEST(TestWritePQ) {
12911291
TestWritePQImpl(false);
12921292
}
12931293

1294+
Y_UNIT_TEST(Read_From_Different_Zones_What_Was_Written_With_Gaps)
1295+
{
1296+
// The test creates messages in different zones. There are gaps in the offsets between the zones.
1297+
// We check that the client can read from any offset from any zone.
1298+
TTestContext tc;
1299+
RunTestWithReboots(tc.TabletIds, [&]() {
1300+
return tc.InitialEventsFilter.Prepare();
1301+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
1302+
activeZone = false;
1303+
TFinalizer finalizer(tc);
1304+
tc.EnableDetailedPQLog = true;
1305+
tc.Prepare(dispatchName, setup, activeZone);
1306+
tc.Runtime->SetScheduledLimit(100);
1307+
1308+
// Important client, lifetimeseconds=0 - never delete
1309+
PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {{"user", true}}, tc);
1310+
1311+
TVector<std::pair<ui64, TString>> data;
1312+
1313+
data.emplace_back(1, TString(1'000, 'x'));
1314+
1315+
// CompactZone.Body
1316+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 100);
1317+
++data[0].first;
1318+
data[0].second = TString(7'000'000, 'x');
1319+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 101);
1320+
1321+
CmdRunCompaction(0, tc);
1322+
1323+
// CompactZone.Head
1324+
++data[0].first;
1325+
data[0].second = TString(1'000, 'x');
1326+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 200);
1327+
++data[0].first;
1328+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 201);
1329+
1330+
CmdRunCompaction(0, tc);
1331+
1332+
// FastWriteZone.Body
1333+
++data[0].first;
1334+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 300);
1335+
++data[0].first;
1336+
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 301);
1337+
1338+
PQGetPartInfo(100, 302, tc);
1339+
1340+
CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301});
1341+
CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301});
1342+
1343+
// The client has committed an offset between the zones
1344+
CmdSetOffset(0, "user", 103, false, tc);
1345+
PQTabletRestart(tc);
1346+
1347+
CmdSetOffset(0, "user", 203, false, tc);
1348+
PQTabletRestart(tc);
1349+
1350+
CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301});
1351+
CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301});
1352+
});
1353+
}
12941354

12951355
Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
12961356
TTestContext tc;

0 commit comments

Comments
 (0)