Skip to content

Commit 20754d6

Browse files
committed
Add test case for ots writer.
1 parent 136ea1e commit 20754d6

File tree

5 files changed

+1148
-0
lines changed

5 files changed

+1148
-0
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package com.aliyun.openservices.ots.writer;
2+
3+
import com.lmax.disruptor.*;
4+
import com.lmax.disruptor.dsl.Disruptor;
5+
import com.lmax.disruptor.dsl.ProducerType;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.concurrent.Executor;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
public class MultiPMultiC {
15+
private static int threadCount = 1;
16+
private static int consumerCount = 1;
17+
private static int queueSize = 4096;
18+
19+
public static class LongEvent {
20+
private long value;
21+
22+
public void set(long value) {
23+
this.value = value;
24+
}
25+
}
26+
27+
public static class LongEventFactory implements EventFactory<LongEvent> {
28+
public LongEvent newInstance() {
29+
return new LongEvent();
30+
}
31+
}
32+
33+
public static class LongEventProducer {
34+
public AtomicInteger count = new AtomicInteger();
35+
private final RingBuffer<LongEvent> ringBuffer;
36+
37+
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
38+
{
39+
this.ringBuffer = ringBuffer;
40+
}
41+
42+
public void tryPut(long value) {
43+
while (true) {
44+
try {
45+
long sequence = ringBuffer.tryNext();
46+
LongEvent event = ringBuffer.get(sequence);
47+
event.set(value);
48+
ringBuffer.publish(sequence);
49+
return;
50+
} catch (InsufficientCapacityException e) {
51+
try {
52+
Thread.sleep(1);
53+
} catch (InterruptedException exp) {
54+
}
55+
}
56+
}
57+
}
58+
59+
public void busyPut(long value) {
60+
long sequence = ringBuffer.next();
61+
try {
62+
LongEvent event = ringBuffer.get(sequence);
63+
event.set(value);
64+
} finally {
65+
ringBuffer.publish(sequence);
66+
}
67+
}
68+
69+
public void onData(long value) {
70+
count.incrementAndGet();
71+
tryPut(value);
72+
}
73+
}
74+
75+
public static class LongEventHandler implements EventHandler<LongEvent> {
76+
private int ordinal;
77+
private int id;
78+
79+
public LongEventHandler(int ordinal, int id) {
80+
this.ordinal = ordinal;
81+
this.id = id;
82+
}
83+
84+
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
85+
86+
}
87+
}
88+
89+
public static class WorkerHandler implements WorkHandler<LongEvent> {
90+
91+
@Override
92+
public void onEvent(LongEvent longEvent) throws Exception {
93+
//Thread.sleep(1000);
94+
//System.out.println(this + ":" + longEvent.value);
95+
}
96+
}
97+
98+
public static void main(String[] args) throws Exception {
99+
if (args.length > 0) {
100+
threadCount = Integer.parseInt(args[0]);
101+
consumerCount = Integer.parseInt(args[1]);
102+
}
103+
System.out.println("ThreadCount: " + threadCount);
104+
System.out.println("ConsumerCount: " + consumerCount);
105+
106+
Executor executor = Executors.newCachedThreadPool();
107+
// The factory for the event
108+
LongEventFactory factory = new LongEventFactory();
109+
110+
// Specify the size of the ring buffer, must be power of 2.
111+
int bufferSize = queueSize;
112+
113+
// Construct the Disruptor
114+
//Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.MULTI, new BusySpinWaitStrategy());
115+
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
116+
117+
final RingBuffer ringBuffer = disruptor.getRingBuffer();
118+
EventHandler[] handlers = new EventHandler[consumerCount];
119+
for (int i = 0; i < consumerCount; i++) {
120+
handlers[i] = new LongEventHandler(consumerCount, i);
121+
}
122+
disruptor.handleEventsWith(handlers);
123+
124+
disruptor.start();
125+
126+
127+
final List<LongEventProducer> ps = new ArrayList<LongEventProducer>();
128+
List<Thread> ts = new ArrayList<Thread>();
129+
for (int i = 0; i < threadCount; i++) {
130+
final LongEventProducer producer = new LongEventProducer(ringBuffer);
131+
ps.add(producer);
132+
Thread th = new Thread(new Runnable() {
133+
@Override
134+
public void run() {
135+
while (true) {
136+
//System.out.println("Put data: " + s);
137+
producer.onData(0);
138+
}
139+
}
140+
});
141+
ts.add(th);
142+
}
143+
144+
Thread qps = new Thread(new Runnable() {
145+
@Override
146+
public void run() {
147+
while (true) {
148+
try {
149+
Thread.sleep(1000);
150+
} catch (InterruptedException e) {
151+
e.printStackTrace();
152+
}
153+
154+
long totalCount = 0;
155+
for (LongEventProducer p : ps) {
156+
totalCount += p.count.getAndSet(0);
157+
}
158+
System.out.println(totalCount);
159+
}
160+
}
161+
});
162+
163+
qps.start();
164+
165+
for (Thread th : ts) {
166+
th.start();
167+
}
168+
169+
qps.join();
170+
}
171+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package com.aliyun.openservices.ots.writer;
2+
3+
import com.lmax.disruptor.*;
4+
import com.lmax.disruptor.dsl.Disruptor;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.Executor;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
public class OnePOneC {
13+
private static int queueSize = 4096;
14+
private static int count = 1;
15+
16+
public static class LongEvent {
17+
private long value;
18+
19+
public void set(long value) {
20+
this.value = value;
21+
}
22+
}
23+
24+
public static class LongEventFactory implements EventFactory<LongEvent> {
25+
public LongEvent newInstance() {
26+
return new LongEvent();
27+
}
28+
}
29+
30+
public static class LongEventProducer {
31+
public AtomicInteger count = new AtomicInteger();
32+
private final RingBuffer<LongEvent> ringBuffer;
33+
34+
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
35+
{
36+
this.ringBuffer = ringBuffer;
37+
}
38+
39+
public void tryPut(long value) {
40+
while (true) {
41+
try {
42+
long sequence = ringBuffer.tryNext();
43+
LongEvent event = ringBuffer.get(sequence);
44+
event.set(value);
45+
ringBuffer.publish(sequence);
46+
return;
47+
} catch (InsufficientCapacityException e) {
48+
try {
49+
Thread.sleep(1);
50+
} catch (InterruptedException exp) {
51+
}
52+
}
53+
}
54+
}
55+
56+
public void busyPut(long value) {
57+
long sequence = ringBuffer.next();
58+
try {
59+
LongEvent event = ringBuffer.get(sequence);
60+
event.set(value);
61+
} finally {
62+
ringBuffer.publish(sequence);
63+
}
64+
}
65+
66+
public void onData(long value) {
67+
count.incrementAndGet();
68+
tryPut(value);
69+
}
70+
}
71+
72+
public static class LongEventHandler implements EventHandler<LongEvent> {
73+
private int ordinal;
74+
private int id;
75+
76+
public LongEventHandler(int ordinal, int id) {
77+
this.ordinal = ordinal;
78+
this.id = id;
79+
}
80+
81+
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
82+
83+
}
84+
}
85+
86+
public static class WorkThread implements Runnable {
87+
88+
public LongEventProducer producer;
89+
90+
@Override
91+
public void run() {
92+
Executor executor = Executors.newCachedThreadPool();
93+
// The factory for the event
94+
LongEventFactory factory = new LongEventFactory();
95+
96+
// Specify the size of the ring buffer, must be power of 2.
97+
int bufferSize = queueSize;
98+
99+
// Construct the Disruptor
100+
//Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.MULTI, new BusySpinWaitStrategy());
101+
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
102+
103+
final RingBuffer ringBuffer = disruptor.getRingBuffer();
104+
disruptor.handleEventsWith(new LongEventHandler(0, 0));
105+
106+
disruptor.start();
107+
108+
final List<LongEventProducer> ps = new ArrayList<LongEventProducer>();
109+
List<Thread> ts = new ArrayList<Thread>();
110+
for (int i = 0; i < 1; i++) {
111+
producer = new LongEventProducer(ringBuffer);
112+
ps.add(producer);
113+
Thread th = new Thread(new Runnable() {
114+
@Override
115+
public void run() {
116+
while (true) {
117+
//System.out.println("Put data: " + s);
118+
producer.onData(0);
119+
}
120+
}
121+
});
122+
ts.add(th);
123+
}
124+
125+
for (Thread th : ts) {
126+
th.start();
127+
}
128+
129+
for (Thread th : ts) {
130+
try {
131+
th.join();
132+
} catch (InterruptedException e) {
133+
e.printStackTrace();
134+
}
135+
}
136+
}
137+
}
138+
139+
public static void main(String[] args) throws Exception {
140+
if (args.length > 0) {
141+
count = Integer.parseInt(args[0]);
142+
}
143+
System.out.println("ThreadCount: " + count);
144+
145+
final WorkThread[] wts = new WorkThread[count];
146+
final List<Thread> ts = new ArrayList<Thread>();
147+
148+
for (int i = 0; i < count; i++) {
149+
wts[i] = new WorkThread();
150+
Thread th = new Thread(wts[i]);
151+
ts.add(th);
152+
}
153+
154+
for (Thread t : ts) {
155+
t.start();
156+
}
157+
158+
Thread qps = new Thread(new Runnable() {
159+
@Override
160+
public void run() {
161+
while (true) {
162+
try {
163+
Thread.sleep(1000);
164+
} catch (InterruptedException e) {
165+
e.printStackTrace();
166+
}
167+
168+
long totalCount = 0;
169+
for (WorkThread p : wts) {
170+
totalCount += p.producer.count.getAndSet(0);
171+
}
172+
System.out.println(totalCount);
173+
}
174+
}
175+
});
176+
177+
qps.start();
178+
qps.join();
179+
}
180+
}

0 commit comments

Comments
 (0)