Skip to content

Commit 5e06e8c

Browse files
Optimizing the calculation of the transaction predicate (#28623)
2 parents 84c33f6 + 714b031 commit 5e06e8c

File tree

12 files changed

+643
-349
lines changed

12 files changed

+643
-349
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,18 +861,21 @@ struct TEvPQ {
861861
};
862862

863863
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
864-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
864+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
865+
const TString& issueMsg) :
865866
Step(step),
866867
TxId(txId),
867868
Partition(partition),
868-
Predicate(predicate)
869+
Predicate(predicate),
870+
IssueMsg(issueMsg)
869871
{
870872
}
871873

872874
ui64 Step;
873875
ui64 TxId;
874876
NPQ::TPartitionId Partition;
875877
TMaybe<bool> Predicate;
878+
TString IssueMsg;
876879
};
877880

878881
struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

ydb/core/persqueue/partition.cpp

Lines changed: 336 additions & 176 deletions
Large diffs are not rendered by default.

ydb/core/persqueue/partition.h

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class TPartitionCompaction;
6363

6464
struct TTransaction {
6565

66-
explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67-
TMaybe<bool> predicate = Nothing())
66+
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
67+
TInstant calcPredicateTimestamp,
68+
TMaybe<bool> predicate = Nothing())
6869
: Tx(tx)
6970
, Predicate(predicate)
7071
, SupportivePartitionActor(tx->SupportivePartitionActor)
7172
, CalcPredicateSpan(std::move(tx->Span))
73+
, CalcPredicateTimestamp(calcPredicateTimestamp)
7274
{
7375
Y_ABORT_UNLESS(Tx);
7476
}
@@ -128,6 +130,7 @@ struct TTransaction {
128130
NWilson::TSpan CommitSpan;
129131

130132
TInstant WriteInfoResponseTimestamp;
133+
TInstant CalcPredicateTimestamp;
131134
};
132135
class TPartitionCompaction;
133136

@@ -333,11 +336,31 @@ class TPartition : public TActorBootstrapped<TPartition> {
333336
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> MakeHasDataInfoResponse(ui64 lagSize, const TMaybe<ui64>& cookie, bool readingFinished = false);
334337

335338
void ProcessTxsAndUserActs(const TActorContext& ctx);
336-
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
337-
void ProcessCommitQueue();
338339
void RunPersist();
339340

340-
void MoveUserActOrTxToCommitState();
341+
enum class EProcessResult;
342+
struct TAffectedSourceIdsAndConsumers;
343+
344+
void ProcessUserActionAndTxEvents();
345+
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
346+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
347+
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TTransaction>& tx,
348+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
349+
EProcessResult ProcessUserActionAndTxEvent(TMessage& msg,
350+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
351+
352+
void MoveUserActionAndTxToPendingCommitQueue();
353+
354+
void ProcessUserActionAndTxPendingCommits();
355+
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
356+
TEvKeyValue::TEvRequest* request);
357+
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TTransaction>& tx,
358+
TEvKeyValue::TEvRequest* request);
359+
void ProcessUserActionAndTxPendingCommit(TMessage& msg,
360+
TEvKeyValue::TEvRequest* request);
361+
362+
bool WritingCycleDoesNotExceedTheLimits() const;
363+
341364
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
342365
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
343366
void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
@@ -419,14 +442,13 @@ class TPartition : public TActorBootstrapped<TPartition> {
419442
const TString& reason);
420443
THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);
421444

422-
bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
445+
bool BeginTransactionConfig();
423446

424447
void CommitTransaction(TSimpleSharedPtr<TTransaction>& t);
425448
void RollbackTransaction(TSimpleSharedPtr<TTransaction>& t);
426449

427450

428-
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
429-
const TActorContext& ctx);
451+
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
430452
void ExecChangePartitionConfig();
431453

432454
void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
@@ -717,10 +739,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
717739
TMaybe<i16> KafkaProducerEpoch = 0;
718740
};
719741

720-
THashSet<TString> TxAffectedSourcesIds;
721-
THashSet<TString> WriteAffectedSourcesIds;
722-
THashSet<TString> TxAffectedConsumers;
723-
THashSet<TString> SetOffsetAffectedConsumers;
742+
THashMap<TString, size_t> TxAffectedSourcesIds;
743+
THashMap<TString, size_t> WriteAffectedSourcesIds;
744+
THashMap<TString, size_t> TxAffectedConsumers;
745+
THashMap<TString, size_t> SetOffsetAffectedConsumers;
724746
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
725747
THashMap<TString, TSeqNoProducerEpoch> TxInflightMaxSeqNoPerSourceId;
726748

@@ -775,44 +797,61 @@ class TPartition : public TActorBootstrapped<TPartition> {
775797

776798
TMaybe<TUsersInfoStorage> UsersInfoStorage;
777799

778-
// template <class T> T& GetUserActionAndTransactionEventsFront();
779-
// template <class T> T& GetCurrentEvent();
780-
//TSimpleSharedPtr<TTransaction>& GetCurrentTransaction();
800+
struct TAffectedSourceIdsAndConsumers {
801+
TVector<TString> TxWriteSourcesIds;
802+
TVector<TString> WriteSourcesIds;
803+
TVector<TString> TxReadConsumers;
804+
TVector<TString> ReadConsumers;
805+
ui32 WriteKeysSize = 0;
806+
ui32 WriteCycleSize = 0;
807+
};
808+
809+
void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
781810

782-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event);
783-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event);
784-
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx);
785-
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg);
811+
void DeleteAffectedSourceIdsAndConsumers();
812+
void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
813+
void DeleteFromSet(const TVector<TString>& p, THashMap<TString, size_t>& q) const;
786814

787-
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event, TEvKeyValue::TEvRequest* request);
815+
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
816+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
817+
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx,
818+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
819+
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg,
820+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
788821

789-
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event,
790-
TEvKeyValue::TEvRequest* request);
822+
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& events, TEvKeyValue::TEvRequest* request);
791823
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx, TEvKeyValue::TEvRequest* request);
792824
bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request);
793825

794-
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx);
826+
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act,
827+
TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers);
795828
void CommitUserAct(TEvPQ::TEvSetClientInfo& act);
796829

797830

798-
[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
831+
[[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t,
832+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
799833
void ExecImmediateTx(TTransaction& tx);
800834

801-
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
802-
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
803-
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg);
804-
EProcessResult PreProcessRequest(TWriteMsg& msg);
835+
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg,
836+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
837+
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg,
838+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
839+
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg,
840+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
841+
EProcessResult PreProcessRequest(TWriteMsg& msg,
842+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
805843

806844
void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
807845
void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
808846
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
809847
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);
810848

811-
[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
849+
[[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t,
850+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
812851

813-
EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
852+
EProcessResult ApplyWriteInfoResponse(TTransaction& tx,
853+
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
814854

815-
bool FirstEvent = true;
816855
bool HaveWriteMsg = false;
817856
bool HaveData = false;
818857
bool HaveCheckDisk = false;
@@ -833,13 +872,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
833872
void BeginAppendHeadWithNewWrites(const TActorContext& ctx);
834873
void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
835874

875+
bool HasPendingCommitsOrPendingWrites() const;
876+
836877
//
837878
// user actions and transactions
838879
//
839880
struct TUserActionAndTransactionEvent {
840-
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
841-
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
842-
TMessage> Event;
843881
TUserActionAndTransactionEvent(TSimpleSharedPtr<TTransaction>&& transaction)
844882
: Event(std::move(transaction))
845883
{}
@@ -849,10 +887,16 @@ class TPartition : public TActorBootstrapped<TPartition> {
849887
TUserActionAndTransactionEvent(TMessage&& message)
850888
: Event(std::move(message))
851889
{}
890+
891+
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
892+
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
893+
TMessage> Event;
894+
TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers;
852895
};
853896

854897
std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
855898
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
899+
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingWrite;
856900
TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;
857901

858902
THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
@@ -875,15 +919,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
875919
TMessageQueue Responses;
876920
ui64 CurrentBatchSize = 0;
877921

878-
enum class ETxBatchingState{
879-
PreProcessing,
880-
Executing,
881-
Finishing
882-
};
883-
ETxBatchingState BatchingState = ETxBatchingState::PreProcessing;
884-
//
885-
//
886-
//
887922
std::deque<std::pair<TString, ui64>> UpdateUserInfoTimestamp;
888923
bool ReadingTimestamp;
889924
TString ReadingForUser;
@@ -1067,15 +1102,15 @@ class TPartition : public TActorBootstrapped<TPartition> {
10671102
size_t WriteNewSizeFromSupportivePartitions = 0;
10681103

10691104
bool TryAddDeleteHeadKeysToPersistRequest();
1070-
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
1105+
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const;
10711106

10721107
TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);
10731108

1109+
void DumpTheSizeOfInternalQueues() const;
10741110

10751111
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
10761112
TDeque<NWilson::TTraceId> TxForPersistTraceIds;
10771113
TDeque<NWilson::TSpan> TxForPersistSpans;
1078-
bool CanProcessUserActionAndTransactionEvents() const;
10791114
};
10801115

10811116
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_init.cpp

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,26 @@ enum EKeyPosition {
532532
// Calculates the location of keys relative to each other
533533
static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
534534
{
535-
// Called from FilterBlobsMetaData. The keys are pre-sorted
536-
Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(),
537-
"lhs: %s, rhs: %s",
538-
lhs.ToString().data(), rhs.ToString().data());
539-
540535
if (lhs.GetOffset() == rhs.GetOffset()) {
541536
if (lhs.GetPartNo() == rhs.GetPartNo()) {
542-
Y_ABORT_UNLESS(lhs.GetCount() <= rhs.GetCount(),
543-
"lhs: %s, rhs: %s",
544-
lhs.ToString().data(), rhs.ToString().data());
545-
return RhsContainsLhs;
537+
if (lhs.GetCount() < rhs.GetCount()) {
538+
return RhsContainsLhs;
539+
} else if (lhs.GetCount() == rhs.GetCount()) {
540+
if (lhs.GetInternalPartsCount() < rhs.GetInternalPartsCount()) {
541+
return RhsContainsLhs;
542+
} else {
543+
return LhsContainsRhs;
544+
}
545+
} else {
546+
return LhsContainsRhs;
547+
}
548+
} else if (lhs.GetPartNo() > rhs.GetPartNo()) {
549+
return LhsContainsRhs;
550+
} else {
551+
return RhsAfterLhs;
546552
}
547-
548-
// case lhs.GetOffset() == rhs.GetOffset() && lhs.GetPartNo() < rhs.GetPartNo()
549-
Y_ABORT_UNLESS(lhs.GetPartNo() + lhs.GetInternalPartsCount() <= rhs.GetPartNo(),
550-
"lhs: %s, rhs: %s",
551-
lhs.ToString().data(), rhs.ToString().data());
552-
553-
return RhsAfterLhs;
554553
}
555554

556-
// case lhs.GetOffset() < rhs.GetOffset()
557-
558555
if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) {
559556
return LhsContainsRhs;
560557
} else if (nextOffset == rhs.GetOffset()) {

0 commit comments

Comments
 (0)