33import io .fabric8 .kubernetes .client .CustomResource ;
44import io .javaoperatorsdk .operator .processing .KubernetesResourceUtils ;
55import io .javaoperatorsdk .operator .processing .event .AbstractEventSource ;
6+ import java .io .IOException ;
67import java .util .Map ;
78import java .util .Timer ;
89import java .util .TimerTask ;
910import java .util .concurrent .ConcurrentHashMap ;
11+ import java .util .concurrent .atomic .AtomicBoolean ;
1012import org .slf4j .Logger ;
1113import org .slf4j .LoggerFactory ;
1214
1315public class TimerEventSource extends AbstractEventSource {
1416
15- private Logger log = LoggerFactory .getLogger (TimerEventSource .class );
16-
1717 private final Timer timer = new Timer ();
18-
18+ private final AtomicBoolean running = new AtomicBoolean ();
1919 private final Map <String , EventProducerTimeTask > onceTasks = new ConcurrentHashMap <>();
2020 private final Map <String , EventProducerTimeTask > timerTasks = new ConcurrentHashMap <>();
21+ private Logger log = LoggerFactory .getLogger (TimerEventSource .class );
2122
2223 public void schedule (CustomResource customResource , long delay , long period ) {
24+ if (!running .get ()) {
25+ throw new IllegalStateException ("The TimerEventSource is not running" );
26+ }
27+
2328 String resourceUid = KubernetesResourceUtils .getUID (customResource );
2429 if (timerTasks .containsKey (resourceUid )) {
2530 return ;
@@ -30,6 +35,10 @@ public void schedule(CustomResource customResource, long delay, long period) {
3035 }
3136
3237 public void scheduleOnce (CustomResource customResource , long delay ) {
38+ if (!running .get ()) {
39+ throw new IllegalStateException ("The TimerEventSource is not running" );
40+ }
41+
3342 String resourceUid = KubernetesResourceUtils .getUID (customResource );
3443 if (onceTasks .containsKey (resourceUid )) {
3544 cancelOnceSchedule (resourceUid );
@@ -59,6 +68,19 @@ public void cancelOnceSchedule(String customResourceUid) {
5968 }
6069 }
6170
71+ @ Override
72+ public void start () {
73+ running .set (true );
74+ }
75+
76+ @ Override
77+ public void close () throws IOException {
78+ running .set (false );
79+ onceTasks .keySet ().forEach (this ::cancelOnceSchedule );
80+ timerTasks .keySet ().forEach (this ::cancelSchedule );
81+ timer .cancel ();
82+ }
83+
6284 public class EventProducerTimeTask extends TimerTask {
6385
6486 protected final String customResourceUid ;
@@ -69,8 +91,10 @@ public EventProducerTimeTask(String customResourceUid) {
6991
7092 @ Override
7193 public void run () {
72- log .debug ("Producing event for custom resource id: {}" , customResourceUid );
73- eventHandler .handleEvent (new TimerEvent (customResourceUid , TimerEventSource .this ));
94+ if (running .get ()) {
95+ log .debug ("Producing event for custom resource id: {}" , customResourceUid );
96+ eventHandler .handleEvent (new TimerEvent (customResourceUid , TimerEventSource .this ));
97+ }
7498 }
7599 }
76100}
0 commit comments