11#include " storage_proxy.h"
22
33#include " gc.h"
4+
5+ #include < ydb/core/base/appdata_fwd.h>
6+
47#include < ydb/core/fq/libs/config/protos/storage.pb.h>
58#include < ydb/core/fq/libs/control_plane_storage/util.h>
69#include " ydb_checkpoint_storage.h"
2225#include < util/string/join.h>
2326#include < util/string/strip.h>
2427
28+ #include < library/cpp/retry/retry_policy.h>
29+
2530namespace NFq {
2631
2732using namespace NActors ;
@@ -30,6 +35,8 @@ namespace {
3035
3136// //////////////////////////////////////////////////////////////////////////////
3237
38+ constexpr char CHECKPOINTS_TABLE_PREFIX[] = " .metadata/streaming/checkpoints" ;
39+
3340struct TStorageProxyMetrics : public TThrRefBase {
3441 explicit TStorageProxyMetrics (const ::NMonitoring::TDynamicCounterPtr& counters)
3542 : Counters(counters)
@@ -65,7 +72,32 @@ struct TRequestContext : public TThrRefBase {
6572 }
6673};
6774
75+ struct TEvPrivate {
76+ // Event ids
77+ enum EEv : ui32 {
78+ EvBegin = EventSpaceBegin (TEvents::ES_PRIVATE),
79+ EvInitResult = EvBegin,
80+ EvInitialize,
81+ EvEnd
82+ };
83+ static_assert (EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), " expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)" );
84+
85+ // Events
86+ struct TEvInitResult : public TEventLocal <TEvInitResult, EvInitResult> {
87+ TEvInitResult (const NYql::TIssues& storageIssues, const NYql::TIssues& stateIssues)
88+ : StorageIssues(storageIssues)
89+ , StateIssues(stateIssues) {}
90+ NYql::TIssues StorageIssues;
91+ NYql::TIssues StateIssues;
92+ };
93+ struct TEvInitialize : public TEventLocal <TEvInitialize, EvInitialize> {
94+ };
95+ };
96+
6897class TStorageProxy : public TActorBootstrapped <TStorageProxy> {
98+ private:
99+ using IRetryPolicy = IRetryPolicy<>;
100+
69101 TCheckpointStorageSettings Config;
70102 TString IdsPrefix;
71103 TExternalStorageSettings StorageConfig;
@@ -75,7 +107,8 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
75107 NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
76108 NYdb::TDriver Driver;
77109 const TStorageProxyMetricsPtr Metrics;
78- bool Initialized = false ;
110+ const IRetryPolicy::TPtr RetryPolicy;
111+ IRetryPolicy::IRetryState::TPtr RetryState;
79112
80113public:
81114 explicit TStorageProxy (
@@ -101,6 +134,8 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
101134
102135 hFunc (NYql::NDq::TEvDqCompute::TEvSaveTaskState, Handle);
103136 hFunc (NYql::NDq::TEvDqCompute::TEvGetTaskState, Handle);
137+ hFunc (TEvPrivate::TEvInitResult, Handle);
138+ hFunc (TEvPrivate::TEvInitialize, Handle);
104139 )
105140
106141 void Handle (TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev);
@@ -114,6 +149,8 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
114149
115150 void Handle (NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev);
116151 void Handle (NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev);
152+ void Handle (TEvPrivate::TEvInitResult::TPtr& ev);
153+ void Handle (TEvPrivate::TEvInitialize::TPtr& ev);
117154};
118155
119156static void FillDefaultParameters (TCheckpointStorageSettings& checkpointCoordinatorConfig, TExternalStorageSettings& ydbStorageConfig) {
@@ -137,15 +174,29 @@ TStorageProxy::TStorageProxy(
137174 , StorageConfig(Config.GetExternalStorage())
138175 , CredentialsProviderFactory(credentialsProviderFactory)
139176 , Driver(std::move(driver))
140- , Metrics(MakeIntrusive<TStorageProxyMetrics>(counters)) {
177+ , Metrics(MakeIntrusive<TStorageProxyMetrics>(counters))
178+ , RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(
179+ [](){return ERetryErrorClass::LongRetry;},
180+ TDuration::MilliSeconds (100 ),
181+ TDuration::MilliSeconds(100 ),
182+ TDuration::Seconds(10 )
183+ )) {
141184 FillDefaultParameters (Config, StorageConfig);
142185}
143186
144187void TStorageProxy::Bootstrap () {
145188 LOG_STREAMS_STORAGE_SERVICE_INFO (" Bootstrap" );
146- auto ydbConnectionPtr = NewYdbConnection (Config.GetExternalStorage (), CredentialsProviderFactory, Driver);
147- CheckpointStorage = NewYdbCheckpointStorage (StorageConfig, CreateEntityIdGenerator (IdsPrefix), ydbConnectionPtr);
148- StateStorage = NewYdbStateStorage (Config, ydbConnectionPtr);
189+ IYdbConnection::TPtr ydbConnection;
190+ if (!StorageConfig.GetEndpoint ().empty ()) {
191+ LOG_STREAMS_STORAGE_SERVICE_INFO (" Create sdk ydb connection" );
192+ ydbConnection = CreateSdkYdbConnection (StorageConfig, CredentialsProviderFactory, Driver);
193+ } else {
194+ LOG_STREAMS_STORAGE_SERVICE_INFO (" Create local ydb connection" );
195+ ydbConnection = CreateLocalYdbConnection (NKikimr::AppData ()->TenantName , CHECKPOINTS_TABLE_PREFIX);
196+ }
197+ CheckpointStorage = NewYdbCheckpointStorage (StorageConfig, CreateEntityIdGenerator (IdsPrefix), ydbConnection);
198+ StateStorage = NewYdbStateStorage (Config, ydbConnection);
199+
149200 if (Config.GetCheckpointGarbageConfig ().GetEnabled ()) {
150201 const auto & gcConfig = Config.GetCheckpointGarbageConfig ();
151202 ActorGC = Register (NewGC (gcConfig, CheckpointStorage, StateStorage).release ());
@@ -159,27 +210,19 @@ void TStorageProxy::Bootstrap() {
159210}
160211
161212void TStorageProxy::Initialize () {
162- if (Initialized) {
163- return ;
164- }
165- LOG_STREAMS_STORAGE_SERVICE_INFO (" Initialize" );
166- Initialized = true ;
167-
168- auto issues = CheckpointStorage->Init ().GetValueSync ();
169- if (!issues.Empty ()) {
170- LOG_STREAMS_STORAGE_SERVICE_ERROR (" Failed to init checkpoint storage: " << issues.ToOneLineString ());
171- Initialized = false ;
172- }
173-
174- issues = StateStorage->Init ().GetValueSync ();
175- if (!issues.Empty ()) {
176- LOG_STREAMS_STORAGE_SERVICE_ERROR (" Failed to init checkpoint state storage: " << issues.ToOneLineString ());
177- Initialized = false ;
178- }
213+ LOG_STREAMS_STORAGE_SERVICE_TRACE (" Initialize" );
214+
215+ auto storageInitFuture = CheckpointStorage->Init ();
216+ auto stateInitFuture = StateStorage->Init ();
217+
218+ std::vector<NThreading::TFuture<NYql::TIssues>> futures{storageInitFuture, stateInitFuture};
219+ auto voidFuture = NThreading::WaitAll (futures);
220+ voidFuture.Subscribe ([futures = std::move (futures), actorId = this ->SelfId (), actorSystem = TActivationContext::ActorSystem ()](const auto & ) mutable {
221+ actorSystem->Send (actorId, new TEvPrivate::TEvInitResult (futures[0 ].GetValue (), futures[1 ].GetValue ()));
222+ });
179223}
180224
181225void TStorageProxy::Handle (TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev) {
182- Initialize ();
183226 auto context = MakeIntrusive<TRequestContext>(Metrics);
184227
185228 const auto * event = ev->Get ();
@@ -435,6 +478,31 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
435478 });
436479}
437480
481+ void TStorageProxy::Handle (TEvPrivate::TEvInitResult::TPtr& ev) {
482+ const auto * event = ev->Get ();
483+ if (!event->StorageIssues .Empty ()) {
484+ LOG_STREAMS_STORAGE_SERVICE_ERROR (" Failed to init checkpoint storage: " << event->StorageIssues .ToOneLineString ());
485+ }
486+ if (!event->StateIssues .Empty ()) {
487+ LOG_STREAMS_STORAGE_SERVICE_ERROR (" Failed to init state storage: " << event->StateIssues .ToOneLineString ());
488+ }
489+ if (!event->StorageIssues .Empty () || !event->StateIssues .Empty ()) {
490+ if (RetryState == nullptr ) {
491+ RetryState = RetryPolicy->CreateRetryState ();
492+ }
493+ if (auto delay = RetryState->GetNextRetryDelay ()) {
494+ LOG_STREAMS_STORAGE_SERVICE_INFO (" Schedule init retry after " << delay);
495+ Schedule (*delay, new TEvPrivate::TEvInitialize ());
496+ }
497+ return ;
498+ }
499+ LOG_STREAMS_STORAGE_SERVICE_INFO (" Checkpoint storage and state storage were successfully inited" );
500+ }
501+
502+ void TStorageProxy::Handle (TEvPrivate::TEvInitialize::TPtr& /* ev*/ ) {
503+ Initialize ();
504+ }
505+
438506} // namespace
439507
440508// //////////////////////////////////////////////////////////////////////////////
0 commit comments