55package oracle .kubernetes .operator .helpers ;
66
77import io .kubernetes .client .models .V1Pod ;
8- import java .util .ArrayList ;
9- import java .util .Collection ;
10- import java .util .HashMap ;
11- import java .util .Iterator ;
12- import java .util .List ;
13- import java .util .Map ;
14- import oracle .kubernetes .operator .PodAwaiterStepFactory ;
8+ import java .util .*;
9+ import java .util .concurrent .ConcurrentLinkedQueue ;
1510import oracle .kubernetes .operator .ProcessingConstants ;
1611import oracle .kubernetes .operator .logging .LoggingFacade ;
1712import oracle .kubernetes .operator .logging .LoggingFactory ;
@@ -74,7 +69,7 @@ public NextAction apply(Packet packet) {
7469 List <String > availableServers = getReadyServers (info );
7570
7671 Collection <StepAndPacket > serversThatCanRestartNow = new ArrayList <>();
77- Map <String , Collection <StepAndPacket >> clusteredRestarts = new HashMap <>();
72+ Map <String , Queue <StepAndPacket >> clusteredRestarts = new HashMap <>();
7873
7974 List <String > servers = new ArrayList <>();
8075 for (Map .Entry <String , StepAndPacket > entry : rolling .entrySet ()) {
@@ -96,9 +91,9 @@ public NextAction apply(Packet packet) {
9691 }
9792
9893 // clustered server
99- Collection <StepAndPacket > cr = clusteredRestarts .get (clusterName );
94+ Queue <StepAndPacket > cr = clusteredRestarts .get (clusterName );
10095 if (cr == null ) {
101- cr = new ArrayList <>();
96+ cr = new ConcurrentLinkedQueue <>();
10297 clusteredRestarts .put (clusterName , cr );
10398 }
10499 cr .add (entry .getValue ());
@@ -116,7 +111,7 @@ public NextAction apply(Packet packet) {
116111 }
117112
118113 if (!clusteredRestarts .isEmpty ()) {
119- for (Map .Entry <String , Collection <StepAndPacket >> entry : clusteredRestarts .entrySet ()) {
114+ for (Map .Entry <String , Queue <StepAndPacket >> entry : clusteredRestarts .entrySet ()) {
120115 work .add (
121116 new StepAndPacket (
122117 new RollSpecificClusterStep (entry .getKey (), entry .getValue (), null ), packet ));
@@ -160,13 +155,13 @@ private static List<String> getReadyServers(DomainPresenceInfo info) {
160155
161156 private static class RollSpecificClusterStep extends Step {
162157 private final String clusterName ;
163- private final Iterator <StepAndPacket > it ;
158+ private final Queue <StepAndPacket > servers ;
164159
165160 public RollSpecificClusterStep (
166- String clusterName , Collection <StepAndPacket > clusteredServerRestarts , Step next ) {
161+ String clusterName , Queue <StepAndPacket > clusteredServerRestarts , Step next ) {
167162 super (next );
168163 this .clusterName = clusterName ;
169- it = clusteredServerRestarts . iterator () ;
164+ servers = clusteredServerRestarts ;
170165 }
171166
172167 @ Override
@@ -176,89 +171,58 @@ public String getDetail() {
176171
177172 @ Override
178173 public NextAction apply (Packet packet ) {
179- synchronized (it ) {
180- if (it .hasNext ()) {
181- DomainPresenceInfo info = packet .getSPI (DomainPresenceInfo .class );
182- WlsDomainConfig config =
183- (WlsDomainConfig ) packet .get (ProcessingConstants .DOMAIN_TOPOLOGY );
184-
185- // Refresh as this is constantly changing
186- Domain dom = info .getDomain ();
187- // These are presently Ready servers
188- List <String > availableServers = getReadyServers (info );
189-
190- List <String > servers = new ArrayList <>();
191- List <String > readyServers = new ArrayList <>();
192- List <V1Pod > notReadyServers = new ArrayList <>();
193-
194- Collection <StepAndPacket > serversThatCanRestartNow = new ArrayList <>();
195-
196- int countReady = 0 ;
197- WlsClusterConfig cluster = config != null ? config .getClusterConfig (clusterName ) : null ;
198- if (cluster != null ) {
199- List <WlsServerConfig > serversConfigs = cluster .getServerConfigs ();
200- if (serversConfigs != null ) {
201- for (WlsServerConfig s : serversConfigs ) {
202- // figure out how many servers are currently ready
203- String name = s .getName ();
204- if (availableServers .contains (name )) {
205- readyServers .add (s .getName ());
206- countReady ++;
207- } else {
208- V1Pod pod = info .getServerPod (name );
209- if (pod != null ) {
210- notReadyServers .add (pod );
211- }
212- }
213- }
214- }
215- }
174+ DomainPresenceInfo info = packet .getSPI (DomainPresenceInfo .class );
175+ WlsDomainConfig config = (WlsDomainConfig ) packet .get (ProcessingConstants .DOMAIN_TOPOLOGY );
216176
217- // then add as many as possible next() entries leaving at least minimum cluster
218- // availability
219- while (countReady -- > dom .getMinAvailable (clusterName )) {
220- StepAndPacket current = it .next ();
221- WlsServerConfig serverConfig =
222- (WlsServerConfig ) current .packet .get (ProcessingConstants .SERVER_SCAN );
223- String serverName = null ;
224- if (serverConfig != null ) {
225- serverName = serverConfig .getName ();
226- } else if (config != null ) {
227- serverName = config .getAdminServerName ();
228- }
229- if (serverName != null ) {
230- servers .add (serverName );
231- }
232- serversThatCanRestartNow .add (current );
233- if (!it .hasNext ()) {
234- break ;
235- }
236- }
177+ // Refresh as this is constantly changing
178+ Domain dom = info .getDomain ();
179+ // These are presently Ready servers
180+ List <String > availableServers = getReadyServers (info );
237181
238- if (serversThatCanRestartNow .isEmpty ()) {
239- // Not enough servers are ready to let us restart a server now
240- if (!notReadyServers .isEmpty ()) {
241- PodAwaiterStepFactory pw = PodHelper .getPodAwaiterStepFactory (packet );
242- Collection <StepAndPacket > waitForUnreadyServers = new ArrayList <>();
243- for (V1Pod pod : notReadyServers ) {
244- waitForUnreadyServers .add (
245- new StepAndPacket (pw .waitForReady (pod , null ), packet .clone ()));
246- }
247-
248- // Wait for at least one of the not-yet-ready servers to become ready
249- return doForkAtLeastOne (this , packet , waitForUnreadyServers );
250- } else {
251- throw new IllegalStateException ();
182+ List <String > readyServers = new ArrayList <>();
183+
184+ int countReady = 0 ;
185+ WlsClusterConfig cluster = config != null ? config .getClusterConfig (clusterName ) : null ;
186+ if (cluster != null ) {
187+ List <WlsServerConfig > serversConfigs = cluster .getServerConfigs ();
188+ if (serversConfigs != null ) {
189+ for (WlsServerConfig s : serversConfigs ) {
190+ // figure out how many servers are currently ready
191+ String name = s .getName ();
192+ if (availableServers .contains (name )) {
193+ readyServers .add (s .getName ());
194+ countReady ++;
252195 }
253196 }
197+ }
198+ }
254199
255- readyServers .removeAll (servers );
256- LOGGER .info (MessageKeys .ROLLING_SERVERS , dom .getDomainUID (), servers , readyServers );
200+ LOGGER .info (MessageKeys .ROLLING_SERVERS , dom .getDomainUID (), servers , readyServers );
257201
258- return doNext (new ServersThatCanRestartNowStep (serversThatCanRestartNow , this ), packet );
259- }
202+ int countToRestartNow = Math .max (1 , countReady - dom .getMinAvailable (clusterName ));
203+ Collection <StepAndPacket > restarts = new ArrayList <>();
204+ for (int i = 0 ; i < countToRestartNow ; i ++) {
205+ restarts .add (new StepAndPacket (new RestartOneClusteredServerStep (servers , null ), packet ));
260206 }
207+ return doForkJoin (getNext (), packet , restarts );
208+ }
209+ }
210+
211+ private static class RestartOneClusteredServerStep extends Step {
212+ private final Queue <StepAndPacket > servers ;
261213
214+ public RestartOneClusteredServerStep (Queue <StepAndPacket > servers , Step next ) {
215+ super (next );
216+ this .servers = servers ;
217+ }
218+
219+ @ Override
220+ public NextAction apply (Packet packet ) {
221+ StepAndPacket serverToRestart = servers .poll ();
222+ if (serverToRestart != null ) {
223+ Collection <StepAndPacket > col = Collections .singleton (serverToRestart );
224+ return doForkJoin (this , packet , col );
225+ }
262226 return doNext (packet );
263227 }
264228 }
0 commit comments