|
30 | 30 | import org.joda.time.Period; |
31 | 31 |
|
32 | 32 | import java.util.*; |
33 | | -import java.util.concurrent.ConcurrentHashMap; |
34 | | -import java.util.concurrent.Semaphore; |
| 33 | +import java.util.concurrent.*; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
35 | 35 | import java.util.stream.Collectors; |
36 | 36 |
|
37 | 37 | import static cd.go.contrib.elasticagents.docker.DockerPlugin.LOG; |
|
40 | 40 | public class DockerContainers implements AgentInstances<DockerContainer> { |
41 | 41 | private final Map<String, DockerContainer> instances = new ConcurrentHashMap<>(); |
42 | 42 | private List<JobIdentifier> jobsWaitingForAgentCreation = new ArrayList<>(); |
43 | | - private boolean refreshed; |
| 43 | + private AtomicBoolean refreshed = new AtomicBoolean(false); |
| 44 | + private final int FORCE_REFRESH_TIMEOUT_MINUTES = 60; |
| 45 | + |
44 | 46 | public Clock clock = Clock.DEFAULT; |
45 | 47 |
|
46 | 48 | final Semaphore semaphore = new Semaphore(0, true); |
47 | 49 |
|
| 50 | + private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); |
| 51 | + |
| 52 | + public DockerContainers() { |
| 53 | + scheduleForceRefresh(); |
| 54 | + } |
| 55 | + |
48 | 56 | @Override |
49 | 57 | public DockerContainer create(CreateAgentRequest request, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception { |
50 | 58 | LOG.info(String.format("[Create Agent] Processing create agent request for %s", request.jobIdentifier())); |
@@ -77,16 +85,26 @@ public DockerContainer create(CreateAgentRequest request, PluginRequest pluginRe |
77 | 85 | } |
78 | 86 | } |
79 | 87 |
|
| 88 | + private void scheduleForceRefresh() { |
| 89 | + timerService.scheduleAtFixedRate(this::forceNextRefresh, 0, FORCE_REFRESH_TIMEOUT_MINUTES, TimeUnit.MINUTES); |
| 90 | + } |
| 91 | + |
80 | 92 | private void doWithLockOnSemaphore(Runnable runnable) { |
81 | 93 | synchronized (semaphore) { |
82 | 94 | runnable.run(); |
83 | 95 | } |
84 | 96 | } |
85 | 97 |
|
| 98 | + protected void forceNextRefresh() { |
| 99 | + refreshed.set(false); |
| 100 | + } |
| 101 | + |
86 | 102 | @Override |
87 | 103 | public void terminate(String agentId, PluginSettings settings) throws Exception { |
| 104 | + LOG.info("(claykirk) -> terminating agent " + agentId); |
88 | 105 | DockerContainer instance = instances.get(agentId); |
89 | 106 | if (instance != null) { |
| 107 | + LOG.info("(claykirk) -> found instance " + instance.name() + " to terminate."); |
90 | 108 | instance.terminate(docker(settings)); |
91 | 109 | } else { |
92 | 110 | LOG.warn("Requested to terminate an instance that does not exist " + agentId); |
@@ -120,29 +138,36 @@ public void terminateUnregisteredInstances(PluginSettings settings, Agents agent |
120 | 138 | @Override |
121 | 139 | public Agents instancesCreatedAfterTimeout(PluginSettings settings, Agents agents) { |
122 | 140 | ArrayList<Agent> oldAgents = new ArrayList<>(); |
| 141 | + LOG.info("(claykirk) Looking to retire agents from list: {}", String.join(", ", agents.agentIds())); |
123 | 142 | for (Agent agent : agents.agents()) { |
124 | 143 | DockerContainer instance = instances.get(agent.elasticAgentId()); |
125 | 144 | if (instance == null) { |
| 145 | + LOG.info("(claykirk) couldn't find agent with elastic id: {}", agent.elasticAgentId()); |
126 | 146 | continue; |
127 | 147 | } |
128 | 148 |
|
| 149 | + LOG.info("(claykirk) instance: {} has creation time: {}", agent.elasticAgentId(), instance.createdAt()); |
129 | 150 | if (clock.now().isAfter(instance.createdAt().plus(settings.getAutoRegisterPeriod()))) { |
| 151 | + LOG.info("(claykirk) agent with id: {} is out of date", agent.elasticAgentId()); |
130 | 152 | oldAgents.add(agent); |
| 153 | + } else { |
| 154 | + LOG.info("(claykirk) agent with id: {} is not out of date", agent.elasticAgentId()); |
131 | 155 | } |
132 | 156 | } |
133 | 157 |
|
| 158 | + LOG.info("(claykirk) -> instances created after timeout: {}", oldAgents.stream().map(Agent::elasticAgentId).collect(Collectors.joining(", "))); |
134 | 159 | return new Agents(oldAgents); |
135 | 160 | } |
136 | 161 |
|
137 | 162 | @Override |
138 | 163 | public void refreshAll(ClusterProfileProperties clusterProfileProperties) throws Exception { |
139 | | - if (!refreshed) { |
| 164 | + if (!refreshed.get()){ |
140 | 165 | DockerClient docker = docker(clusterProfileProperties); |
141 | 166 | List<Container> containers = docker.listContainers(DockerClient.ListContainersParam.withLabel(Constants.CREATED_BY_LABEL_KEY, Constants.PLUGIN_ID)); |
142 | 167 | for (Container container : containers) { |
143 | 168 | register(DockerContainer.fromContainerInfo(docker.inspectContainer(container.id()))); |
144 | 169 | } |
145 | | - refreshed = true; |
| 170 | + refreshed.set(true); |
146 | 171 | } |
147 | 172 | } |
148 | 173 |
|
@@ -219,4 +244,9 @@ public Optional<DockerContainer> find(JobIdentifier jobIdentifier) { |
219 | 244 | protected boolean isEmpty() { |
220 | 245 | return instances.isEmpty(); |
221 | 246 | } |
| 247 | + |
| 248 | + |
| 249 | + public String getInstanceIds() { |
| 250 | + return String.join(", ", instances.keySet()); |
| 251 | + } |
222 | 252 | } |
0 commit comments