Skip to content

Commit e4d2412

Browse files
authored
make sure that seqno is unique among all tasks (#27909)
2 parents 1928e96 + dde224b commit e4d2412

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4646
, LockMode(settings.HasLockMode() ? settings.GetLockMode() : TMaybe<NKikimrDataEvents::ELockMode>())
4747
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
4848
, LookupStrategy(settings.GetLookupStrategy())
49-
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
49+
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
5050
, Counters(counters)
5151
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
5252
{

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ namespace NKikimr {
1616
namespace NKqp {
1717

1818
constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
19+
constexpr ui64 SEQNO_SPACE = 40;
20+
constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE));
1921

2022
namespace {
2123
std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
@@ -498,11 +500,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
498500

499501
class TKqpJoinRows : public TKqpStreamLookupWorker {
500502
public:
501-
TKqpJoinRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
503+
TKqpJoinRows(TLookupSettings&& settings, ui64 taskId,
504+
const NMiniKQL::TTypeEnvironment& typeEnv,
502505
const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
503506
: TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory)
504-
, InputDesc(inputDesc) {
505-
507+
, InputDesc(inputDesc)
508+
, InputRowSeqNo(taskId << SEQNO_SPACE)
509+
, InputRowSeqNoLast((taskId + 1) << SEQNO_SPACE)
510+
{
511+
YQL_ENSURE(taskId < MaxTaskId);
506512
// read columns should contain join key and result columns
507513
for (auto joinKey : Settings.LookupKeyColumns) {
508514
ReadColumns.emplace(joinKey->Name, *joinKey);
@@ -537,6 +543,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
537543
lastRow = cookie.LastRow;
538544
} else {
539545
rowSeqNo = InputRowSeqNo++;
546+
YQL_ENSURE(InputRowSeqNo < InputRowSeqNoLast);
540547
}
541548

542549
if (joinKey.HasValue()) {
@@ -1193,13 +1200,15 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
11931200
absl::flat_hash_map<TOwnedCellVec, TJoinKeyInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
11941201
std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
11951202
ui64 InputRowSeqNo = 0;
1203+
ui64 InputRowSeqNoLast = 0;
11961204
ui64 JoinKeySeqNo = 0;
11971205
ui64 CurrentResultSeqNo = 0;
11981206
NMiniKQL::TStructType* LeftRowType = nullptr;
11991207
NKikimr::NMiniKQL::TTupleType* InputTupleType = nullptr;
12001208
};
12011209

12021210
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
1211+
ui64 taskId,
12031212
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
12041213
const NYql::NDqProto::TTaskInput& inputDesc) {
12051214

@@ -1250,7 +1259,7 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
12501259
return std::make_unique<TKqpLookupRows>(std::move(preparedSettings), typeEnv, holderFactory);
12511260
case NKqpProto::EStreamLookupStrategy::JOIN:
12521261
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
1253-
return std::make_unique<TKqpJoinRows>(std::move(preparedSettings), typeEnv, holderFactory, inputDesc);
1262+
return std::make_unique<TKqpJoinRows>(std::move(preparedSettings), taskId, typeEnv, holderFactory, inputDesc);
12541263
default:
12551264
return {};
12561265
}

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class TKqpStreamLookupWorker {
9191
};
9292

9393
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
94+
ui64 taskId,
9495
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
9596
const NYql::NDqProto::TTaskInput& inputDesc);
9697

0 commit comments

Comments
 (0)