Skip to content

Commit b523d74

Browse files
authored
[ISSUE#161]optimize the thread pool ThreadFactory implementation
1 parent ba6424f commit b523d74

File tree

4 files changed

+120
-32
lines changed

4 files changed

+120
-32
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
import java.util.concurrent.CompletableFuture;
4242
import java.util.concurrent.ExecutorService;
4343
import java.util.concurrent.Executors;
44-
import java.util.concurrent.ThreadFactory;
45-
import java.util.concurrent.atomic.AtomicInteger;
4644
import org.apache.rocketmq.remoting.ChannelEventListener;
4745
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
4846
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
@@ -69,32 +67,11 @@ public class DLedgerRpcNettyService extends DLedgerRpcService {
6967

7068
private DLedgerServer dLedgerServer;
7169

72-
private ExecutorService futureExecutor = Executors.newFixedThreadPool(4, new ThreadFactory() {
73-
private AtomicInteger threadIndex = new AtomicInteger(0);
70+
private ExecutorService futureExecutor = Executors.newFixedThreadPool(4, new NamedThreadFactory("FutureExecutor"));
7471

75-
@Override
76-
public Thread newThread(Runnable r) {
77-
return new Thread(r, "FutureExecutor_" + this.threadIndex.incrementAndGet());
78-
}
79-
});
80-
81-
private ExecutorService voteInvokeExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
82-
private AtomicInteger threadIndex = new AtomicInteger(0);
83-
84-
@Override
85-
public Thread newThread(Runnable r) {
86-
return new Thread(r, "voteInvokeExecutor_" + this.threadIndex.incrementAndGet());
87-
}
88-
});
89-
90-
private ExecutorService heartBeatInvokeExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
91-
private AtomicInteger threadIndex = new AtomicInteger(0);
72+
private ExecutorService voteInvokeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("voteInvokeExecutor"));
9273

93-
@Override
94-
public Thread newThread(Runnable r) {
95-
return new Thread(r, "heartBeatInvokeExecutor_" + this.threadIndex.incrementAndGet());
96-
}
97-
});
74+
private ExecutorService heartBeatInvokeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("heartBeatInvokeExecutor"));
9875

9976
public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
10077
this(dLedgerServer, null, null, null);

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,7 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC
101101
dLedgerRpcService = new DLedgerRpcNettyService(this, nettyServerConfig, nettyClientConfig, channelEventListener);
102102
dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);
103103
dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState, dLedgerRpcService);
104-
executorService = Executors.newSingleThreadScheduledExecutor(r -> {
105-
Thread t = new Thread(r);
106-
t.setDaemon(true);
107-
t.setName("DLedgerServer-ScheduledExecutor");
108-
return t;
109-
});
104+
executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(null, "DLedgerServer-ScheduledExecutor", true));
110105
this.fsmCaller = Optional.empty();
111106
}
112107

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2017-2022 The DLedger Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger;
18+
19+
import java.util.concurrent.ThreadFactory;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
public class NamedThreadFactory implements ThreadFactory {
23+
24+
private AtomicInteger threadIndex;
25+
26+
private final String threadNamePrefix;
27+
28+
private boolean isDemoThread;
29+
30+
public NamedThreadFactory(final String threadNamePrefix, boolean isDemoThread) {
31+
this(new AtomicInteger(0), threadNamePrefix, isDemoThread);
32+
}
33+
34+
public NamedThreadFactory(AtomicInteger threadIndex, final String threadNamePrefix, boolean isDemoThread) {
35+
this.threadIndex = threadIndex;
36+
this.threadNamePrefix = threadNamePrefix;
37+
this.isDemoThread = isDemoThread;
38+
}
39+
40+
public NamedThreadFactory(final String threadNamePrefix) {
41+
this(threadNamePrefix, false);
42+
}
43+
44+
/**
45+
* Constructs a new {@code Thread}. Implementations may also initialize priority, name, daemon status, {@code
46+
* ThreadGroup}, etc.
47+
*
48+
* @param r a runnable to be executed by new thread instance
49+
* @return constructed thread, or {@code null} if the request to create a thread is rejected
50+
*/
51+
@Override
52+
public Thread newThread(Runnable r) {
53+
54+
StringBuilder threadName = new StringBuilder(threadNamePrefix);
55+
if (null != threadIndex) {
56+
threadName.append("-").append(threadIndex.incrementAndGet());
57+
}
58+
Thread thread = new Thread(r, threadName.toString());
59+
if (isDemoThread) {
60+
thread.setDaemon(true);
61+
}
62+
return thread;
63+
}
64+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2017-2022 The DLedger Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import org.junit.jupiter.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
public class NamedThreadFactoryTest {
24+
25+
@Test
26+
public void testNamedThreadFactory() {
27+
28+
NamedThreadFactory threadFactory = new NamedThreadFactory("DledgerThread");
29+
Runnable runnable = () -> {
30+
};
31+
Thread thread = threadFactory.newThread(runnable);
32+
Assertions.assertEquals("DledgerThread-1", thread.getName());
33+
Assertions.assertFalse(thread.isDaemon());
34+
35+
threadFactory = new NamedThreadFactory("DledgerThread", true);
36+
Thread thread1 = threadFactory.newThread(runnable);
37+
Assertions.assertEquals("DledgerThread-1", thread1.getName());
38+
Assertions.assertTrue(thread1.isDaemon());
39+
40+
threadFactory = new NamedThreadFactory(null, "DledgerThread", true);
41+
Thread thread2 = threadFactory.newThread(runnable);
42+
Assertions.assertEquals("DledgerThread", thread2.getName());
43+
Assertions.assertTrue(thread2.isDaemon());
44+
45+
threadFactory = new NamedThreadFactory(new AtomicInteger(0), "DledgerThread", true);
46+
Thread thread3 = threadFactory.newThread(runnable);
47+
Assertions.assertEquals("DledgerThread-1", thread3.getName());
48+
Assertions.assertTrue(thread2.isDaemon());
49+
50+
}
51+
52+
}

0 commit comments

Comments
 (0)