Skip to content

Commit 3f31499

Browse files
Optimizing the calculation of the transaction predicate (#28653)
2 parents 46f538d + 9d4caac commit 3f31499

File tree

11 files changed

+589
-302
lines changed

11 files changed

+589
-302
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,18 +861,21 @@ struct TEvPQ {
861861
};
862862

863863
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
864-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
864+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
865+
const TString& issueMsg) :
865866
Step(step),
866867
TxId(txId),
867868
Partition(partition),
868-
Predicate(predicate)
869+
Predicate(predicate),
870+
IssueMsg(issueMsg)
869871
{
870872
}
871873

872874
ui64 Step;
873875
ui64 TxId;
874876
NPQ::TPartitionId Partition;
875877
TMaybe<bool> Predicate;
878+
TString IssueMsg;
876879
};
877880

878881
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 329 additions & 163 deletions
Large diffs are not rendered by default.

ydb/core/persqueue/partition.h

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class TPartitionCompaction;
6363

6464
struct TTransaction {
6565

66-
explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67-
TMaybe<bool> predicate = Nothing())
66+
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67+
TInstant calcPredicateTimestamp,
68+
TMaybe<bool> predicate = Nothing())
6869
: Tx(tx)
6970
, Predicate(predicate)
7071
, SupportivePartitionActor(tx->SupportivePartitionActor)
7172
, CalcPredicateSpan(std::move(tx->Span))
73+
, CalcPredicateTimestamp(calcPredicateTimestamp)
7274
{
7375
Y_ABORT_UNLESS(Tx);
7476
}
@@ -128,6 +130,7 @@ struct TTransaction {
128130
NWilson::TSpan CommitSpan;
129131

130132
TInstant WriteInfoResponseTimestamp;
133+
TInstant CalcPredicateTimestamp;
131134
};
132135
class TPartitionCompaction;
133136

@@ -333,11 +336,31 @@ class TPartition : public TActorBootstrapped<TPartition> {
333336
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> MakeHasDataInfoResponse(ui64 lagSize, const TMaybe<ui64>& cookie, bool readingFinished = false);
334337

335338
void ProcessTxsAndUserActs(const TActorContext& ctx);
336-
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
337-
void ProcessCommitQueue();
338339
void RunPersist();
339340

340-
void MoveUserActOrTxToCommitState();
341+
enum class EProcessResult;
342+
struct TAffectedSourceIdsAndConsumers;
343+
344+
void ProcessUserActionAndTxEvents();
345+
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
346+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
347+
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TTransaction>& tx,
348+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
349+
EProcessResult ProcessUserActionAndTxEvent(TMessage& msg,
350+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
351+
352+
void MoveUserActionAndTxToPendingCommitQueue();
353+
354+
void ProcessUserActionAndTxPendingCommits();
355+
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
356+
TEvKeyValue::TEvRequest* request);
357+
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TTransaction>& tx,
358+
TEvKeyValue::TEvRequest* request);
359+
void ProcessUserActionAndTxPendingCommit(TMessage& msg,
360+
TEvKeyValue::TEvRequest* request);
361+
362+
bool WritingCycleDoesNotExceedTheLimits() const;
363+
341364
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
342365
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
343366
void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
@@ -419,14 +442,13 @@ class TPartition : public TActorBootstrapped<TPartition> {
419442
const TString& reason);
420443
THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);
421444

422-
bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
445+
bool BeginTransactionConfig();
423446

424447
void CommitTransaction(TSimpleSharedPtr<TTransaction>& t);
425448
void RollbackTransaction(TSimpleSharedPtr<TTransaction>& t);
426449

427450

428-
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
429-
const TActorContext& ctx);
451+
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
430452
void ExecChangePartitionConfig();
431453

432454
void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
@@ -717,10 +739,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
717739
TMaybe<i16> KafkaProducerEpoch = 0;
718740
};
719741

720-
THashSet<TString> TxAffectedSourcesIds;
721-
THashSet<TString> WriteAffectedSourcesIds;
722-
THashSet<TString> TxAffectedConsumers;
723-
THashSet<TString> SetOffsetAffectedConsumers;
742+
THashMap<TString, size_t> TxAffectedSourcesIds;
743+
THashMap<TString, size_t> WriteAffectedSourcesIds;
744+
THashMap<TString, size_t> TxAffectedConsumers;
745+
THashMap<TString, size_t> SetOffsetAffectedConsumers;
724746
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
725747
THashMap<TString, TSeqNoProducerEpoch> TxInflightMaxSeqNoPerSourceId;
726748

@@ -775,44 +797,61 @@ class TPartition : public TActorBootstrapped<TPartition> {
775797

776798
TMaybe<TUsersInfoStorage> UsersInfoStorage;
777799

778-
// template <class T> T& GetUserActionAndTransactionEventsFront();
779-
// template <class T> T& GetCurrentEvent();
780-
//TSimpleSharedPtr<TTransaction>& GetCurrentTransaction();
800+
struct TAffectedSourceIdsAndConsumers {
801+
TVector<TString> TxWriteSourcesIds;
802+
TVector<TString> WriteSourcesIds;
803+
TVector<TString> TxReadConsumers;
804+
TVector<TString> ReadConsumers;
805+
ui32 WriteKeysSize = 0;
806+
ui32 WriteCycleSize = 0;
807+
};
808+
809+
void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
781810

782-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event);
783-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event);
784-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx);
785-
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg);
811+
void DeleteAffectedSourceIdsAndConsumers();
812+
void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
813+
void DeleteFromSet(const TVector<TString>& p, THashMap<TString, size_t>& q) const;
786814

787-
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event, TEvKeyValue::TEvRequest* request);
815+
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
816+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
817+
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx,
818+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
819+
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg,
820+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
788821

789-
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event,
790-
TEvKeyValue::TEvRequest* request);
822+
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& events, TEvKeyValue::TEvRequest* request);
791823
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx, TEvKeyValue::TEvRequest* request);
792824
bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request);
793825

794-
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx);
826+
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act,
827+
TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers);
795828
void CommitUserAct(TEvPQ::TEvSetClientInfo& act);
796829

797830

798-
[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
831+
[[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t,
832+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
799833
void ExecImmediateTx(TTransaction& tx);
800834

801-
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
802-
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
803-
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg);
804-
EProcessResult PreProcessRequest(TWriteMsg& msg);
835+
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg,
836+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
837+
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg,
838+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
839+
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg,
840+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
841+
EProcessResult PreProcessRequest(TWriteMsg& msg,
842+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
805843

806844
void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
807845
void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
808846
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
809847
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);
810848

811-
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
849+
[[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t,
850+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
812851

813-
EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
852+
EProcessResult ApplyWriteInfoResponse(TTransaction& tx,
853+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
814854

815-
bool FirstEvent = true;
816855
bool HaveWriteMsg = false;
817856
bool HaveData = false;
818857
bool HaveCheckDisk = false;
@@ -833,13 +872,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
833872
void BeginAppendHeadWithNewWrites(const TActorContext& ctx);
834873
void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
835874

875+
bool HasPendingCommitsOrPendingWrites() const;
876+
836877
//
837878
// user actions and transactions
838879
//
839880
struct TUserActionAndTransactionEvent {
840-
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
841-
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
842-
TMessage> Event;
843881
TUserActionAndTransactionEvent(TSimpleSharedPtr<TTransaction>&& transaction)
844882
: Event(std::move(transaction))
845883
{}
@@ -849,10 +887,16 @@ class TPartition : public TActorBootstrapped<TPartition> {
849887
TUserActionAndTransactionEvent(TMessage&& message)
850888
: Event(std::move(message))
851889
{}
890+
891+
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
892+
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
893+
TMessage> Event;
894+
TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers;
852895
};
853896

854897
std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
855898
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
899+
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingWrite;
856900
TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;
857901

858902
THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
@@ -875,15 +919,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
875919
TMessageQueue Responses;
876920
ui64 CurrentBatchSize = 0;
877921

878-
enum class ETxBatchingState{
879-
PreProcessing,
880-
Executing,
881-
Finishing
882-
};
883-
ETxBatchingState BatchingState = ETxBatchingState::PreProcessing;
884-
//
885-
//
886-
//
887922
std::deque<std::pair<TString, ui64>> UpdateUserInfoTimestamp;
888923
bool ReadingTimestamp;
889924
TString ReadingForUser;
@@ -1067,15 +1102,15 @@ class TPartition : public TActorBootstrapped<TPartition> {
10671102
size_t WriteNewSizeFromSupportivePartitions = 0;
10681103

10691104
bool TryAddDeleteHeadKeysToPersistRequest();
1070-
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
1105+
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const;
10711106

10721107
TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);
10731108

1109+
void DumpTheSizeOfInternalQueues() const;
10741110

10751111
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
10761112
TDeque<NWilson::TTraceId> TxForPersistTraceIds;
10771113
TDeque<NWilson::TSpan> TxForPersistSpans;
1078-
bool CanProcessUserActionAndTransactionEvents() const;
10791114
};
10801115

10811116
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_write.cpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -568,10 +568,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
568568
TxSourceIdForPostPersist.clear();
569569
TxInflightMaxSeqNoPerSourceId.clear();
570570

571-
TxAffectedSourcesIds.clear();
572-
WriteAffectedSourcesIds.clear();
573-
TxAffectedConsumers.clear();
574-
SetOffsetAffectedConsumers.clear();
575571
if (UserActionAndTransactionEvents.empty()) {
576572
WriteInfosToTx.clear();
577573
}
@@ -981,7 +977,9 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx,
981977
StartProcessChangeOwnerRequests(ctx);
982978
}
983979

984-
TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) {
980+
TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg,
981+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
982+
{
985983
if (!CanWrite()) {
986984
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
987985
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
@@ -996,7 +994,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMs
996994
if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) {
997995
return EProcessResult::Blocked;
998996
}
999-
WriteAffectedSourcesIds.insert(msg.Body.SourceId);
997+
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId);
1000998
return EProcessResult::Continue;
1001999
}
10021000

@@ -1013,7 +1011,9 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p
10131011
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
10141012
}
10151013

1016-
TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) {
1014+
TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg,
1015+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
1016+
{
10171017
if (!CanWrite()) {
10181018
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
10191019
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
@@ -1027,7 +1027,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroup
10271027
if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) {
10281028
return EProcessResult::Blocked;
10291029
}
1030-
WriteAffectedSourcesIds.insert(msg.Body.SourceId);
1030+
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId);
10311031
return EProcessResult::Continue;
10321032
}
10331033

@@ -1036,7 +1036,9 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters&
10361036
}
10371037

10381038

1039-
TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) {
1039+
TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg,
1040+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
1041+
{
10401042
if (!CanWrite()) {
10411043
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
10421044
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
@@ -1052,16 +1054,16 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg&
10521054
if (TxAffectedSourcesIds.contains(body.SourceId)) {
10531055
return EProcessResult::Blocked;
10541056
}
1055-
WriteAffectedSourcesIds.insert(body.SourceId);
1057+
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId);
10561058
}
10571059
for (auto& body : msg.Deregistrations) {
10581060
if (TxAffectedSourcesIds.contains(body.SourceId)) {
10591061
return EProcessResult::Blocked;
10601062
}
1061-
WriteAffectedSourcesIds.insert(body.SourceId);
1063+
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId);
10621064
}
1063-
return EProcessResult::Continue;
10641065

1066+
return EProcessResult::Continue;
10651067
}
10661068

10671069

@@ -1081,7 +1083,9 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para
10811083
}
10821084
}
10831085

1084-
TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
1086+
TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p,
1087+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
1088+
{
10851089
if (!CanWrite()) {
10861090
WriteInflightSize -= p.Msg.Data.size();
10871091
ScheduleReplyError(p.Cookie, false, InactivePartitionErrorCode,
@@ -1106,7 +1110,8 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
11061110
return EProcessResult::Blocked;
11071111
}
11081112
}
1109-
WriteAffectedSourcesIds.insert(p.Msg.SourceId);
1113+
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(p.Msg.SourceId);
1114+
11101115
return EProcessResult::Continue;
11111116
}
11121117

0 commit comments

Comments
 (0)