Skip to content

Commit 347be50

Browse files
authored
Changes for Ya.Metrika (#28381)
2 parents 2afd157 + fae0cba commit 347be50

File tree

11 files changed

+137
-57
lines changed

11 files changed

+137
-57
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,18 +861,21 @@ struct TEvPQ {
861861
};
862862

863863
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
864-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
864+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
865+
const TString& issueMsg) :
865866
Step(step),
866867
TxId(txId),
867868
Partition(partition),
868-
Predicate(predicate)
869+
Predicate(predicate),
870+
IssueMsg(issueMsg)
869871
{
870872
}
871873

872874
ui64 Step;
873875
ui64 TxId;
874876
NPQ::TPartitionId Partition;
875877
TMaybe<bool> Predicate;
878+
TString IssueMsg;
876879
};
877880

878881
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,24 @@ static const ui32 MAX_KEYS = 10000;
125125
static const ui32 MAX_TXS = 1000;
126126
static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
127127

128+
TStringBuilder MakeTxWriteErrorMessage(TMaybe<ui64> txId,
129+
TStringBuf topicName, const TPartitionId& partitionId,
130+
TStringBuf sourceId, ui64 seqNo)
131+
{
132+
TStringBuilder ss;
133+
ss << "[TxId: " << txId << ", Topic: '" << topicName << "', Partition " << partitionId << ", SourceId '" << EscapeC(sourceId) << "', SeqNo " << seqNo << "] ";
134+
return ss;
135+
}
136+
137+
TStringBuilder MakeTxReadErrorMessage(TMaybe<ui64> txId,
138+
TStringBuf topicName, const TPartitionId& partitionId,
139+
TStringBuf consumer)
140+
{
141+
TStringBuilder ss;
142+
ss << "[TxId: " << txId << ", Topic: '" << topicName << "', Partition " << partitionId << ", Consumer '" << consumer << "] ";
143+
return ss;
144+
}
145+
128146
auto GetStepAndTxId(ui64 step, ui64 txId)
129147
{
130148
return std::make_pair(step, txId);
@@ -1205,7 +1223,8 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate>
12051223
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
12061224
ev->TxId,
12071225
Partition,
1208-
Nothing()).Release());
1226+
Nothing(),
1227+
TString()).Release());
12091228
return;
12101229
}
12111230
}
@@ -1447,7 +1466,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14471466
if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) {
14481467
if (SeqnoViolation(inFlightIter->second.KafkaProducerEpoch, inFlightIter->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14491468
tx.Predicate = false;
1450-
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1469+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, inFlightIter->second.SeqNo) <<
1470+
"MinSeqNo violation failure. " <<
1471+
"SeqNo " << s.second.MinSeqNo);
14511472
tx.WriteInfoApplied = true;
14521473
break;
14531474
}
@@ -1456,7 +1477,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14561477
if (auto existing = knownSourceIds.find(s.first); !existing.IsEnd()) {
14571478
if (SeqnoViolation(existing->second.ProducerEpoch, existing->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14581479
tx.Predicate = false;
1459-
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1480+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, existing->second.SeqNo) <<
1481+
"MinSeqNo violation failure. " <<
1482+
"SeqNo " << s.second.MinSeqNo);
14601483
tx.WriteInfoApplied = true;
14611484
break;
14621485
}
@@ -1511,6 +1534,8 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
15111534
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
15121535

15131536
if (isPredicate) {
1537+
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_TXCALCPREDICATE].IncrementFor((Now() - tx->CalcPredicateTimestamp).MilliSeconds());
1538+
15141539
tx->CalcPredicateSpan.End();
15151540
tx->CalcPredicateSpan = {};
15161541

@@ -1527,7 +1552,8 @@ void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, b
15271552
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(tx->Tx->Step,
15281553
tx->Tx->TxId,
15291554
Partition,
1530-
*tx->Predicate).Release());
1555+
*tx->Predicate,
1556+
tx->Message).Release());
15311557
} else {
15321558
auto insRes = TransactionsInflight.emplace(tx->ProposeConfig->TxId, tx);
15331559
Y_ABORT_UNLESS(insRes.second);
@@ -2022,7 +2048,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
20222048

20232049
void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
20242050
{
2025-
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event)));
2051+
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event), Now()));
20262052
RequestWriteInfoIfRequired();
20272053
}
20282054

@@ -2459,7 +2485,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
24592485
ReplyToProposeOrPredicate(t, true);
24602486
return EProcessResult::Continue;
24612487
}
2462-
result = BeginTransaction(*t->Tx, t->Predicate);
2488+
result = BeginTransaction(*t->Tx, t->Predicate, t->Message);
24632489
if (t->Predicate.Defined()) {
24642490
ReplyToProposeOrPredicate(t, true);
24652491
}
@@ -2527,7 +2553,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
25272553
return true;
25282554
}
25292555

2530-
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
2556+
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
2557+
TMaybe<bool>& predicateOut, TString& issueMsg)
25312558
{
25322559
if (tx.ForcePredicateFalse) {
25332560
predicateOut = false;
@@ -2550,13 +2577,17 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
25502577
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
25512578
PQ_LOG_D("Partition " << Partition <<
25522579
" Consumer '" << consumer << "' has been removed");
2580+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2581+
"Consumer has been removed");
25532582
result = false;
25542583
break;
25552584
}
25562585

25572586
if (!UsersInfoStorage->GetIfExists(consumer)) {
25582587
PQ_LOG_D("Partition " << Partition <<
25592588
" Unknown consumer '" << consumer << "'");
2589+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2590+
"Unknown consumer");
25602591
result = false;
25612592
break;
25622593
}
@@ -2585,20 +2616,32 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
25852616
" Bad request (invalid range) " <<
25862617
" Begin " << operation.GetCommitOffsetsBegin() <<
25872618
" End " << operation.GetCommitOffsetsEnd());
2619+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2620+
"Invalid range. " <<
2621+
"Range begin " << operation.GetCommitOffsetsBegin() <<
2622+
", range end " << operation.GetCommitOffsetsEnd());
25882623
result = false;
25892624
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
25902625
PQ_LOG_D("Partition " << Partition <<
25912626
" Consumer '" << consumer << "'" <<
25922627
" Bad request (gap) " <<
25932628
" Offset " << userInfo.Offset <<
25942629
" Begin " << operation.GetCommitOffsetsBegin());
2630+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2631+
"Gap. " <<
2632+
"Offset " << userInfo.Offset <<
2633+
", range begin " << operation.GetCommitOffsetsBegin());
25952634
result = false;
25962635
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
25972636
PQ_LOG_D("Partition " << Partition <<
25982637
" Consumer '" << consumer << "'" <<
25992638
" Bad request (behind the last offset) " <<
26002639
" EndOffset " << EndOffset <<
26012640
" End " << operation.GetCommitOffsetsEnd());
2641+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2642+
"Behind the last offset. " <<
2643+
"Partition end offset " << EndOffset <<
2644+
", range end " << operation.GetCommitOffsetsEnd());
26022645
result = false;
26032646
}
26042647

ydb/core/persqueue/partition.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class TPartitionCompaction;
6363

6464
struct TTransaction {
6565

66-
explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67-
TMaybe<bool> predicate = Nothing())
66+
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67+
TInstant calcPredicateTimestamp,
68+
TMaybe<bool> predicate = Nothing())
6869
: Tx(tx)
6970
, Predicate(predicate)
7071
, SupportivePartitionActor(tx->SupportivePartitionActor)
7172
, CalcPredicateSpan(std::move(tx->Span))
73+
, CalcPredicateTimestamp(calcPredicateTimestamp)
7274
{
7375
Y_ABORT_UNLESS(Tx);
7476
}
@@ -128,6 +130,7 @@ struct TTransaction {
128130
NWilson::TSpan CommitSpan;
129131

130132
TInstant WriteInfoResponseTimestamp;
133+
TInstant CalcPredicateTimestamp;
131134
};
132135
class TPartitionCompaction;
133136

@@ -808,7 +811,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
808811
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
809812
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);
810813

811-
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
814+
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event,
815+
TMaybe<bool>& predicate, TString& issueMsg);
812816

813817
EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
814818

ydb/core/persqueue/partition_init.cpp

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,26 @@ enum EKeyPosition {
532532
// Calculates the location of keys relative to each other
533533
static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
534534
{
535-
// Called from FilterBlobsMetaData. The keys are pre-sorted
536-
Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(),
537-
"lhs: %s, rhs: %s",
538-
lhs.ToString().data(), rhs.ToString().data());
539-
540535
if (lhs.GetOffset() == rhs.GetOffset()) {
541536
if (lhs.GetPartNo() == rhs.GetPartNo()) {
542-
Y_ABORT_UNLESS(lhs.GetCount() <= rhs.GetCount(),
543-
"lhs: %s, rhs: %s",
544-
lhs.ToString().data(), rhs.ToString().data());
545-
return RhsContainsLhs;
537+
if (lhs.GetCount() < rhs.GetCount()) {
538+
return RhsContainsLhs;
539+
} else if (lhs.GetCount() == rhs.GetCount()) {
540+
if (lhs.GetInternalPartsCount() < rhs.GetInternalPartsCount()) {
541+
return RhsContainsLhs;
542+
} else {
543+
return LhsContainsRhs;
544+
}
545+
} else {
546+
return LhsContainsRhs;
547+
}
548+
} else if (lhs.GetPartNo() > rhs.GetPartNo()) {
549+
return LhsContainsRhs;
550+
} else {
551+
return RhsAfterLhs;
546552
}
547-
548-
// case lhs.GetOffset() == rhs.GetOffset() && lhs.GetPartNo() < rhs.GetPartNo()
549-
Y_ABORT_UNLESS(lhs.GetPartNo() + lhs.GetInternalPartsCount() <= rhs.GetPartNo(),
550-
"lhs: %s, rhs: %s",
551-
lhs.ToString().data(), rhs.ToString().data());
552-
553-
return RhsAfterLhs;
554553
}
555554

556-
// case lhs.GetOffset() < rhs.GetOffset()
557-
558555
if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) {
559556
return LhsContainsRhs;
560557
} else if (nextOffset == rhs.GetOffset()) {

ydb/core/persqueue/pq_impl.cpp

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3299,7 +3299,7 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
32993299

33003300
for (auto& [txId, tx] : Txs) {
33013301
if ((tx.MaxStep < step) && (tx.State <= NKikimrPQ::TTransaction::PREPARED)) {
3302-
DeleteTx(tx);
3302+
BeginDeleteTransaction(ctx, tx, NKikimrPQ::TTransaction::EXPIRED);
33033303
}
33043304
}
33053305

@@ -3359,6 +3359,15 @@ void TPersQueue::SetTxInFlyCounter()
33593359
Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size();
33603360
}
33613361

3362+
void TPersQueue::BeginDeleteTransaction(const TActorContext& ctx,
3363+
TDistributedTransaction& tx,
3364+
NKikimrPQ::TTransaction::EState state)
3365+
{
3366+
BeginDeletePartitions(tx);
3367+
ChangeTxState(tx, state);
3368+
CheckTxState(ctx, tx);
3369+
}
3370+
33623371
void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
33633372
{
33643373
if (!InitCompleted) {
@@ -3369,12 +3378,12 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
33693378
NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
33703379
Y_ABORT_UNLESS(event.HasTxId());
33713380

3372-
PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for tx " << event.GetTxId());
3381+
PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for TxId " << event.GetTxId());
33733382

33743383
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx) {
33753384
Y_ABORT_UNLESS(tx->State <= NKikimrPQ::TTransaction::PREPARED);
33763385

3377-
DeleteTx(*tx);
3386+
BeginDeleteTransaction(ctx, *tx, NKikimrPQ::TTransaction::CANCELED);
33783387

33793388
TryWriteTxs(ctx);
33803389
}
@@ -4400,6 +4409,11 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
44004409
result->Record.SetTxId(tx.TxId);
44014410
result->Record.SetStep(tx.Step);
44024411

4412+
if (tx.Error.Defined() && tx.Error->GetKind() != NKikimrPQ::TError::OK) {
4413+
auto* error = result->Record.MutableErrors()->Add();
4414+
*error = *tx.Error;
4415+
}
4416+
44034417
PQ_LOG_TX_D("TxId: " << tx.TxId <<
44044418
" send TEvPersQueue::TEvProposeTransactionResult(" <<
44054419
NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result->Record.GetStatus()) <<
@@ -4761,9 +4775,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
47614775

47624776
WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
47634777

4764-
PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId);
4765-
BeginDeletePartitions(tx);
4766-
47674778
TryChangeTxState(tx, NKikimrPQ::TTransaction::EXECUTED);
47684779
}
47694780

@@ -4785,6 +4796,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
47854796

47864797
SendEvReadSetAckToSenders(ctx, tx);
47874798

4799+
PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId);
4800+
BeginDeletePartitions(tx);
4801+
47884802
TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS_ACKS);
47894803

47904804
[[fallthrough]];
@@ -4801,7 +4815,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
48014815

48024816
break;
48034817

4804-
case NKikimrPQ::TTransaction::DELETING:
4818+
case NKikimrPQ::TTransaction::DELETING: {
48054819
// The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
48064820
TMaybe<TWriteId> writeId = tx.WriteId; // copy writeId to save for kafka transaction after erase
48074821
DeleteWriteId(writeId);
@@ -4816,6 +4830,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
48164830
TryContinueKafkaWrites(writeId, ctx);
48174831
break;
48184832
}
4833+
4834+
case NKikimrPQ::TTransaction::EXPIRED:
4835+
case NKikimrPQ::TTransaction::CANCELED:
4836+
PQ_LOG_TX_D("AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
4837+
if (AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
4838+
DeleteTx(tx);
4839+
// implicitly switch to the state DELETING
4840+
}
4841+
4842+
break;
4843+
4844+
}
48194845
}
48204846

48214847
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
@@ -5307,11 +5333,13 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
53075333
// the actor's ID could have changed from the moment he sent the TEvProposeTransaction. you need to
53085334
// update the actor ID in the transaction
53095335
//
5310-
// if the transaction has progressed beyond WAIT_RS, then a response has been sent to the sender
5336+
// if the transaction has progressed beyond EXECUTED, then a response has been sent to the sender
53115337
//
5338+
status = NKikimrProto::OK;
5339+
53125340
tx->SourceActor = ev->Sender;
5313-
if (tx->State <= NKikimrPQ::TTransaction::WAIT_RS) {
5314-
status = NKikimrProto::OK;
5341+
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
5342+
SendEvProposeTransactionResult(ctx, *tx);
53155343
}
53165344
}
53175345

@@ -5441,7 +5469,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
54415469
}
54425470
if (writeInfo.TxId.Defined()) {
54435471
if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) {
5444-
if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
5472+
if ((tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) ||
5473+
(tx->State == NKikimrPQ::TTransaction::EXPIRED) ||
5474+
(tx->State == NKikimrPQ::TTransaction::CANCELED)) {
54455475
TryExecuteTxs(ctx, *tx);
54465476
}
54475477
}

ydb/core/persqueue/pq_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
595595

596596
void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
597597

598+
void BeginDeleteTransaction(const TActorContext& ctx,
599+
TDistributedTransaction& tx,
600+
NKikimrPQ::TTransaction::EState state);
601+
598602
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
599603
NWilson::TSpan WriteTxsSpan;
600604

0 commit comments

Comments
 (0)