Skip to content

Commit 31d6a76

Browse files
committed
update uTP protocol
1 parent 363c80e commit 31d6a76

File tree

17 files changed

+558
-559
lines changed

17 files changed

+558
-559
lines changed

batch.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ func (c *client) newBatchManager(opts *batchOptions) {
8787

8888
// close tells dispatcher to exit, and wether or not complete queued jobs.
8989
func (m *batchManager) close() {
90+
if m == nil {
91+
return
92+
}
9093
m.stopOnce.Do(func() {
9194
// Close write queue and wait for currently running jobs to finish.
9295
close(m.stop)
@@ -200,15 +203,15 @@ func (m *batchManager) publish(c *client, publishWaitTimeout time.Duration) {
200203
m.stopWg.Done()
201204
return
202205
}
203-
pub := &utp.Publish{Messages: b.msgs}
206+
pub := &utp.Publish{DeliveryMode: 2, Messages: b.msgs}
204207
mID := c.nextID(b.r)
205208
pub.MessageID = c.outboundID(mID)
206209

207210
// persist outbound
208211
c.storeOutbound(pub)
209212

210213
select {
211-
case c.send <- &PacketAndResult{p: pub, r: b.r}:
214+
case c.send <- &MessageAndResult{m: pub, r: b.r}:
212215
case <-time.After(publishWaitTimeout):
213216
b.r.setError(errors.New("publish timeout error occurred"))
214217
}
@@ -217,15 +220,15 @@ func (m *batchManager) publish(c *client, publishWaitTimeout time.Duration) {
217220
}
218221
case b := <-m.send:
219222
if b != nil {
220-
pub := &utp.Publish{Messages: b.msgs}
223+
pub := &utp.Publish{DeliveryMode: 2, Messages: b.msgs}
221224
mID := c.nextID(b.r)
222225
pub.MessageID = c.outboundID(mID)
223226

224227
// persist outbound
225228
c.storeOutbound(pub)
226229

227230
select {
228-
case c.send <- &PacketAndResult{p: pub, r: b.r}:
231+
case c.send <- &MessageAndResult{m: pub, r: b.r}:
229232
case <-time.After(publishWaitTimeout):
230233
b.r.setError(errors.New("publish timeout error occurred"))
231234
}

0 commit comments

Comments
 (0)