Skip to content

Commit 96fe877

Browse files
committed
[server] update utp encoding and restructure code
1 parent 060c3ec commit 96fe877

File tree

21 files changed

+942
-837
lines changed

21 files changed

+942
-837
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ require (
1414
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
1515
google.golang.org/grpc v1.39.0
1616
)
17+
18+
replace github.com/unit-io/unitdb-go => /src/github.com/unit-io/unitdb-go

server/internal/batch.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"sync"
55
"time"
66

7-
lp "github.com/unit-io/unitdb/server/internal/net"
87
"github.com/unit-io/unitdb/server/internal/store"
8+
"github.com/unit-io/unitdb/server/utp"
99
)
1010

1111
const (
@@ -29,7 +29,7 @@ type (
2929
batch struct {
3030
count int
3131
size int
32-
msgs []*lp.PublishMessage
32+
msgs []*utp.PublishMessage
3333
}
3434
batchManager struct {
3535
mu sync.RWMutex
@@ -45,7 +45,7 @@ type (
4545

4646
func (m *batchManager) newBatch(timeID timeID) *batch {
4747
b := &batch{
48-
msgs: make([]*lp.PublishMessage, 0),
48+
msgs: make([]*utp.PublishMessage, 0),
4949
}
5050
m.batchGroup[timeID] = b
5151

@@ -90,7 +90,7 @@ func (m *batchManager) close() {
9090
}
9191

9292
// add adds a publish message to a batch in the batch group.
93-
func (m *batchManager) add(delay int32, p *lp.PublishMessage) {
93+
func (m *batchManager) add(delay int32, p *utp.PublishMessage) {
9494
m.mu.Lock()
9595
defer m.mu.Unlock()
9696
timeID := m.TimeID(delay)
@@ -188,11 +188,11 @@ func (m *batchManager) publish(c *_Conn, publishWaitTimeout time.Duration) {
188188
m.stopWg.Done()
189189
return
190190
}
191-
pub := &lp.Publish{Messages: b.msgs}
192-
pub.MessageID = c.MessageIds.NextID(lp.PUBLISH.Value())
191+
pub := &utp.Publish{Messages: b.msgs}
192+
pub.MessageID = uint16(c.MessageIds.NextID(utp.PUBLISH))
193193

194194
// persist outbound
195-
store.Log.PersistOutbound(c.adp, uint32(c.connID), pub)
195+
store.Log.PersistOutbound(uint32(c.connID), pub)
196196

197197
select {
198198
case c.pub <- pub:
@@ -201,11 +201,11 @@ func (m *batchManager) publish(c *_Conn, publishWaitTimeout time.Duration) {
201201
}
202202
case b := <-m.send:
203203
if b != nil {
204-
pub := &lp.Publish{Messages: b.msgs}
205-
pub.MessageID = c.MessageIds.NextID(lp.PUBLISH.Value())
204+
pub := &utp.Publish{Messages: b.msgs}
205+
pub.MessageID = uint16(c.MessageIds.NextID(utp.PUBLISH))
206206

207207
// persist outbound
208-
store.Log.PersistOutbound(c.adp, uint32(c.connID), pub)
208+
store.Log.PersistOutbound(uint32(c.connID), pub)
209209
select {
210210
case c.pub <- pub:
211211
case <-time.After(publishWaitTimeout):

server/internal/cluster.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
rh "github.com/unit-io/unitdb/server/internal/pkg/hash"
3535
"github.com/unit-io/unitdb/server/internal/pkg/log"
3636
"github.com/unit-io/unitdb/server/internal/pkg/uid"
37+
"github.com/unit-io/unitdb/server/utp"
3738
)
3839

3940
const (
@@ -103,9 +104,9 @@ type ClusterReq struct {
103104
// Cluster is desynchronized.
104105
Signature string
105106

106-
MsgSub *lp.Subscribe
107-
MsgPub *lp.Publish
108-
MsgUnsub *lp.Unsubscribe
107+
MsgSub *utp.Subscribe
108+
MsgPub *utp.Publish
109+
MsgUnsub *utp.Unsubscribe
109110
Topic *security.Topic
110111
Type uint8
111112
Message *message.Message
@@ -119,9 +120,9 @@ type ClusterReq struct {
119120
// ClusterResp is a Master to Proxy response message.
120121
type ClusterResp struct {
121122
Type uint8
122-
MsgSub *lp.Subscribe
123-
MsgPub *lp.Publish
124-
MsgUnsub *lp.Unsubscribe
123+
MsgSub *utp.Subscribe
124+
MsgPub *utp.Publish
125+
MsgUnsub *utp.Unsubscribe
125126
Msg []byte
126127
Topic *security.Topic
127128
Message *message.Message
@@ -310,7 +311,6 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
310311
go conn.rpcWriteLoop()
311312
}
312313
// Update session params which may have changed since the last call.
313-
conn.proto = msg.Conn.Proto
314314
conn.connID = msg.Conn.ConnID
315315
conn.clientID = msg.Conn.ClientID
316316

@@ -373,7 +373,7 @@ func (c *Cluster) isRemoteContract(contract string) bool {
373373
}
374374

375375
// Forward client message to the Master (cluster node which owns the topic)
376-
func (c *Cluster) routeToContract(msg lp.LineProtocol, topic *security.Topic, msgType uint8, m *message.Message, conn *_Conn) error {
376+
func (c *Cluster) routeToContract(msg lp.MessagePack, topic *security.Topic, msgType uint8, m *message.Message, conn *_Conn) error {
377377
// Find the cluster node which owns the topic, then forward to it.
378378
n := c.nodeForContract(fmt.Sprint(conn.clientID.Contract()))
379379
if n == nil {
@@ -387,18 +387,18 @@ func (c *Cluster) routeToContract(msg lp.LineProtocol, topic *security.Topic, ms
387387
conn.nodes[n.name] = true
388388

389389
// var msgSub,msgPub,msgUnsub lp.Packet
390-
var msgSub *lp.Subscribe
391-
var msgPub *lp.Publish
392-
var msgUnsub *lp.Unsubscribe
390+
var msgSub *utp.Subscribe
391+
var msgPub *utp.Publish
392+
var msgUnsub *utp.Unsubscribe
393393
switch msgType {
394394
case message.SUBSCRIBE:
395-
msgSub = msg.(*lp.Subscribe)
395+
msgSub = msg.(*utp.Subscribe)
396396
msgSub.IsForwarded = true
397397
case message.UNSUBSCRIBE:
398-
msgUnsub = msg.(*lp.Unsubscribe)
398+
msgUnsub = msg.(*utp.Unsubscribe)
399399
msgUnsub.IsForwarded = true
400400
case message.PUBLISH:
401-
msgPub = msg.(*lp.Publish)
401+
msgPub = msg.(*utp.Publish)
402402
msgPub.IsForwarded = true
403403
}
404404
return n.forward(
@@ -413,7 +413,6 @@ func (c *Cluster) routeToContract(msg lp.LineProtocol, topic *security.Topic, ms
413413
Message: m,
414414
Conn: &ClusterSess{
415415
//RemoteAddr: conn.(),
416-
Proto: conn.proto,
417416
ConnID: conn.connID,
418417
SessID: conn.sessID,
419418
ClientID: conn.clientID}})
@@ -471,9 +470,9 @@ func ClusterInit(configString json.RawMessage, self *string) int {
471470

472471
gob.Register([]interface{}{})
473472
gob.Register(map[string]interface{}{})
474-
gob.Register(lp.Publish{})
475-
gob.Register(lp.Subscribe{})
476-
gob.Register(lp.Unsubscribe{})
473+
gob.Register(utp.Publish{})
474+
gob.Register(utp.Subscribe{})
475+
gob.Register(utp.Unsubscribe{})
477476

478477
Globals.Cluster = &Cluster{
479478
thisNodeName: thisName,
@@ -530,10 +529,7 @@ func (c *_Conn) rpcWriteLoop() {
530529
// channel closed
531530
return
532531
}
533-
if c.adp == nil {
534-
return
535-
}
536-
m, err := lp.Encode(c.adp, msg)
532+
m, err := lp.Encode(msg)
537533
if err != nil {
538534
log.Error("conn.writeRpc", err.Error())
539535
return

0 commit comments

Comments
 (0)