Skip to content

Commit f754a2f

Browse files
authored
Merge pull request #65 from caskdata/feature/fix-race-condition
Fix a race condition from net
2 parents 6255c8e + 3203699 commit f754a2f

File tree

2 files changed

+358
-1
lines changed

2 files changed

+358
-1
lines changed

src/main/java/co/cask/http/NettyHttpService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import co.cask.http.internal.BasicHandlerContext;
2020
import co.cask.http.internal.HttpDispatcher;
2121
import co.cask.http.internal.HttpResourceHandler;
22+
import co.cask.http.internal.NonStickyEventExecutorGroup;
2223
import co.cask.http.internal.RequestRouter;
2324
import io.netty.bootstrap.ServerBootstrap;
2425
import io.netty.channel.Channel;
@@ -39,7 +40,6 @@
3940
import io.netty.util.concurrent.EventExecutorGroup;
4041
import io.netty.util.concurrent.Future;
4142
import io.netty.util.concurrent.ImmediateEventExecutor;
42-
import io.netty.util.concurrent.NonStickyEventExecutorGroup;
4343
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
4444
import org.slf4j.Logger;
4545
import org.slf4j.LoggerFactory;
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
/*
2+
* Copyright 2016 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package co.cask.http.internal;
17+
18+
import io.netty.util.concurrent.AbstractEventExecutor;
19+
import io.netty.util.concurrent.EventExecutor;
20+
import io.netty.util.concurrent.EventExecutorGroup;
21+
import io.netty.util.concurrent.Future;
22+
import io.netty.util.concurrent.OrderedEventExecutor;
23+
import io.netty.util.concurrent.ScheduledFuture;
24+
import io.netty.util.internal.ObjectUtil;
25+
import io.netty.util.internal.PlatformDependent;
26+
import io.netty.util.internal.UnstableApi;
27+
28+
import java.util.Collection;
29+
import java.util.Iterator;
30+
import java.util.List;
31+
import java.util.Queue;
32+
import java.util.concurrent.Callable;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.RejectedExecutionException;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.TimeoutException;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
39+
/**
40+
* {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
41+
* {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s.
42+
*
43+
* <p>The {@link EventExecutorGroup#next()} for the wrapped {@link EventExecutorGroup} must <strong>NOT</strong> return
44+
* executors of type {@link OrderedEventExecutor}.
45+
*
46+
* NOTE: This class is copied from the netty project to fix the netty bug #8230. This class should be removed
47+
* after the fix goes in the netty library
48+
*/
49+
@UnstableApi
50+
public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
51+
private final EventExecutorGroup group;
52+
private final int maxTaskExecutePerRun;
53+
54+
/**
55+
* Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
56+
* any {@link OrderedEventExecutor}s.
57+
*/
58+
public NonStickyEventExecutorGroup(EventExecutorGroup group) {
59+
this(group, 1024);
60+
}
61+
62+
/**
63+
* Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
64+
* any {@link OrderedEventExecutor}s.
65+
*/
66+
public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
67+
this.group = verify(group);
68+
this.maxTaskExecutePerRun = ObjectUtil.checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
69+
}
70+
71+
private static EventExecutorGroup verify(EventExecutorGroup group) {
72+
Iterator<EventExecutor> executors = ObjectUtil.checkNotNull(group, "group").iterator();
73+
while (executors.hasNext()) {
74+
EventExecutor executor = executors.next();
75+
if (executor instanceof OrderedEventExecutor) {
76+
throw new IllegalArgumentException("EventExecutorGroup " + group
77+
+ " contains OrderedEventExecutors: " + executor);
78+
}
79+
}
80+
return group;
81+
}
82+
83+
private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
84+
return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
85+
}
86+
87+
@Override
88+
public boolean isShuttingDown() {
89+
return group.isShuttingDown();
90+
}
91+
92+
@Override
93+
public Future<?> shutdownGracefully() {
94+
return group.shutdownGracefully();
95+
}
96+
97+
@Override
98+
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
99+
return group.shutdownGracefully(quietPeriod, timeout, unit);
100+
}
101+
102+
@Override
103+
public Future<?> terminationFuture() {
104+
return group.terminationFuture();
105+
}
106+
107+
@SuppressWarnings("deprecation")
108+
@Override
109+
public void shutdown() {
110+
group.shutdown();
111+
}
112+
113+
@SuppressWarnings("deprecation")
114+
@Override
115+
public List<Runnable> shutdownNow() {
116+
return group.shutdownNow();
117+
}
118+
119+
@Override
120+
public EventExecutor next() {
121+
return newExecutor(group.next());
122+
}
123+
124+
@Override
125+
public Iterator<EventExecutor> iterator() {
126+
final Iterator<EventExecutor> itr = group.iterator();
127+
return new Iterator<EventExecutor>() {
128+
@Override
129+
public boolean hasNext() {
130+
return itr.hasNext();
131+
}
132+
133+
@Override
134+
public EventExecutor next() {
135+
return newExecutor(itr.next());
136+
}
137+
138+
@Override
139+
public void remove() {
140+
itr.remove();
141+
}
142+
};
143+
}
144+
145+
@Override
146+
public Future<?> submit(Runnable task) {
147+
return group.submit(task);
148+
}
149+
150+
@Override
151+
public <T> Future<T> submit(Runnable task, T result) {
152+
return group.submit(task, result);
153+
}
154+
155+
@Override
156+
public <T> Future<T> submit(Callable<T> task) {
157+
return group.submit(task);
158+
}
159+
160+
@Override
161+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
162+
return group.schedule(command, delay, unit);
163+
}
164+
165+
@Override
166+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
167+
return group.schedule(callable, delay, unit);
168+
}
169+
170+
@Override
171+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
172+
return group.scheduleAtFixedRate(command, initialDelay, period, unit);
173+
}
174+
175+
@Override
176+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
177+
return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
178+
}
179+
180+
@Override
181+
public boolean isShutdown() {
182+
return group.isShutdown();
183+
}
184+
185+
@Override
186+
public boolean isTerminated() {
187+
return group.isTerminated();
188+
}
189+
190+
@Override
191+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
192+
return group.awaitTermination(timeout, unit);
193+
}
194+
195+
@Override
196+
public <T> List<java.util.concurrent.Future<T>> invokeAll(
197+
Collection<? extends Callable<T>> tasks) throws InterruptedException {
198+
return group.invokeAll(tasks);
199+
}
200+
201+
@Override
202+
public <T> List<java.util.concurrent.Future<T>> invokeAll(
203+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
204+
return group.invokeAll(tasks, timeout, unit);
205+
}
206+
207+
@Override
208+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
209+
return group.invokeAny(tasks);
210+
}
211+
212+
@Override
213+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
214+
throws InterruptedException, ExecutionException, TimeoutException {
215+
return group.invokeAny(tasks, timeout, unit);
216+
}
217+
218+
@Override
219+
public void execute(Runnable command) {
220+
group.execute(command);
221+
}
222+
223+
private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
224+
implements Runnable, OrderedEventExecutor {
225+
private final EventExecutor executor;
226+
private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
227+
228+
private static final int NONE = 0;
229+
private static final int SUBMITTED = 1;
230+
private static final int RUNNING = 2;
231+
232+
private final AtomicInteger state = new AtomicInteger();
233+
private final int maxTaskExecutePerRun;
234+
235+
NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
236+
super(executor);
237+
this.executor = executor;
238+
this.maxTaskExecutePerRun = maxTaskExecutePerRun;
239+
}
240+
241+
@Override
242+
public void run() {
243+
if (!state.compareAndSet(SUBMITTED, RUNNING)) {
244+
return;
245+
}
246+
for (;;) {
247+
int i = 0;
248+
try {
249+
for (; i < maxTaskExecutePerRun; i++) {
250+
Runnable task = tasks.poll();
251+
if (task == null) {
252+
break;
253+
}
254+
safeExecute(task);
255+
}
256+
} finally {
257+
if (i == maxTaskExecutePerRun) {
258+
try {
259+
state.set(SUBMITTED);
260+
executor.execute(this);
261+
return; // done
262+
} catch (Throwable ignore) {
263+
// Reset the state back to running as we will keep on executing tasks.
264+
state.set(RUNNING);
265+
// if an error happened we should just ignore it and let the loop run again as there is not
266+
// much else we can do. Most likely this was triggered by a full task queue. In this case
267+
// we just will run more tasks and try again later.
268+
}
269+
} else {
270+
state.set(NONE);
271+
// After setting the state to NONE, look at the tasks queue one more time.
272+
// If it is empty, then we can return from this method.
273+
// Otherwise, it means the producer thread has called execute(Runnable)
274+
// and enqueued a task in between the tasks.poll() above and the state.set(NONE) here.
275+
// There are two possible scenarios when this happen
276+
//
277+
// 1. The producer thread sees state == NONE, hence the compareAndSet(NONE, SUBMITTED)
278+
// is successfully setting the state to SUBMITTED. This mean the producer
279+
// will call / has called executor.execute(this). In this case, we can just return.
280+
// 2. The producer thread don't see the state change, hence the compareAndSet(NONE, SUBMITTED)
281+
// returns false. In this case, the producer thread won't call executor.execute.
282+
// In this case, we need to change the state to RUNNING and keeps running.
283+
//
284+
// The above cases can be distinguished by performing a
285+
// compareAndSet(NONE, RUNNING). If it returns "false", it is case 1; otherwise it is case 2.
286+
if (tasks.peek() == null || !state.compareAndSet(NONE, RUNNING)) {
287+
return; // done
288+
}
289+
}
290+
}
291+
}
292+
}
293+
294+
@Override
295+
public boolean inEventLoop(Thread thread) {
296+
return false;
297+
}
298+
299+
@Override
300+
public boolean inEventLoop() {
301+
return false;
302+
}
303+
304+
@Override
305+
public boolean isShuttingDown() {
306+
return executor.isShutdown();
307+
}
308+
309+
@Override
310+
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
311+
return executor.shutdownGracefully(quietPeriod, timeout, unit);
312+
}
313+
314+
@Override
315+
public Future<?> terminationFuture() {
316+
return executor.terminationFuture();
317+
}
318+
319+
@Override
320+
public void shutdown() {
321+
executor.shutdown();
322+
}
323+
324+
@Override
325+
public boolean isShutdown() {
326+
return executor.isShutdown();
327+
}
328+
329+
@Override
330+
public boolean isTerminated() {
331+
return executor.isTerminated();
332+
}
333+
334+
@Override
335+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
336+
return executor.awaitTermination(timeout, unit);
337+
}
338+
339+
@Override
340+
public void execute(Runnable command) {
341+
if (!tasks.offer(command)) {
342+
throw new RejectedExecutionException();
343+
}
344+
if (state.compareAndSet(NONE, SUBMITTED)) {
345+
// Actually it could happen that the runnable was picked up in between but we not care to much and just
346+
// execute ourself. At worst this will be a NOOP when run() is called.
347+
try {
348+
executor.execute(this);
349+
} catch (Throwable e) {
350+
// Not reset the state as some other Runnable may be added to the queue already in the meantime.
351+
tasks.remove(command);
352+
PlatformDependent.throwException(e);
353+
}
354+
}
355+
}
356+
}
357+
}

0 commit comments

Comments
 (0)