Skip to content

Commit 8a37160

Browse files
committed
滑动窗口算法
1 parent a646f87 commit 8a37160

File tree

4 files changed

+283
-0
lines changed

4 files changed

+283
-0
lines changed

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,18 @@
124124
<version>1.16.22</version>
125125
</dependency>
126126

127+
<dependency>
128+
<groupId>org.apache.httpcomponents</groupId>
129+
<artifactId>httpclient</artifactId>
130+
<version>4.5.5</version>
131+
</dependency>
132+
133+
<dependency>
134+
<groupId>commons-io</groupId>
135+
<artifactId>commons-io</artifactId>
136+
<version>2.6</version>
137+
</dependency>
138+
127139
</dependencies>
128140

129141
<build>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package code.algorithm.slidingwindow;
2+
3+
import java.io.Serializable;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
import java.util.concurrent.atomic.LongAdder;
6+
7+
/**
8+
* 〈滑动节点〉<p>
9+
* 〈功能详细描述〉
10+
*
11+
* @author zixiao
12+
* @date 2019/5/24
13+
*/
14+
public class SlidingNode implements Serializable{
15+
16+
/**
17+
* 周期秒数
18+
*/
19+
private int period;
20+
21+
/**
22+
* 总数量
23+
*/
24+
private AtomicInteger totalCount = new AtomicInteger();
25+
26+
/**
27+
* 总RT
28+
*/
29+
private LongAdder totalRt = new LongAdder();
30+
31+
public SlidingNode(int period) {
32+
this.period = period;
33+
}
34+
35+
public int getPeriod(){
36+
return period;
37+
}
38+
39+
public void addCount(int count){
40+
totalCount.addAndGet(count);
41+
}
42+
43+
public void addRt(long rt){
44+
totalRt.add(rt);
45+
}
46+
47+
public int totalCount(){
48+
return totalCount.get();
49+
}
50+
51+
public long totalRt(){
52+
return totalRt.longValue();
53+
}
54+
55+
public long avgRt(){
56+
return totalCount() == 0 ? 0 : totalRt() / totalCount();
57+
}
58+
59+
public int tps(){
60+
return totalCount() / getPeriod();
61+
}
62+
63+
/**
64+
* 重置计数
65+
*/
66+
public void reset(){
67+
totalCount.set(0);
68+
totalRt.reset();
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return "SlidingNode{" +
74+
"period=" + period +
75+
", totalCount=" + totalCount() +
76+
", totalRt=" + totalRt() +
77+
'}';
78+
}
79+
80+
public String prettyPrint(){
81+
return String.format("TotalSec=%s, totalCount=%s, TPS=%s, totalRT=%s, avgRT=%s",
82+
getPeriod(), totalCount(), tps(), totalRt(), avgRt());
83+
}
84+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package code.algorithm.slidingwindow;
2+
3+
import java.io.Closeable;
4+
import java.util.concurrent.ScheduledExecutorService;
5+
import java.util.concurrent.ScheduledThreadPoolExecutor;
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* 〈滑动窗口〉<p>
10+
* 〈功能详细描述〉
11+
*
12+
* @author zixiao
13+
* @date 2019/5/24
14+
*/
15+
public class SlidingWindow implements Closeable{
16+
17+
/**
18+
* 节点数据
19+
*/
20+
private SlidingNode[] nodes;
21+
22+
/**
23+
* slot数
24+
*/
25+
private int slotNum;
26+
27+
/**
28+
* 当前slot位置
29+
*/
30+
private int currentSlot = 0;
31+
32+
/**
33+
* 窗口大小
34+
*/
35+
private int windowSize;
36+
37+
/**
38+
* 单节点的周期(秒)
39+
*/
40+
private int nodePeriod;
41+
42+
private ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
43+
44+
public SlidingWindow(int slotNum, int windowSize){
45+
this(slotNum, windowSize, 1);
46+
}
47+
48+
public SlidingWindow(int slotNum, int windowSize, int nodePeriod){
49+
this.slotNum = slotNum;
50+
this.windowSize = windowSize;
51+
this.nodePeriod = nodePeriod;
52+
53+
this.nodes = new SlidingNode[slotNum];
54+
this.scheduler = new ScheduledThreadPoolExecutor(1);
55+
56+
this.init();
57+
}
58+
59+
private void init(){
60+
for(int i=0; i<slotNum; i++){
61+
nodes[i] = new SlidingNode(nodePeriod);
62+
}
63+
64+
scheduler.scheduleAtFixedRate(new Runnable() {
65+
@Override
66+
public void run() {
67+
slidingToNext();
68+
}
69+
}, nodePeriod, nodePeriod, TimeUnit.SECONDS);
70+
}
71+
72+
/**
73+
* 滑动到下一个窗口
74+
*/
75+
private void slidingToNext(){
76+
int nextSlot = currentSlot;
77+
if (nextSlot >= (slotNum-1)) {
78+
nextSlot = 0;
79+
}else{
80+
nextSlot++;
81+
}
82+
//清除数据
83+
nodes[nextSlot].reset();
84+
currentSlot = nextSlot;
85+
}
86+
87+
/**
88+
* 上一个窗口
89+
* @return
90+
*/
91+
public SlidingNode lastWindow(){
92+
int slotIndex = currentSlot;
93+
SlidingNode total = new SlidingNode(windowSize * nodePeriod);
94+
for (int i=0; i< windowSize; i++){
95+
slotIndex = lastNodeIndex(slotIndex);
96+
total.addCount(nodes[slotIndex].totalCount());
97+
total.addRt(nodes[slotIndex].totalRt());
98+
}
99+
return total;
100+
}
101+
102+
/**
103+
* 上一个节点位置
104+
* @param slotIndex
105+
* @return
106+
*/
107+
private int lastNodeIndex(int slotIndex){
108+
return (slotIndex == 0) ? (slotNum-1) : (slotIndex - 1);
109+
}
110+
111+
@Override
112+
public void close(){
113+
scheduler.shutdownNow();
114+
}
115+
116+
public SlidingNode current(){
117+
return nodes[currentSlot];
118+
}
119+
120+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package code.algorithm.slidingwindow;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.*;
5+
6+
/**
7+
* 〈一句话功能简述〉<p>
8+
* 〈功能详细描述〉
9+
*
10+
* @author zixiao
11+
* @date 2019/5/24
12+
*/
13+
public class SlidingWindowTest {
14+
15+
public static void main(String[] args) throws InterruptedException {
16+
int windowSize = 5;
17+
SlidingWindow slidingWindow = new SlidingWindow(60, windowSize);
18+
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
19+
scheduler.scheduleAtFixedRate(new Runnable() {
20+
@Override
21+
public void run() {
22+
SlidingNode node = slidingWindow.lastWindow();
23+
System.out.println(node.prettyPrint());
24+
}
25+
}, windowSize, 3, TimeUnit.SECONDS);
26+
27+
Thread t = new Thread(new Runnable() {
28+
@Override
29+
public void run() {
30+
for(int i=0; i<2000; i++){
31+
int rt = new Random().nextInt(21);
32+
slidingWindow.current().addCount(1);
33+
slidingWindow.current().addRt(rt);
34+
try {
35+
Thread.sleep(rt);
36+
} catch (InterruptedException e) {
37+
e.printStackTrace();
38+
}
39+
}
40+
}
41+
});
42+
t.setDaemon(true);
43+
t.start();
44+
45+
Thread t2 = new Thread(new Runnable() {
46+
@Override
47+
public void run() {
48+
for(int i=0; i<2000; i++){
49+
int rt = new Random().nextInt(21);
50+
slidingWindow.current().addCount(1);
51+
slidingWindow.current().addRt(rt);
52+
try {
53+
Thread.sleep(rt);
54+
} catch (InterruptedException e) {
55+
e.printStackTrace();
56+
}
57+
}
58+
}
59+
});
60+
t2.setDaemon(true);
61+
t2.start();
62+
63+
Thread.sleep(30000);
64+
scheduler.shutdownNow();
65+
slidingWindow.close();
66+
}
67+
}

0 commit comments

Comments
 (0)