@@ -62,7 +62,7 @@ type Scheduler struct {
6262
6363// New creates a new data retrieval.
6464func New (cfg * dblabCfg.Config , docker * client.Client , pm * pool.Manager , tm * telemetry.Agent , runner runners.Runner ) * Retrieval {
65- return & Retrieval {
65+ r := & Retrieval {
6666 cfg : & cfg .Retrieval ,
6767 global : & cfg .Global ,
6868 docker : docker ,
@@ -74,6 +74,20 @@ func New(cfg *dblabCfg.Config, docker *client.Client, pm *pool.Manager, tm *tele
7474 Status : models .Inactive ,
7575 },
7676 }
77+
78+ for _ , jobName := range r .cfg .Jobs {
79+ jobSpec , ok := r .cfg .JobsSpec [jobName ]
80+ if ! ok {
81+ continue
82+ }
83+
84+ jobSpec .Name = jobName
85+ r .jobSpecs [jobName ] = jobSpec
86+ }
87+
88+ r .defineRetrievalMode ()
89+
90+ return r
7791}
7892
7993// Reload reloads retrieval configuration.
@@ -100,7 +114,9 @@ func (r *Retrieval) Run(ctx context.Context) error {
100114 runCtx , cancel := context .WithCancel (ctx )
101115 r .ctxCancel = cancel
102116
103- fsManager , err := r .getPoolToDataRefresh ()
117+ log .Msg ("Retrieval mode:" , r .State .Mode )
118+
119+ fsManager , err := r .getPoolToDataRetrieving ()
104120 if err != nil {
105121 var skipError * SkipRefreshingError
106122 if errors .As (err , & skipError ) {
@@ -118,7 +134,7 @@ func (r *Retrieval) Run(ctx context.Context) error {
118134 return fmt .Errorf ("failed to choose pool to refresh: %w" , err )
119135 }
120136
121- log .Msg ("Pool to perform a full refresh : " , fsManager .Pool ().Name )
137+ log .Msg ("Pool to perform data retrieving : " , fsManager .Pool ().Name )
122138
123139 if err := r .run (runCtx , fsManager ); err != nil {
124140 r .tm .SendEvent (ctx , telemetry .AlertEvent , telemetry.Alert {Level : models .RefreshFailed , Message : err .Error ()})
@@ -130,7 +146,7 @@ func (r *Retrieval) Run(ctx context.Context) error {
130146 return nil
131147}
132148
133- func (r * Retrieval ) getPoolToDataRefresh () (pool.FSManager , error ) {
149+ func (r * Retrieval ) getPoolToDataRetrieving () (pool.FSManager , error ) {
134150 firstPool := r .poolManager .First ()
135151 if firstPool == nil {
136152 return nil , errors .New ("no available pools" )
@@ -140,6 +156,12 @@ func (r *Retrieval) getPoolToDataRefresh() (pool.FSManager, error) {
140156 return firstPool , nil
141157 }
142158
159+ // For physical or unknown modes, changing the pool is possible only by the refresh timetable.
160+ if r .State .Mode != models .Logical {
161+ return firstPool , nil
162+ }
163+
164+ // For logical mode try to find another pool to avoid rewriting prepared data.
143165 elementToRefresh := r .poolManager .GetPoolToUpdate ()
144166
145167 if elementToRefresh == nil || elementToRefresh .Value == nil {
@@ -220,8 +242,6 @@ func (r *Retrieval) configure(fsm pool.FSManager) error {
220242 return errors .Wrap (err , "invalid data retrieval configuration" )
221243 }
222244
223- r .defineRetrievalMode ()
224-
225245 return nil
226246}
227247
@@ -237,14 +257,11 @@ func (r *Retrieval) parseJobs(fsm pool.FSManager) error {
237257 r .jobs = make ([]components.JobRunner , 0 , len (r .cfg .Jobs ))
238258
239259 for _ , jobName := range r .cfg .Jobs {
240- jobSpec , ok := r .cfg . JobsSpec [jobName ]
260+ jobSpec , ok := r .jobSpecs [jobName ]
241261 if ! ok {
242262 return errors .Errorf ("Job %q not found" , jobName )
243263 }
244264
245- jobSpec .Name = jobName
246- r .jobSpecs [jobName ] = jobSpec
247-
248265 jobCfg := config.JobConfig {
249266 Spec : jobSpec ,
250267 Docker : r .docker ,
@@ -315,11 +332,15 @@ func (r *Retrieval) hasPhysicalJob() bool {
315332func (r * Retrieval ) defineRetrievalMode () {
316333 if r .hasPhysicalJob () {
317334 r .State .Mode = models .Physical
335+ return
318336 }
319337
320338 if r .hasLogicalJob () {
321339 r .State .Mode = models .Logical
340+ return
322341 }
342+
343+ r .State .Mode = models .Unknown
323344}
324345
325346func (r * Retrieval ) prepareEnvironment (fsm pool.FSManager ) error {
0 commit comments