Skip to content

Commit e76ed65

Browse files
alexnick88Alek5andr-Kotov
authored andcommitted
Changes for Ya.Metrika (#28381)
1 parent 275d5e6 commit e76ed65

File tree

10 files changed

+122
-39
lines changed

10 files changed

+122
-39
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,18 +861,21 @@ struct TEvPQ {
861861
};
862862

863863
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
864-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
864+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
865+
const TString& issueMsg) :
865866
Step(step),
866867
TxId(txId),
867868
Partition(partition),
868-
Predicate(predicate)
869+
Predicate(predicate),
870+
IssueMsg(issueMsg)
869871
{
870872
}
871873

872874
ui64 Step;
873875
ui64 TxId;
874876
NPQ::TPartitionId Partition;
875877
TMaybe<bool> Predicate;
878+
TString IssueMsg;
876879
};
877880

878881
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,24 @@ static const ui32 MAX_KEYS = 10000;
125125
static const ui32 MAX_TXS = 1000;
126126
static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
127127

128+
// Builds the bracketed context prefix used at the start of write-side
// transaction error messages:
//   "[TxId: <txId>, Topic: '<topic>', Partition <partition>, SourceId '<sourceId>', SeqNo <seqNo>] "
//
// The source id is passed through EscapeC() before it is appended.
// The builder is returned by value so the caller can keep appending the
// specific failure text with operator<<.
TStringBuilder MakeTxWriteErrorMessage(TMaybe<ui64> txId,
                                       TStringBuf topicName, const TPartitionId& partitionId,
                                       TStringBuf sourceId, ui64 seqNo)
{
    TStringBuilder prefix;
    prefix << "[TxId: " << txId;
    prefix << ", Topic: '" << topicName;
    prefix << "', Partition " << partitionId;
    prefix << ", SourceId '" << EscapeC(sourceId);
    prefix << "', SeqNo " << seqNo;
    prefix << "] ";
    return prefix;
}
136+
137+
// Builds the bracketed context prefix used at the start of read-side
// (consumer offset commit) transaction error messages:
//   "[TxId: <txId>, Topic: '<topic>', Partition <partition>, Consumer '<consumer>'] "
//
// The builder is returned by value so the caller can keep appending the
// specific failure text with operator<<.
TStringBuilder MakeTxReadErrorMessage(TMaybe<ui64> txId,
                                      TStringBuf topicName, const TPartitionId& partitionId,
                                      TStringBuf consumer)
{
    TStringBuilder ss;
    // The consumer name is wrapped in single quotes; the closing quote must be
    // emitted before the bracket so the quoting is balanced, matching the
    // write-side helper.
    ss << "[TxId: " << txId << ", Topic: '" << topicName << "', Partition " << partitionId << ", Consumer '" << consumer << "'] ";
    return ss;
}
145+
128146
auto GetStepAndTxId(ui64 step, ui64 txId)
129147
{
130148
return std::make_pair(step, txId);
@@ -1205,7 +1223,8 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate>
12051223
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
12061224
ev->TxId,
12071225
Partition,
1208-
Nothing()).Release());
1226+
Nothing(),
1227+
TString()).Release());
12091228
return;
12101229
}
12111230
}
@@ -1451,7 +1470,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14511470
if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) {
14521471
if (SeqnoViolation(inFlightIter->second.KafkaProducerEpoch, inFlightIter->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14531472
tx.Predicate = false;
1454-
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1473+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, inFlightIter->second.SeqNo) <<
1474+
"MinSeqNo violation failure. " <<
1475+
"SeqNo " << s.second.MinSeqNo);
14551476
tx.WriteInfoApplied = true;
14561477
break;
14571478
}
@@ -1460,7 +1481,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
14601481
if (auto existing = knownSourceIds.find(s.first); !existing.IsEnd()) {
14611482
if (SeqnoViolation(existing->second.ProducerEpoch, existing->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) {
14621483
tx.Predicate = false;
1463-
tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first;
1484+
tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, existing->second.SeqNo) <<
1485+
"MinSeqNo violation failure. " <<
1486+
"SeqNo " << s.second.MinSeqNo);
14641487
tx.WriteInfoApplied = true;
14651488
break;
14661489
}
@@ -1515,6 +1538,8 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
15151538
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
15161539

15171540
if (isPredicate) {
1541+
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_TXCALCPREDICATE].IncrementFor((Now() - tx->CalcPredicateTimestamp).MilliSeconds());
1542+
15181543
tx->CalcPredicateSpan.End();
15191544
tx->CalcPredicateSpan = {};
15201545

@@ -1531,7 +1556,8 @@ void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, b
15311556
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(tx->Tx->Step,
15321557
tx->Tx->TxId,
15331558
Partition,
1534-
*tx->Predicate).Release());
1559+
*tx->Predicate,
1560+
tx->Message).Release());
15351561
} else {
15361562
auto insRes = TransactionsInflight.emplace(tx->ProposeConfig->TxId, tx);
15371563
Y_ABORT_UNLESS(insRes.second);
@@ -2026,7 +2052,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
20262052

20272053
void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
20282054
{
2029-
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event)));
2055+
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event), Now()));
20302056
RequestWriteInfoIfRequired();
20312057
}
20322058

@@ -2457,7 +2483,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple
24572483
ReplyToProposeOrPredicate(t, true);
24582484
return EProcessResult::Continue;
24592485
}
2460-
result = BeginTransaction(*t->Tx, t->Predicate);
2486+
result = BeginTransaction(*t->Tx, t->Predicate, t->Message);
24612487
if (t->Predicate.Defined()) {
24622488
ReplyToProposeOrPredicate(t, true);
24632489
}
@@ -2525,7 +2551,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
25252551
return true;
25262552
}
25272553

2528-
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
2554+
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
2555+
TMaybe<bool>& predicateOut, TString& issueMsg)
25292556
{
25302557
if (tx.ForcePredicateFalse) {
25312558
predicateOut = false;
@@ -2548,13 +2575,17 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
25482575
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
25492576
PQ_LOG_D("Partition " << Partition <<
25502577
" Consumer '" << consumer << "' has been removed");
2578+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2579+
"Consumer has been removed");
25512580
result = false;
25522581
break;
25532582
}
25542583

25552584
if (!UsersInfoStorage->GetIfExists(consumer)) {
25562585
PQ_LOG_D("Partition " << Partition <<
25572586
" Unknown consumer '" << consumer << "'");
2587+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2588+
"Unknown consumer");
25582589
result = false;
25592590
break;
25602591
}
@@ -2583,20 +2614,32 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
25832614
" Bad request (invalid range) " <<
25842615
" Begin " << operation.GetCommitOffsetsBegin() <<
25852616
" End " << operation.GetCommitOffsetsEnd());
2617+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2618+
"Invalid range. " <<
2619+
"Range begin " << operation.GetCommitOffsetsBegin() <<
2620+
", range end " << operation.GetCommitOffsetsEnd());
25862621
result = false;
25872622
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
25882623
PQ_LOG_D("Partition " << Partition <<
25892624
" Consumer '" << consumer << "'" <<
25902625
" Bad request (gap) " <<
25912626
" Offset " << userInfo.Offset <<
25922627
" Begin " << operation.GetCommitOffsetsBegin());
2628+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2629+
"Gap. " <<
2630+
"Offset " << userInfo.Offset <<
2631+
", range begin " << operation.GetCommitOffsetsBegin());
25932632
result = false;
25942633
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
25952634
PQ_LOG_D("Partition " << Partition <<
25962635
" Consumer '" << consumer << "'" <<
25972636
" Bad request (behind the last offset) " <<
25982637
" EndOffset " << EndOffset <<
25992638
" End " << operation.GetCommitOffsetsEnd());
2639+
issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) <<
2640+
"Behind the last offset. " <<
2641+
"Partition end offset " << EndOffset <<
2642+
", range end " << operation.GetCommitOffsetsEnd());
26002643
result = false;
26012644
}
26022645

ydb/core/persqueue/partition.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class TPartitionCompaction;
6363

6464
struct TTransaction {
6565

66-
explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67-
TMaybe<bool> predicate = Nothing())
66+
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67+
TInstant calcPredicateTimestamp,
68+
TMaybe<bool> predicate = Nothing())
6869
: Tx(tx)
6970
, Predicate(predicate)
7071
, SupportivePartitionActor(tx->SupportivePartitionActor)
7172
, CalcPredicateSpan(std::move(tx->Span))
73+
, CalcPredicateTimestamp(calcPredicateTimestamp)
7274
{
7375
Y_ABORT_UNLESS(Tx);
7476
}
@@ -128,6 +130,7 @@ struct TTransaction {
128130
NWilson::TSpan CommitSpan;
129131

130132
TInstant WriteInfoResponseTimestamp;
133+
TInstant CalcPredicateTimestamp;
131134
};
132135
class TPartitionCompaction;
133136

@@ -808,7 +811,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
808811
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
809812
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);
810813

811-
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
814+
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event,
815+
TMaybe<bool>& predicate, TString& issueMsg);
812816

813817
EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
814818

ydb/core/persqueue/pq_impl.cpp

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3304,7 +3304,7 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx)
33043304

33053305
for (auto& [txId, tx] : Txs) {
33063306
if ((tx.MaxStep < step) && (tx.State <= NKikimrPQ::TTransaction::PREPARED)) {
3307-
DeleteTx(tx);
3307+
BeginDeleteTransaction(ctx, tx, NKikimrPQ::TTransaction::EXPIRED);
33083308
}
33093309
}
33103310

@@ -3366,6 +3366,15 @@ void TPersQueue::SetTxInFlyCounter()
33663366
Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size();
33673367
}
33683368

3369+
void TPersQueue::BeginDeleteTransaction(const TActorContext& ctx,
3370+
TDistributedTransaction& tx,
3371+
NKikimrPQ::TTransaction::EState state)
3372+
{
3373+
BeginDeletePartitions(tx);
3374+
ChangeTxState(tx, state);
3375+
CheckTxState(ctx, tx);
3376+
}
3377+
33693378
void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
33703379
{
33713380
if (!InitCompleted) {
@@ -3376,12 +3385,12 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
33763385
NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
33773386
Y_ABORT_UNLESS(event.HasTxId());
33783387

3379-
PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for tx " << event.GetTxId());
3388+
PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for TxId " << event.GetTxId());
33803389

33813390
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx) {
33823391
Y_ABORT_UNLESS(tx->State <= NKikimrPQ::TTransaction::PREPARED);
33833392

3384-
DeleteTx(*tx);
3393+
BeginDeleteTransaction(ctx, *tx, NKikimrPQ::TTransaction::CANCELED);
33853394

33863395
TryWriteTxs(ctx);
33873396
}
@@ -4407,6 +4416,11 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
44074416
result->Record.SetTxId(tx.TxId);
44084417
result->Record.SetStep(tx.Step);
44094418

4419+
if (tx.Error.Defined() && tx.Error->GetKind() != NKikimrPQ::TError::OK) {
4420+
auto* error = result->Record.MutableErrors()->Add();
4421+
*error = *tx.Error;
4422+
}
4423+
44104424
PQ_LOG_TX_D("TxId: " << tx.TxId <<
44114425
" send TEvPersQueue::TEvProposeTransactionResult(" <<
44124426
NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result->Record.GetStatus()) <<
@@ -4768,9 +4782,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
47684782

47694783
WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
47704784

4771-
PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId);
4772-
BeginDeletePartitions(tx);
4773-
47744785
TryChangeTxState(tx, NKikimrPQ::TTransaction::EXECUTED);
47754786
}
47764787

@@ -4792,6 +4803,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
47924803

47934804
SendEvReadSetAckToSenders(ctx, tx);
47944805

4806+
PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId);
4807+
BeginDeletePartitions(tx);
4808+
47954809
TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS_ACKS);
47964810

47974811
[[fallthrough]];
@@ -4808,7 +4822,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
48084822

48094823
break;
48104824

4811-
case NKikimrPQ::TTransaction::DELETING:
4825+
case NKikimrPQ::TTransaction::DELETING: {
48124826
// The PQ tablet has persisted its state. Now it can delete the transaction and take the next one.
48134827
TMaybe<TWriteId> writeId = tx.WriteId; // copy writeId to save for kafka transaction after erase
48144828
DeleteWriteId(writeId);
@@ -4823,6 +4837,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
48234837
TryContinueKafkaWrites(writeId, ctx);
48244838
break;
48254839
}
4840+
4841+
case NKikimrPQ::TTransaction::EXPIRED:
4842+
case NKikimrPQ::TTransaction::CANCELED:
4843+
PQ_LOG_TX_D("AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId));
4844+
if (AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) {
4845+
DeleteTx(tx);
4846+
// implicitly switch to the state DELETING
4847+
}
4848+
4849+
break;
4850+
4851+
}
48264852
}
48274853

48284854
bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const
@@ -5314,11 +5340,13 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
53145340
// the actor's ID could have changed from the moment he sent the TEvProposeTransaction. you need to
53155341
// update the actor ID in the transaction
53165342
//
5317-
// if the transaction has progressed beyond WAIT_RS, then a response has been sent to the sender
5343+
// if the transaction has progressed beyond EXECUTED, then a response has been sent to the sender
53185344
//
5345+
status = NKikimrProto::OK;
5346+
53195347
tx->SourceActor = ev->Sender;
5320-
if (tx->State <= NKikimrPQ::TTransaction::WAIT_RS) {
5321-
status = NKikimrProto::OK;
5348+
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
5349+
SendEvProposeTransactionResult(ctx, *tx);
53225350
}
53235351
}
53245352

@@ -5448,7 +5476,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
54485476
}
54495477
if (writeInfo.TxId.Defined()) {
54505478
if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) {
5451-
if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
5479+
if ((tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) ||
5480+
(tx->State == NKikimrPQ::TTransaction::EXPIRED) ||
5481+
(tx->State == NKikimrPQ::TTransaction::CANCELED)) {
54525482
TryExecuteTxs(ctx, *tx);
54535483
}
54545484
}

ydb/core/persqueue/pq_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
595595

596596
void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
597597

598+
void BeginDeleteTransaction(const TActorContext& ctx,
599+
TDistributedTransaction& tx,
600+
NKikimrPQ::TTransaction::EState state);
601+
598602
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
599603
NWilson::TSpan WriteTxsSpan;
600604

ydb/core/persqueue/transaction.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,14 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
248248
}
249249

250250
OnPartitionResult(event, decision);
251+
252+
if (!event.IssueMsg.empty()) {
253+
NKikimrPQ::TError error;
254+
error.SetKind(NKikimrPQ::TError::BAD_REQUEST);
255+
error.SetReason(event.IssueMsg);
256+
257+
Error = std::move(error);
258+
}
251259
}
252260

253261
void UpdatePartitionsData(NKikimrPQ::TPartitions& partitionsData, NKikimrPQ::TPartitions::TPartitionInfo& partition) {

ydb/core/persqueue/transaction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ struct TDistributedTransaction {
109109

110110
bool Pending = false;
111111

112+
TMaybe<NKikimrPQ::TError> Error;
113+
112114
void SetExecuteSpan(NWilson::TSpan&& span);
113115
void EndExecuteSpan();
114116
NWilson::TSpan CreatePlanStepSpan(ui64 tabletId, ui64 step);

0 commit comments

Comments
 (0)