3535import org .elasticsearch .ingest .geoip .GeoIpTaskState .Metadata ;
3636import org .elasticsearch .ingest .geoip .direct .DatabaseConfiguration ;
3737import org .elasticsearch .ingest .geoip .direct .DatabaseConfigurationMetadata ;
38- import org .elasticsearch .persistent .AllocatedPersistentTask ;
3938import org .elasticsearch .persistent .PersistentTasksCustomMetadata .PersistentTask ;
4039import org .elasticsearch .tasks .TaskId ;
41- import org .elasticsearch .threadpool .Scheduler ;
4240import org .elasticsearch .threadpool .ThreadPool ;
4341import org .elasticsearch .xcontent .XContentParser ;
4442import org .elasticsearch .xcontent .XContentParserConfiguration ;
5553import java .util .Map ;
5654import java .util .Objects ;
5755import java .util .Set ;
58- import java .util .concurrent .atomic .AtomicInteger ;
5956import java .util .function .Function ;
6057import java .util .function .Supplier ;
6158import java .util .regex .Pattern ;
7370@ NotMultiProjectCapable (
7471 description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
7572)
76- public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
73+ public class EnterpriseGeoIpDownloader extends AbstractGeoIpDownloader {
7774
7875 private static final Logger logger = LogManager .getLogger (EnterpriseGeoIpDownloader .class );
7976
@@ -105,19 +102,9 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
105102 private final Client client ;
106103 private final HttpClient httpClient ;
107104 private final ClusterService clusterService ;
108- private final ThreadPool threadPool ;
109105
110106 // visible for testing
111107 protected volatile EnterpriseGeoIpTaskState state ;
112- /**
113- * The currently scheduled periodic run. Only null before first periodic run.
114- */
115- private volatile Scheduler .ScheduledCancellable scheduledPeriodicRun ;
116- /**
117- * The number of requested runs. If this is greater than 0, then a run is either in progress or scheduled to run as soon as possible.
118- */
119- private final AtomicInteger queuedRuns = new AtomicInteger (0 );
120- private final Supplier <TimeValue > pollIntervalSupplier ;
121108 private final Function <String , char []> tokenProvider ;
122109
123110 EnterpriseGeoIpDownloader (
@@ -134,12 +121,10 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
134121 Supplier <TimeValue > pollIntervalSupplier ,
135122 Function <String , char []> tokenProvider
136123 ) {
137- super (id , type , action , description , parentTask , headers );
124+ super (id , type , action , description , parentTask , headers , threadPool , pollIntervalSupplier );
138125 this .client = client ;
139126 this .httpClient = httpClient ;
140127 this .clusterService = clusterService ;
141- this .threadPool = threadPool ;
142- this .pollIntervalSupplier = pollIntervalSupplier ;
143128 this .tokenProvider = tokenProvider ;
144129 }
145130
@@ -397,111 +382,14 @@ static byte[] getChunk(InputStream is) throws IOException {
397382 return buf ;
398383 }
399384
400- /**
401- * Cancels the currently scheduled run (if any) and schedules a new periodic run using the current poll interval, then requests
402- * that the downloader runs on demand now. The main reason we need that last step is that if this persistent task
403- * gets reassigned to a different node, we want to run the downloader immediately on that new node, not wait for the next periodic run.
404- */
405- public void restartPeriodicRun () {
406- if (isCancelled () || isCompleted () || threadPool .scheduler ().isShutdown ()) {
407- logger .debug ("Not restarting periodic run because task is cancelled, completed, or shutting down" );
408- return ;
409- }
410- logger .debug ("Restarting periodic run" );
411- // We synchronize to ensure we only have one scheduledPeriodicRun at a time.
412- synchronized (this ) {
413- if (scheduledPeriodicRun != null ) {
414- // Technically speaking, there's a chance that the scheduled run is already running, in which case cancelling it here does
415- // nothing. That means that we might end up with two periodic runs scheduled close together. However, that's unlikely to
416- // happen and relatively harmless if it does, as we only end up running the downloader more often than strictly necessary.
417- final boolean cancelSuccessful = scheduledPeriodicRun .cancel ();
418- logger .debug ("Cancelled scheduled run: [{}]" , cancelSuccessful );
419- }
420- // This is based on the premise that the poll interval is sufficiently large that we don't need to worry about
421- // the scheduled `runPeriodic` running before this method completes.
422- scheduledPeriodicRun = threadPool .schedule (this ::runPeriodic , pollIntervalSupplier .get (), threadPool .generic ());
423- }
424- // Technically, with multiple rapid calls to restartPeriodicRun, we could end up with multiple calls to requestRunOnDemand, but
425- // that's unlikely to happen and harmless if it does, as we only end up running the downloader more often than strictly necessary.
426- requestRunOnDemand ();
427- }
428-
429- /**
430- * Runs the downloader now and schedules the next periodic run using the poll interval.
431- */
432- private void runPeriodic () {
433- if (isCancelled () || isCompleted () || threadPool .scheduler ().isShutdown ()) {
434- logger .debug ("Not running periodic downloader because task is cancelled, completed, or shutting down" );
435- return ;
436- }
437-
438- logger .trace ("Running periodic downloader" );
439- // There's a chance that an on-demand run is already in progress, in which case this periodic run is redundant.
440- // However, we don't try to avoid that case here, as it's harmless to run the downloader more than strictly necessary (due to
441- // the high default poll interval of 3d), and it simplifies the logic considerably.
442- requestRunOnDemand ();
443-
444- synchronized (this ) {
445- scheduledPeriodicRun = threadPool .schedule (this ::runPeriodic , pollIntervalSupplier .get (), threadPool .generic ());
446- }
447- }
448-
449- /**
450- * This method requests that the downloader runs on the latest cluster state, which likely contains a change in the GeoIP metadata.
451- * This method does nothing if this task is cancelled or completed.
452- */
453- public void requestRunOnDemand () {
454- if (isCancelled () || isCompleted ()) {
455- logger .debug ("Not requesting downloader to run on demand because task is cancelled or completed" );
456- return ;
457- }
458- logger .trace ("Requesting downloader run on demand" );
459- // If queuedRuns was greater than 0, then either a run is in progress and it will fire off another run when it finishes,
460- // or a run is scheduled to run as soon as possible and it will include the latest cluster state.
461- // If it was 0, we set it to 1 to indicate that a run is scheduled to run as soon as possible and schedule it now.
462- if (queuedRuns .getAndIncrement () == 0 ) {
463- logger .trace ("Scheduling downloader run on demand" );
464- threadPool .generic ().submit (this ::runOnDemand );
465- }
466- }
467-
468- /**
469- * Runs the downloader on the latest cluster state. {@link #queuedRuns} protects against multiple concurrent runs and ensures that
470- * if a run is requested while this method is running, then another run will be scheduled to run as soon as this method finishes.
471- */
472- private void runOnDemand () {
473- if (isCancelled () || isCompleted ()) {
474- logger .debug ("Not running downloader on demand because task is cancelled or completed" );
475- return ;
476- }
477- // Capture the current queue size, so that if another run is requested while we're running, we'll know at the end of this method
478- // whether we need to run again.
479- final int currentQueueSize = queuedRuns .get ();
480- logger .trace ("Running downloader on demand" );
481- try {
482- runDownloader ();
483- logger .trace ("Downloader completed successfully" );
484- } finally {
485- // If any exception was thrown during runDownloader, we still want to check queuedRuns.
486- // Subtract this "batch" of runs from queuedRuns.
487- // If queuedRuns is still > 0, then a run was requested while we were running, so we need to run again.
488- if (queuedRuns .addAndGet (-currentQueueSize ) > 0 ) {
489- logger .debug ("Downloader on demand requested again while running, scheduling another run" );
490- threadPool .generic ().submit (this ::runOnDemand );
491- }
492- }
493- }
494-
495- /**
496- * Downloads the geoip databases now based on the supplied cluster state.
497- */
385+ @ Override
498386 void runDownloader () {
499387 if (isCancelled () || isCompleted ()) {
500388 logger .debug ("Not running downloader because task is cancelled or completed" );
501389 return ;
502390 }
503391 // by the time we reach here, the state will never be null
504- assert this .state != null : "this.setState() is null. You need to call setState() before calling runDownloader()" ;
392+ assert this .state != null : "this.state is null. You need to call setState() before calling runDownloader()" ;
505393
506394 try {
507395 updateDatabases (); // n.b. this downloads bytes from the internet, it can take a while
@@ -531,16 +419,6 @@ private void cleanDatabases() {
531419 });
532420 }
533421
534- @ Override
535- protected void onCancelled () {
536- synchronized (this ) {
537- if (scheduledPeriodicRun != null ) {
538- scheduledPeriodicRun .cancel ();
539- }
540- }
541- markAsCompleted ();
542- }
543-
544422 private ProviderDownload downloaderFor (DatabaseConfiguration database ) {
545423 if (database .provider () instanceof DatabaseConfiguration .Maxmind maxmind ) {
546424 return new MaxmindDownload (database .name (), maxmind );
0 commit comments