Skip to content

Commit 292fcd8

Browse files
authored
dq: idle watermarks: idle channel part (#26841)
1 parent 6108e84 commit 292fcd8

28 files changed

+525
-407
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ struct TComputeActorAsyncInputHelperAsync : public TComputeActorAsyncInputHelper
2424
const TString& logPrefix,
2525
ui64 index,
2626
NDqProto::EWatermarksMode watermarksMode,
27+
TDuration watermarksIdleTimeout,
2728
ui64& cookie,
2829
int& inflight
2930
)
30-
: TComputeActorAsyncInputHelper(logPrefix, index, watermarksMode)
31+
: TComputeActorAsyncInputHelper(logPrefix, index, watermarksMode, watermarksIdleTimeout)
3132
, TaskRunnerActor(nullptr)
3233
, Cookie(cookie)
3334
, Inflight(inflight)
@@ -127,10 +128,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
127128

128129
TComputeActorAsyncInputHelperAsync CreateInputHelper(const TString& logPrefix,
129130
ui64 index,
130-
NDqProto::EWatermarksMode watermarksMode
131+
NDqProto::EWatermarksMode watermarksMode,
132+
TDuration watermarksIdleTimeout
131133
)
132134
{
133-
return TComputeActorAsyncInputHelperAsync(logPrefix, index, watermarksMode, Cookie, ProcessSourcesState.Inflight);
135+
return TComputeActorAsyncInputHelperAsync(logPrefix, index, watermarksMode, watermarksIdleTimeout, Cookie, ProcessSourcesState.Inflight);
134136
}
135137

136138
const IDqAsyncInputBuffer* GetInputTransform(ui64 inputIdx, const TComputeActorAsyncInputHelperSync&) const {
@@ -614,7 +616,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
614616
auto finished = channelData.GetFinished();
615617

616618
TMaybe<TInstant> watermark;
617-
if (channelData.HasWatermark()) {
619+
if (finished && inputChannel->WatermarksMode == NDqProto::WATERMARKS_MODE_DEFAULT) {
620+
watermark = TInstant::Max();
621+
} else if (channelData.HasWatermark()) {
618622
watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs());
619623
}
620624

@@ -645,7 +649,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
645649
Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId());
646650
}
647651

648-
TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId(), watermark};
652+
TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, finished, channelData.GetChannelId(), watermark};
649653
}
650654

651655
void PassAway() override {
@@ -780,16 +784,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
780784
ResumeExecution(EResumeSource::CAWatermarkInject);
781785
}
782786

783-
for (auto inputChannelId : ev->Get()->FinishedInputsWithWatermarks) {
784-
CA_LOG_T("Unregister watermarked input channel " << inputChannelId);
785-
WatermarksTracker.UnregisterInputChannel(inputChannelId);
786-
}
787-
788-
for (auto sourceId : ev->Get()->FinishedSourcesWithWatermarks) {
789-
CA_LOG_T("Unregister watermarked async input " << sourceId);
790-
WatermarksTracker.UnregisterAsyncInput(sourceId);
791-
}
792-
793787
ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState;
794788
if (ev->Get()->CheckpointRequestedFromTaskRunner) {
795789
CheckpointRequestedFromTaskRunner = false;
@@ -840,6 +834,12 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
840834
auto& source = it->second;
841835
source.PushStarted = false;
842836
source.FreeSpace = ev->Get()->FreeSpaceLeft;
837+
if (ev->Get()->Finish && source.WatermarksMode == NDqProto::WATERMARKS_MODE_DEFAULT) {
838+
WatermarksTracker.UnregisterAsyncInput(ev->Get()->Index);
839+
}
840+
if (source.IsPausedByWatermark()) {
841+
ScheduleIdlenessCheck();
842+
}
843843
if (--ProcessSourcesState.Inflight == 0) {
844844
CA_LOG_T("Send TEvContinueRun on OnAsyncInputPushFinished");
845845
AskContinueRun(Nothing(), false);
@@ -969,9 +969,12 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
969969

970970
inputChannel->FreeSpace = ev->Get()->FreeSpace;
971971

972-
if (watermark) {
972+
if (it->second.Finish && inputChannel->WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
973+
WatermarksTracker.UnregisterInputChannel( channelId );
974+
} else if (watermark) {
973975
if (WatermarksTracker.NotifyInChannelWatermarkReceived( channelId, *watermark)) {
974976
CA_LOG_T("Pause input channel " << channelId << " because of watermark");
977+
ScheduleIdlenessCheck();
975978
inputChannel->Pause(*watermark); // XXX does nothing in async CA
976979
}
977980
}
@@ -1134,7 +1137,8 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
11341137
if (auto watermarkRequest = WatermarksTracker.GetPendingWatermark()) {
11351138
Y_ENSURE(*watermarkRequest >= ContinueRunEvent->WatermarkRequest);
11361139
ContinueRunEvent->WatermarkRequest = *watermarkRequest;
1137-
MetricsReporter.ReportInjectedToTaskRunnerWatermark(*watermarkRequest, WatermarksTracker.GetWatermarkDiscrepancy());
1140+
MetricsReporter.ReportInjectedToTaskRunnerWatermark(*watermarkRequest, *WatermarksTracker.GetMaxWatermark() - *watermarkRequest);
1141+
CA_LOG_T("Injecting watermark to TaskRunnerActorLocal " << watermarkRequest);
11381142
}
11391143

11401144
if (!UseCpuQuota()) {
@@ -1198,6 +1202,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
11981202

11991203
struct TTakeInputChannelData {
12001204
bool Ack;
1205+
bool Finish;
12011206
ui64 ChannelId;
12021207
TMaybe<TInstant> Watermark;
12031208
};

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ struct TComputeActorAsyncInputHelper {
2222
TIssuesBuffer IssuesBuffer;
2323
bool Finished = false;
2424
const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
25+
const TDuration WatermarksIdleTimeout = TDuration::Max();
2526
const NKikimr::NMiniKQL::TType* ValueType = nullptr;
2627
TMaybe<TInstant> PendingWatermark = Nothing();
2728
TMaybe<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder;
2829
public:
2930
TComputeActorAsyncInputHelper(
3031
const TString& logPrefix,
3132
ui64 index,
32-
NDqProto::EWatermarksMode watermarksMode)
33+
NDqProto::EWatermarksMode watermarksMode,
34+
TDuration watermarksIdleTimeout)
3335
: LogPrefix(logPrefix)
3436
, Index(index)
3537
, IssuesBuffer(IssuesBufferSize)
36-
, WatermarksMode(watermarksMode) {}
38+
, WatermarksMode(watermarksMode)
39+
, WatermarksIdleTimeout(watermarksIdleTimeout)
40+
{}
3741

3842
bool IsPausedByWatermark() const {
3943
return PendingWatermark.Defined();
@@ -78,7 +82,7 @@ struct TComputeActorAsyncInputHelper {
7882

7983
metricsReporter.ReportAsyncInputData(Index, batch.RowCount(), space, watermark);
8084

81-
if (watermark) {
85+
if (watermark && !finished) {
8286
const auto inputWatermarkChanged = watermarksTracker.NotifyAsyncInputWatermarkReceived(
8387
Index,
8488
*watermark);

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ enum class EResumeSource : ui32 {
5656
CAPendingOutput,
5757
CATaskRunnerCreated,
5858
CAWatermarkInject,
59+
CAWatermarkIdleness,
5960
CAWakeupCallback,
6061

6162
Last,

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
102102
struct TEvPrivate {
103103
enum EEv : ui32 {
104104
EvAsyncOutputError = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
105+
EvCheckIdleness,
105106
EvEnd
106107
};
107108

@@ -116,6 +117,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
116117
NYql::NDqProto::StatusIds::StatusCode StatusCode;
117118
NYql::TIssues Issues;
118119
};
120+
121+
struct TEvCheckIdleness : public NActors::TEventLocal<TEvCheckIdleness, EvCheckIdleness> {
122+
TEvCheckIdleness(TInstant checkTime)
123+
: CheckTime(checkTime)
124+
{}
125+
126+
TInstant CheckTime;
127+
};
119128
};
120129

121130
protected:
@@ -137,6 +146,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
137146
RuntimeSettings.StatsMode, MemoryLimits.ChannelBufferSize, this, this->GetActivityType());
138147
this->RegisterWithSameMailbox(Channels);
139148

149+
InitializeWatermarks();
150+
140151
if (RuntimeSettings.Timeout) {
141152
CA_LOG_D("Set execution timeout " << *RuntimeSettings.Timeout);
142153
this->Schedule(*RuntimeSettings.Timeout, new NActors::TEvents::TEvWakeup(EEvWakeupTag::TimeoutTag));
@@ -317,6 +328,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
317328
hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
318329
hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError);
319330
hFunc(TEvPrivate::TEvAsyncOutputError, HandleAsyncOutputError);
331+
hFunc(TEvPrivate::TEvCheckIdleness, HandleCheckIdleness);
320332
default: {
321333
CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
322334
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
@@ -843,6 +855,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
843855
IDqInputChannel::TPtr Channel;
844856
bool HasPeer = false;
845857
const NDqProto::EWatermarksMode WatermarksMode;
858+
const TDuration WatermarksIdleTimeout = TDuration::Max();
846859
std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
847860
const NDqProto::ECheckpointingMode CheckpointingMode;
848861
i64 FreeSpace = 0;
@@ -852,11 +865,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
852865
ui64 channelId,
853866
ui32 srcStageId,
854867
NDqProto::EWatermarksMode watermarksMode,
855-
NDqProto::ECheckpointingMode checkpointingMode)
868+
NDqProto::ECheckpointingMode checkpointingMode,
869+
TDuration watermarksIdleTimeout)
856870
: LogPrefix(logPrefix)
857871
, ChannelId(channelId)
858872
, SrcStageId(srcStageId)
859873
, WatermarksMode(watermarksMode)
874+
, WatermarksIdleTimeout(watermarksIdleTimeout)
860875
, CheckpointingMode(checkpointingMode)
861876
{
862877
}
@@ -1587,6 +1602,25 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15871602
InternalError(ev->Get()->StatusCode, ev->Get()->Issues);
15881603
}
15891604

1605+
void HandleCheckIdleness(const TEvPrivate::TEvCheckIdleness::TPtr& ev) {
1606+
auto checkTime = Max(ev->Get()->CheckTime, TInstant::Now());
1607+
if (WatermarksTracker.ProcessIdlenessCheck(checkTime)) {
1608+
auto idleWatermark = WatermarksTracker.HandleIdleness(checkTime);
1609+
if (idleWatermark) {
1610+
CA_LOG_T("Idleness watermark " << idleWatermark);
1611+
ResumeExecution(EResumeSource::CAWatermarkIdleness);
1612+
}
1613+
}
1614+
ScheduleIdlenessCheck();
1615+
}
1616+
1617+
void ScheduleIdlenessCheck() {
1618+
if (auto checkTime = WatermarksTracker.PrepareIdlenessCheck()) {
1619+
CA_LOG_T("Schedule next idleness check at " << checkTime);
1620+
this->Schedule(*checkTime, new TEvPrivate::TEvCheckIdleness(*checkTime));
1621+
}
1622+
}
1623+
15901624
bool AllAsyncOutputsFinished() const {
15911625
for (const auto& [outputIndex, sinkInfo] : SinksMap) {
15921626
if (!sinkInfo.FinishIsAcknowledged) {
@@ -1618,26 +1652,36 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16181652
Y_ABORT_UNLESS(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels
16191653

16201654
if (inputDesc.HasSource()) {
1621-
watermarksMode = inputDesc.GetSource().GetWatermarksMode();
1655+
auto& source = inputDesc.GetSource();
1656+
watermarksMode = source.GetWatermarksMode();
1657+
TDuration watermarksIdleTimeout = TDuration::Max();
1658+
if (source.HasWatermarksIdleTimeoutUs()) {
1659+
watermarksIdleTimeout = TDuration::MicroSeconds(source.GetWatermarksIdleTimeoutUs());
1660+
}
16221661
auto result = SourcesMap.emplace(
16231662
i,
1624-
static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode)
1663+
static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode, watermarksIdleTimeout)
16251664
);
16261665
YQL_ENSURE(result.second);
16271666
} else {
16281667
for (auto& channel : inputDesc.GetChannels()) {
16291668
auto channelWatermarksMode = channel.GetWatermarksMode();
1669+
TDuration watermarksIdleTimeout = TDuration::Max();
16301670
if (channelWatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
16311671
watermarksMode = channelWatermarksMode;
16321672
}
1673+
if (channel.HasWatermarksIdleTimeoutUs()) {
1674+
watermarksIdleTimeout = TDuration::MicroSeconds(channel.GetWatermarksIdleTimeoutUs());
1675+
}
16331676
auto result = InputChannelsMap.emplace(
16341677
channel.GetId(),
16351678
TInputChannelInfo(
16361679
LogPrefix,
16371680
channel.GetId(),
16381681
channel.GetSrcStageId(),
16391682
channelWatermarksMode,
1640-
channel.GetCheckpointingMode())
1683+
channel.GetCheckpointingMode(),
1684+
watermarksIdleTimeout)
16411685
);
16421686
YQL_ENSURE(result.second);
16431687
}
@@ -1646,8 +1690,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16461690
if (inputDesc.HasTransform()) {
16471691
auto result = InputTransformsMap.emplace(
16481692
i,
1649-
TAsyncInputTransformHelper(LogPrefix, i, watermarksMode)
1693+
TAsyncInputTransformHelper(LogPrefix, i, watermarksMode, TDuration::Max())
16501694
);
1695+
// TODO: watermarksIdleTimeout (currently unused)
16511696
YQL_ENSURE(result.second);
16521697
}
16531698
}
@@ -1687,21 +1732,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16871732

16881733
RequestContext = MakeIntrusive<NYql::NDq::TRequestContext>(Task.GetRequestContext());
16891734

1690-
InitializeWatermarks();
16911735
InitializeLogPrefix(); // note: SelfId is not initialized here
16921736
}
16931737

16941738
private:
16951739
void InitializeWatermarks() {
1740+
TInstant now = TInstant::Now();
16961741
for (const auto& [id, source] : SourcesMap) {
16971742
if (source.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
1698-
WatermarksTracker.RegisterAsyncInput(id);
1743+
WatermarksTracker.RegisterAsyncInput(id, source.WatermarksIdleTimeout, now);
16991744
}
17001745
}
17011746

17021747
for (const auto& [id, channel] : InputChannelsMap) {
17031748
if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
1704-
WatermarksTracker.RegisterInputChannel(id);
1749+
WatermarksTracker.RegisterInputChannel(id, channel.WatermarksIdleTimeout, now);
17051750
}
17061751
}
17071752
}

0 commit comments

Comments
 (0)