Skip to content

Commit 363c80e

Browse files
committed
add pub/sub batching
1 parent bf91347 commit 363c80e

File tree

10 files changed

+499
-94
lines changed

10 files changed

+499
-94
lines changed

batch.go

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package unitdb
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
8+
"github.com/unit-io/unitdb-go/utp"
9+
)
10+
11+
const (
12+
defaultTimeout = 2 * time.Second
13+
defaultPoolCapacity = 27
14+
)
15+
16+
type (
17+
timeID int64
18+
batchOptions struct {
19+
batchDuration time.Duration
20+
batchCountThreshold int
21+
batchByteThreshold int
22+
poolCapacity int
23+
}
24+
batch struct {
25+
count int
26+
size int
27+
r *PublishResult
28+
msgs []*utp.PublishMessage
29+
30+
timeRefs []timeID
31+
}
32+
// batchGroup map[timeID]*batch
33+
batchManager struct {
34+
mu sync.RWMutex
35+
batchGroup map[timeID]*batch
36+
opts *batchOptions
37+
publishQueue chan *batch
38+
send chan *batch
39+
stop chan struct{}
40+
stopOnce sync.Once
41+
stopWg sync.WaitGroup
42+
}
43+
)
44+
45+
func (m *batchManager) newBatch(timeID timeID) *batch {
46+
b := &batch{
47+
r: &PublishResult{result: result{complete: make(chan struct{})}},
48+
msgs: make([]*utp.PublishMessage, 0),
49+
}
50+
m.batchGroup[timeID] = b
51+
52+
return b
53+
}
54+
55+
func (c *client) newBatchManager(opts *batchOptions) {
56+
if opts.poolCapacity == 0 {
57+
opts.poolCapacity = defaultPoolCapacity
58+
}
59+
m := &batchManager{
60+
opts: opts,
61+
batchGroup: make(map[timeID]*batch),
62+
publishQueue: make(chan *batch, 1),
63+
send: make(chan *batch, opts.poolCapacity),
64+
stop: make(chan struct{}),
65+
}
66+
67+
// timeID of next batch in queue
68+
m.newBatch(m.TimeID(0))
69+
70+
// start the publish loop
71+
go m.publishLoop(defaultBatchDuration)
72+
73+
// start the commit loop
74+
m.stopWg.Add(1)
75+
publishWaitTimeout := c.opts.writeTimeout
76+
if publishWaitTimeout == 0 {
77+
publishWaitTimeout = time.Second * 30
78+
}
79+
go m.publish(c, publishWaitTimeout)
80+
81+
// start the dispacther
82+
m.stopWg.Add(1)
83+
go m.dispatch(defaultTimeout)
84+
85+
c.batchManager = m
86+
}
87+
88+
// close tells dispatcher to exit, and wether or not complete queued jobs.
89+
func (m *batchManager) close() {
90+
m.stopOnce.Do(func() {
91+
// Close write queue and wait for currently running jobs to finish.
92+
close(m.stop)
93+
})
94+
m.stopWg.Wait()
95+
}
96+
97+
// add adds a publish message to a batch in the batch group.
98+
func (m *batchManager) add(delay int32, p *utp.PublishMessage) *PublishResult {
99+
m.mu.Lock()
100+
defer m.mu.Unlock()
101+
timeID := m.TimeID(delay)
102+
b, ok := m.batchGroup[timeID]
103+
if !ok {
104+
b = m.newBatch(timeID)
105+
}
106+
b.msgs = append(b.msgs, p)
107+
b.count++
108+
b.size += len(p.Payload)
109+
if b.count > m.opts.batchCountThreshold || b.size > m.opts.batchByteThreshold {
110+
m.push(b)
111+
delete(m.batchGroup, timeID)
112+
}
113+
return b.r
114+
}
115+
116+
// push enqueues a batch to publish.
117+
func (m *batchManager) push(b *batch) {
118+
if len(b.msgs) != 0 {
119+
m.publishQueue <- b
120+
}
121+
}
122+
123+
// publishLoop enqueue the publish batches.
124+
func (m *batchManager) publishLoop(interval time.Duration) {
125+
var publishC <-chan time.Time
126+
127+
if interval > 0 {
128+
publishTicker := time.NewTicker(interval)
129+
defer publishTicker.Stop()
130+
publishC = publishTicker.C
131+
}
132+
133+
for {
134+
select {
135+
case <-m.stop:
136+
timeNow := timeID(TimeNow().UnixNano())
137+
for timeID, batch := range m.batchGroup {
138+
if timeID < timeNow {
139+
m.mu.Lock()
140+
m.push(batch)
141+
delete(m.batchGroup, timeID)
142+
m.mu.Unlock()
143+
}
144+
}
145+
close(m.publishQueue)
146+
147+
return
148+
case <-publishC:
149+
timeNow := timeID(TimeNow().UnixNano())
150+
for timeID, batch := range m.batchGroup {
151+
if timeID < timeNow {
152+
m.mu.Lock()
153+
m.push(batch)
154+
delete(m.batchGroup, timeID)
155+
m.mu.Unlock()
156+
}
157+
}
158+
}
159+
}
160+
}
161+
162+
// dispatch handles publishing messages for the batches in queue.
163+
func (m *batchManager) dispatch(timeout time.Duration) {
164+
LOOP:
165+
for {
166+
select {
167+
case b, ok := <-m.publishQueue:
168+
if !ok {
169+
close(m.send)
170+
m.stopWg.Done()
171+
return
172+
}
173+
174+
select {
175+
case m.send <- b:
176+
default:
177+
// pool is full, let GC handle the batches
178+
goto WAIT
179+
}
180+
}
181+
}
182+
183+
WAIT:
184+
// Wait for a while
185+
time.Sleep(timeout)
186+
goto LOOP
187+
}
188+
189+
// publish publishes the messages.
190+
func (m *batchManager) publish(c *client, publishWaitTimeout time.Duration) {
191+
for {
192+
select {
193+
case <-m.stop:
194+
// run queued messges from the publish queue and
195+
// process it until queue is empty.
196+
for {
197+
select {
198+
case b, ok := <-m.send:
199+
if !ok {
200+
m.stopWg.Done()
201+
return
202+
}
203+
pub := &utp.Publish{Messages: b.msgs}
204+
mID := c.nextID(b.r)
205+
pub.MessageID = c.outboundID(mID)
206+
207+
// persist outbound
208+
c.storeOutbound(pub)
209+
210+
select {
211+
case c.send <- &PacketAndResult{p: pub, r: b.r}:
212+
case <-time.After(publishWaitTimeout):
213+
b.r.setError(errors.New("publish timeout error occurred"))
214+
}
215+
default:
216+
}
217+
}
218+
case b := <-m.send:
219+
if b != nil {
220+
pub := &utp.Publish{Messages: b.msgs}
221+
mID := c.nextID(b.r)
222+
pub.MessageID = c.outboundID(mID)
223+
224+
// persist outbound
225+
c.storeOutbound(pub)
226+
227+
select {
228+
case c.send <- &PacketAndResult{p: pub, r: b.r}:
229+
case <-time.After(publishWaitTimeout):
230+
b.r.setError(errors.New("publish timeout error occurred"))
231+
}
232+
}
233+
}
234+
}
235+
}
236+
237+
func (m *batchManager) TimeID(delay int32) timeID {
238+
return timeID(TimeNow().Add(m.opts.batchDuration + (time.Duration(delay) * time.Millisecond)).Truncate(m.opts.batchDuration).UnixNano())
239+
}

0 commit comments

Comments
 (0)