Skip to content

Commit fae0cba

Browse files
[+] Changes for Ya.Metrika #26244
1 parent 44da537 commit fae0cba

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,7 +1466,7 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14661466
if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) {
14671467
if (SeqnoViolation(inFlightIter->second.KafkaProducerEpoch, inFlightIter->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14681468
tx.Predicate = false;
1469-
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, existing->second.SeqNo) <<
1469+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, inFlightIter->second.SeqNo) <<
14701470
"MinSeqNo violation failure. " <<
14711471
"SeqNo " << s.second.MinSeqNo);
14721472
tx.WriteInfoApplied = true;
@@ -1477,7 +1477,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14771477
if (auto existing = knownSourceIds.find(s.first); !existing.IsEnd()) {
14781478
if (SeqnoViolation(existing->second.ProducerEpoch, existing->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14791479
tx.Predicate = false;
1480-
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1480+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, existing->second.SeqNo) <<
1481+
"MinSeqNo violation failure. " <<
1482+
"SeqNo " << s.second.MinSeqNo);
14811483
tx.WriteInfoApplied = true;
14821484
break;
14831485
}

ydb/core/persqueue/partition_init.cpp

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,26 @@ enum EKeyPosition {
532532
// Calculates the location of keys relative to each other
533533
static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
534534
{
535-
// Called from FilterBlobsMetaData. The keys are pre-sorted
536-
Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(),
537-
"lhs: %s, rhs: %s",
538-
lhs.ToString().data(), rhs.ToString().data());
539-
540535
if (lhs.GetOffset() == rhs.GetOffset()) {
541536
if (lhs.GetPartNo() == rhs.GetPartNo()) {
542-
Y_ABORT_UNLESS(lhs.GetCount() <= rhs.GetCount(),
543-
"lhs: %s, rhs: %s",
544-
lhs.ToString().data(), rhs.ToString().data());
545-
return RhsContainsLhs;
537+
if (lhs.GetCount() < rhs.GetCount()) {
538+
return RhsContainsLhs;
539+
} else if (lhs.GetCount() == rhs.GetCount()) {
540+
if (lhs.GetInternalPartsCount() < rhs.GetInternalPartsCount()) {
541+
return RhsContainsLhs;
542+
} else {
543+
return LhsContainsRhs;
544+
}
545+
} else {
546+
return LhsContainsRhs;
547+
}
548+
} else if (lhs.GetPartNo() > rhs.GetPartNo()) {
549+
return LhsContainsRhs;
550+
} else {
551+
return RhsAfterLhs;
546552
}
547-
548-
// case lhs.GetOffset() == rhs.GetOffset() && lhs.GetPartNo() < rhs.GetPartNo()
549-
Y_ABORT_UNLESS(lhs.GetPartNo() + lhs.GetInternalPartsCount() <= rhs.GetPartNo(),
550-
"lhs: %s, rhs: %s",
551-
lhs.ToString().data(), rhs.ToString().data());
552-
553-
return RhsAfterLhs;
554553
}
555554

556-
// case lhs.GetOffset() < rhs.GetOffset()
557-
558555
if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) {
559556
return LhsContainsRhs;
560557
} else if (nextOffset == rhs.GetOffset()) {

0 commit comments

Comments
 (0)