44
55package oracle .kubernetes .operator ;
66
7- import java .util .ArrayList ;
8- import java .util .List ;
9- import java .util .Map ;
10- import java .util .Optional ;
11- import java .util .concurrent .ConcurrentHashMap ;
12- import java .util .concurrent .ConcurrentMap ;
13- import java .util .concurrent .ScheduledFuture ;
14- import java .util .concurrent .TimeUnit ;
15- import java .util .concurrent .atomic .AtomicInteger ;
16- import javax .annotation .Nullable ;
7+ import static oracle .kubernetes .operator .helpers .LegalNames .toJobIntrospectorName ;
178
189import io .kubernetes .client .models .V1ConfigMap ;
10+ import io .kubernetes .client .models .V1ContainerState ;
11+ import io .kubernetes .client .models .V1ContainerStatus ;
1912import io .kubernetes .client .models .V1Event ;
2013import io .kubernetes .client .models .V1ObjectMeta ;
2114import io .kubernetes .client .models .V1ObjectReference ;
2417import io .kubernetes .client .models .V1Service ;
2518import io .kubernetes .client .models .V1ServiceList ;
2619import io .kubernetes .client .util .Watch ;
20+ import java .util .ArrayList ;
21+ import java .util .List ;
22+ import java .util .Map ;
23+ import java .util .Optional ;
24+ import java .util .concurrent .ConcurrentHashMap ;
25+ import java .util .concurrent .ConcurrentMap ;
26+ import java .util .concurrent .ScheduledFuture ;
27+ import java .util .concurrent .TimeUnit ;
28+ import java .util .concurrent .atomic .AtomicInteger ;
29+ import javax .annotation .Nullable ;
2730import oracle .kubernetes .operator .TuningParameters .MainTuning ;
2831import oracle .kubernetes .operator .calls .CallResponse ;
2932import oracle .kubernetes .operator .helpers .CallBuilder ;
@@ -148,12 +151,9 @@ static Step bringAdminServerUp(
148151 return Step .chain (bringAdminServerUpSteps (info , podAwaiterStepFactory , next ));
149152 }
150153
151- private static Step [] domainIntrospectionSteps (DomainPresenceInfo info , Step next ) {
152- Domain dom = info .getDomain ();
154+ private static Step [] domainIntrospectionSteps (Step next ) {
153155 List <Step > resources = new ArrayList <>();
154- resources .add (
155- JobHelper .deleteDomainIntrospectorJobStep (
156- dom .getDomainUid (), dom .getMetadata ().getNamespace (), null ));
156+ resources .add (JobHelper .deleteDomainIntrospectorJobStep (null ));
157157 resources .add (JobHelper .createDomainIntrospectorJobStep (next ));
158158 return resources .toArray (new Step [0 ]);
159159 }
@@ -206,40 +206,67 @@ public void stopNamespace(String ns) {
206206 }
207207
208208 public void dispatchPodWatch (Watch .Response <V1Pod > item ) {
209- V1Pod pod = item .object ;
210- if (pod != null ) {
211- V1ObjectMeta metadata = pod .getMetadata ();
212- String domainUid = metadata .getLabels ().get (LabelConstants .DOMAINUID_LABEL );
213- String serverName = metadata .getLabels ().get (LabelConstants .SERVERNAME_LABEL );
214- if (domainUid != null && serverName != null ) {
215- DomainPresenceInfo info = getExistingDomainPresenceInfo (metadata .getNamespace (), domainUid );
216- if (info != null ) {
217- switch (item .type ) {
218- case "ADDED" :
219- info .setServerPodBeingDeleted (serverName , Boolean .FALSE );
220- // fall through
221- case "MODIFIED" :
222- info .setServerPodFromEvent (serverName , pod );
223- break ;
224- case "DELETED" :
225- boolean removed = info .deleteServerPodFromEvent (serverName , pod );
226- if (removed && info .isNotDeleting () && !info .isServerPodBeingDeleted (serverName )) {
227- LOGGER .info (
228- MessageKeys .POD_DELETED , domainUid , metadata .getNamespace (), serverName );
229- makeRightDomainPresence (info , true , false , true );
230- }
231- break ;
209+ if (getPodLabel (item .object , LabelConstants .DOMAINUID_LABEL ) == null ) return ;
232210
233- case "ERROR" :
234- default :
235- }
211+ if (getPodLabel (item .object , LabelConstants .SERVERNAME_LABEL ) != null )
212+ processServerPodWatch (item .object , item .type );
213+ else if (getPodLabel (item .object , LabelConstants .JOBNAME_LABEL ) != null )
214+ processIntrospectorJobPodWatch (item .object , item .type );
215+ }
216+
217+ private void processServerPodWatch (V1Pod pod , String watchType ) {
218+ String domainUid = getPodLabel (pod , LabelConstants .DOMAINUID_LABEL );
219+ DomainPresenceInfo info = getExistingDomainPresenceInfo (getNamespace (pod ), domainUid );
220+ if (info == null ) return ;
221+
222+ String serverName = getPodLabel (pod , LabelConstants .SERVERNAME_LABEL );
223+ switch (watchType ) {
224+ case "ADDED" :
225+ info .setServerPodBeingDeleted (serverName , Boolean .FALSE );
226+ // fall through
227+ case "MODIFIED" :
228+ info .setServerPodFromEvent (serverName , pod );
229+ break ;
230+ case "DELETED" :
231+ boolean removed = info .deleteServerPodFromEvent (serverName , pod );
232+ if (removed && info .isNotDeleting () && !info .isServerPodBeingDeleted (serverName )) {
233+ LOGGER .info (MessageKeys .POD_DELETED , domainUid , getNamespace (pod ), serverName );
234+ makeRightDomainPresence (info , true , false , true );
236235 }
237- }
236+ break ;
237+
238+ case "ERROR" :
239+ default :
238240 }
239241 }
240242
241- private V1Pod getNewerPod (V1Pod first , V1Pod second ) {
242- return KubernetesUtils .isFirstNewer (getMetadata (first ), getMetadata (second )) ? first : second ;
243+ private String getNamespace (V1Pod pod ) {
244+ return Optional .ofNullable (pod )
245+ .map (V1Pod ::getMetadata )
246+ .map (V1ObjectMeta ::getNamespace )
247+ .orElse (null );
248+ }
249+
250+ private String getPodLabel (V1Pod pod , String labelName ) {
251+ return Optional .ofNullable (pod )
252+ .map (V1Pod ::getMetadata )
253+ .map (V1ObjectMeta ::getLabels )
254+ .map (m -> m .get (labelName ))
255+ .orElse (null );
256+ }
257+
258+ private void processIntrospectorJobPodWatch (V1Pod pod , String watchType ) {
259+ String domainUid = getPodLabel (pod , LabelConstants .DOMAINUID_LABEL );
260+ DomainPresenceInfo info = getExistingDomainPresenceInfo (getNamespace (pod ), domainUid );
261+ if (info == null ) return ;
262+
263+ switch (watchType ) {
264+ case "ADDED" :
265+ case "MODIFIED" :
266+ new DomainStatusUpdate (info .getDomain (), pod , domainUid ).invoke ();
267+ break ;
268+ default :
269+ }
243270 }
244271
245272 /* Recently, we've seen a number of intermittent bugs where K8s reports
@@ -248,10 +275,6 @@ private V1Pod getNewerPod(V1Pod first, V1Pod second) {
248275 * a MODIFIED event for an object that has already had subsequent modifications.
249276 */
250277
251- private V1ObjectMeta getMetadata (V1Pod pod ) {
252- return pod == null ? null : pod .getMetadata ();
253- }
254-
255278 public void dispatchServiceWatch (Watch .Response <V1Service > item ) {
256279 V1Service service = item .object ;
257280 String domainUid = ServiceHelper .getServiceDomainUid (service );
@@ -360,32 +383,37 @@ private void scheduleDomainStatusUpdating(DomainPresenceInfo info) {
360383 DomainStatusUpdater .createStatusStep (main .statusUpdateTimeoutSeconds , null );
361384 FiberGate gate = getStatusFiberGate (info .getNamespace ());
362385
363- Fiber f = gate .startFiberIfNoCurrentFiber (
364- info .getDomainUid (),
365- strategy ,
366- packet ,
367- new CompletionCallback () {
368- @ Override
369- public void onCompletion (Packet packet ) {
370- AtomicInteger serverHealthRead =
371- packet .getValue (ProcessingConstants .REMAINING_SERVERS_HEALTH_TO_READ );
372- if (serverHealthRead == null || serverHealthRead .get () == 0 ) {
373- loggingFilter .setFiltering (false ).resetLogHistory ();
374- } else {
375- loggingFilter .setFiltering (true );
376- }
377- }
378-
379- @ Override
380- public void onThrowable (Packet packet , Throwable throwable ) {
381- LOGGER .severe (MessageKeys .EXCEPTION , throwable );
382- loggingFilter .setFiltering (true );
383- }
384- });
386+ Fiber f =
387+ gate .startFiberIfNoCurrentFiber (
388+ info .getDomainUid (),
389+ strategy ,
390+ packet ,
391+ new CompletionCallback () {
392+ @ Override
393+ public void onCompletion (Packet packet ) {
394+ AtomicInteger serverHealthRead =
395+ packet .getValue (
396+ ProcessingConstants .REMAINING_SERVERS_HEALTH_TO_READ );
397+ if (serverHealthRead == null || serverHealthRead .get () == 0 ) {
398+ loggingFilter .setFiltering (false ).resetLogHistory ();
399+ } else {
400+ loggingFilter .setFiltering (true );
401+ }
402+ }
403+
404+ @ Override
405+ public void onThrowable (Packet packet , Throwable throwable ) {
406+ LOGGER .severe (MessageKeys .EXCEPTION , throwable );
407+ loggingFilter .setFiltering (true );
408+ }
409+ });
385410 } catch (Throwable t ) {
386411 LOGGER .severe (MessageKeys .EXCEPTION , t );
387412 }
388- }, main .initialShortDelay , main .initialShortDelay , TimeUnit .SECONDS ));
413+ },
414+ main .initialShortDelay ,
415+ main .initialShortDelay ,
416+ TimeUnit .SECONDS ));
389417 }
390418
391419 public void makeRightDomainPresence (
@@ -429,6 +457,8 @@ public void makeRightDomainPresence(
429457
430458 private void internalMakeRightDomainPresence (
431459 @ Nullable DomainPresenceInfo info , boolean isDeleting , boolean isWillInterrupt ) {
460+ if (info == null ) return ;
461+
432462 String ns = info .getNamespace ();
433463 String domainUid = info .getDomainUid ();
434464 Domain dom = info .getDomain ();
@@ -456,6 +486,7 @@ private Step readExistingServices(DomainPresenceInfo info) {
456486 .listServiceAsync (info .getNamespace (), new ServiceListStep (info ));
457487 }
458488
489+ @ SuppressWarnings ("unused" )
459490 private void runDomainPlan (
460491 Domain dom ,
461492 String domainUid ,
@@ -537,7 +568,6 @@ Step createDomainUpPlan(DomainPresenceInfo info) {
537568 Step strategy =
538569 Step .chain (
539570 domainIntrospectionSteps (
540- info ,
541571 new DomainStatusStep (
542572 info ,
543573 bringAdminServerUp (
@@ -737,4 +767,33 @@ public NextAction apply(Packet packet) {
737767 return doNext (packet );
738768 }
739769 }
770+
771+ private class DomainStatusUpdate {
772+ private Domain domain ;
773+ private V1Pod pod ;
774+ private String domainUid ;
775+
776+ DomainStatusUpdate (Domain domain , V1Pod pod , String domainUid ) {
777+ this .domain = domain ;
778+ this .pod = pod ;
779+ this .domainUid = domainUid ;
780+ }
781+
782+ public void invoke () {
783+ Optional .ofNullable (getMatchingContainerStatus (pod , domainUid ).getState ())
784+ .map (V1ContainerState ::getWaiting )
785+ .ifPresent (waiting -> updateStatus (waiting .getReason (), waiting .getMessage ()));
786+ }
787+
788+ private void updateStatus (String reason , String message ) {
789+ KubernetesUtils .updateStatus (domain , reason , message );
790+ }
791+
792+ private V1ContainerStatus getMatchingContainerStatus (V1Pod pod , String domainUid ) {
793+ return pod .getStatus ().getContainerStatuses ().stream ()
794+ .filter (s -> toJobIntrospectorName (domainUid ).equals (s .getName ()))
795+ .findFirst ()
796+ .orElse (null );
797+ }
798+ }
740799}
0 commit comments