Skip to content

Commit ad4ee66

Browse files
committed
update utp encoding
1 parent eebe427 commit ad4ee66

File tree

15 files changed

+134
-654
lines changed

15 files changed

+134
-654
lines changed

batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66
"time"
77

8-
"github.com/unit-io/unitdb-go/internal/utp"
8+
"github.com/unit-io/unitdb/server/utp"
99
)
1010

1111
const (

client.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import (
1111
"time"
1212

1313
"github.com/golang/protobuf/proto"
14+
lp "github.com/unit-io/unitdb-go/internal/net"
1415
"github.com/unit-io/unitdb-go/internal/store"
15-
"github.com/unit-io/unitdb-go/internal/utp"
1616
"github.com/unit-io/unitdb/server/common"
1717
pbx "github.com/unit-io/unitdb/server/proto"
18+
"github.com/unit-io/unitdb/server/utp"
1819
"google.golang.org/grpc"
1920

2021
// Database store
@@ -58,7 +59,7 @@ type client struct {
5859
epoch uint32 // The session ID of the connection.
5960
conn net.Conn // the network connection
6061
send chan *MessageAndResult
61-
recv chan utp.Message
62+
recv chan lp.MessagePack
6263
pub chan *utp.Publish
6364
callbacks map[uint64]MessageHandler
6465

@@ -84,7 +85,7 @@ func NewClient(target, clientID string, opts ...Options) (Client, error) {
8485
cancel: cancel,
8586
messageIds: messageIds{index: make(map[MID]Result)},
8687
send: make(chan *MessageAndResult, 1), // buffered
87-
recv: make(chan utp.Message),
88+
recv: make(chan lp.MessagePack),
8889
pub: make(chan *utp.Publish),
8990
callbacks: make(map[uint64]MessageHandler),
9091
// close
@@ -543,12 +544,12 @@ func TimeNow() time.Time {
543544
return time.Now().UTC().Round(time.Millisecond)
544545
}
545546

546-
func (c *client) inboundID(id int32) MID {
547-
return MID(c.connID - id)
547+
func (c *client) inboundID(id uint16) MID {
548+
return MID(c.connID - int32(id))
548549
}
549550

550-
func (c *client) outboundID(mid MID) (id int32) {
551-
return c.connID - (int32(mid))
551+
func (c *client) outboundID(mid MID) (id uint16) {
552+
return uint16(c.connID - (int32(mid)))
552553
}
553554

554555
func (c *client) updateLastAction() {
@@ -561,11 +562,11 @@ func (c *client) updateLastTouched() {
561562
c.lastTouched.Store(TimeNow())
562563
}
563564

564-
func (c *client) storeInbound(m utp.Message) {
565+
func (c *client) storeInbound(m lp.MessagePack) {
565566
store.Log.PersistInbound(uint32(c.sessID), m)
566567
}
567568

568-
func (c *client) storeOutbound(m utp.Message) {
569+
func (c *client) storeOutbound(m lp.MessagePack) {
569570
store.Log.PersistOutbound(uint32(c.sessID), m)
570571
}
571572

hdl_conn.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import (
99
"net"
1010
"time"
1111

12-
"github.com/unit-io/unitdb-go/internal/utp"
12+
lp "github.com/unit-io/unitdb-go/internal/net"
13+
"github.com/unit-io/unitdb/server/utp"
1314
)
1415

1516
// Connect takes a connected net.Conn and performs the initial handshake. Paramaters are:
1617
// conn - Connected net.Conn
1718
// cm - Connect Message
18-
func Connect(conn net.Conn, cm *utp.Connect) (rc int32, epoch int32, cid int32, err error) {
19-
m, err := utp.Encode(cm)
19+
func Connect(conn net.Conn, cm *utp.Connect) (rc uint8, epoch int32, cid int32, err error) {
20+
m, err := lp.Encode(cm)
2021
if err != nil {
2122
fmt.Println(err)
2223
return utp.ErrRefusedServerUnavailable, 0, 0, err
@@ -31,21 +32,24 @@ func Connect(conn net.Conn, cm *utp.Connect) (rc int32, epoch int32, cid int32,
3132
// when the connection is first started.
3233
// This prevents receiving incoming data while resume
3334
// is in progress if clean session is false.
34-
func verifyCONNACK(conn net.Conn) (int32, int32, int32, error) {
35-
ca, err := utp.Read(conn)
35+
func verifyCONNACK(conn net.Conn) (uint8, int32, int32, error) {
36+
ca, err := lp.Read(conn)
3637
if err != nil {
3738
return utp.ErrRefusedServerUnavailable, 0, 0, err
3839
}
3940
if ca == nil {
40-
return utp.ErrRefusedServerUnavailable, 0, 0, errors.New("nil Connect Acknowledge Message")
41+
return utp.ErrRefusedServerUnavailable, 0, 0, errors.New("nil connect acknowledge message")
4142
}
4243

43-
msg, ok := ca.(*utp.ConnectAcknowledge)
44+
pack, ok := ca.(*utp.ControlMessage)
4445
if !ok {
4546
return utp.ErrRefusedServerUnavailable, 0, 0, errors.New("first message must be connect acknowledge message")
4647
}
4748

48-
return msg.ReturnCode, msg.Epoch, msg.ConnID, nil
49+
connack := &utp.ConnectAcknowledge{}
50+
connack.FromBinary(utp.FixedHeader{MessageType: utp.CONNECT, FlowControl: utp.ACKNOWLEDGE}, pack.Message)
51+
52+
return connack.ReturnCode, connack.Epoch, connack.ConnID, nil
4953
}
5054

5155
// Handle handles incoming messages
@@ -67,7 +71,7 @@ func (c *client) readLoop(ctx context.Context) error {
6771
return nil
6872
default:
6973
// Unpack an incoming Message
70-
msg, err := utp.Read(reader)
74+
msg, err := lp.Read(reader)
7175
if err != nil {
7276
return err
7377
}
@@ -84,7 +88,7 @@ func (c *client) readLoop(ctx context.Context) error {
8488
}
8589

8690
// handle handles inbound messages.
87-
func (c *client) handler(inMsg utp.Message) error {
91+
func (c *client) handler(inMsg lp.MessagePack) error {
8892
c.updateLastAction()
8993

9094
switch inMsg.Type() {
@@ -143,7 +147,7 @@ func (c *client) writeLoop(ctx context.Context) {
143147
mId := c.inboundID(msg.MessageID)
144148
c.freeID(mId)
145149
}
146-
m, err := utp.Encode(outMsg.m)
150+
m, err := lp.Encode(outMsg.m)
147151
if err != nil {
148152
fmt.Println(err)
149153
// return
@@ -183,7 +187,7 @@ func (c *client) dispatcher(ctx context.Context) {
183187
// keepalive - Send ping when connection unused for set period
184188
// connection passed in to avoid race condition on shutdown
185189
func (c *client) keepalive(ctx context.Context) {
186-
var pingInterval int64
190+
var pingInterval int32
187191
var pingSent time.Time
188192

189193
if c.opts.keepAlive > 10 {
@@ -192,7 +196,7 @@ func (c *client) keepalive(ctx context.Context) {
192196
pingInterval = c.opts.keepAlive / 2
193197
}
194198

195-
pingTicker := time.NewTicker(time.Duration(pingInterval * int64(time.Second)))
199+
pingTicker := time.NewTicker(time.Duration(pingInterval * int32(time.Second)))
196200
defer func() {
197201
pingTicker.Stop()
198202
}()
@@ -206,12 +210,12 @@ func (c *client) keepalive(ctx context.Context) {
206210
case <-pingTicker.C:
207211
lastAction := c.lastAction.Load().(time.Time)
208212
lastTouched := c.lastTouched.Load().(time.Time)
209-
live := TimeNow().Add(-time.Duration(c.opts.keepAlive * int64(time.Second)))
213+
live := TimeNow().Add(-time.Duration(c.opts.keepAlive * int32(time.Second)))
210214
timeout := TimeNow().Add(-c.opts.pingTimeout)
211215

212216
if lastAction.After(live) || lastTouched.After(live) {
213217
ping := &utp.Pingreq{}
214-
m, err := utp.Encode(ping)
218+
m, err := lp.Encode(ping)
215219
if err != nil {
216220
fmt.Println(err)
217221
}

internal/net/message.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package net
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
8+
"github.com/unit-io/unitdb/server/utp"
9+
)
10+
11+
// MessagePack is the interface for all Messages
12+
type MessagePack interface {
13+
Type() utp.MessageType
14+
Info() utp.Info
15+
}
16+
17+
// Read unpacks the packet from the provided reader.
18+
func Read(r io.Reader) (MessagePack, error) {
19+
var fh utp.FixedHeader
20+
fh.FromBinary(r)
21+
22+
// Check for empty packets
23+
switch fh.MessageType {
24+
case utp.DISCONNECT:
25+
return &utp.Disconnect{}, nil
26+
}
27+
28+
rawMsg := make([]byte, fh.MessageLength)
29+
_, err := io.ReadFull(r, rawMsg)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
// unpack the body
35+
if fh.FlowControl != utp.NONE {
36+
pack := &utp.ControlMessage{}
37+
pack.FromBinary(fh, rawMsg)
38+
return pack, nil
39+
}
40+
switch fh.MessageType {
41+
case utp.PUBLISH:
42+
pack := &utp.Publish{}
43+
pack.FromBinary(fh, rawMsg)
44+
return pack, nil
45+
default:
46+
return nil, fmt.Errorf("message::Read: Invalid zero-length packet type %d", fh.MessageType)
47+
}
48+
}
49+
50+
// Encode encodes the message into binary data
51+
func Encode(pack MessagePack) (bytes.Buffer, error) {
52+
switch pack.Type() {
53+
case utp.PINGREQ:
54+
return pack.(*utp.Pingreq).ToBinary()
55+
case utp.CONNECT:
56+
return pack.(*utp.Connect).ToBinary()
57+
case utp.DISCONNECT:
58+
return pack.(*utp.Disconnect).ToBinary()
59+
case utp.RELAY:
60+
return pack.(*utp.Relay).ToBinary()
61+
case utp.SUBSCRIBE:
62+
return pack.(*utp.Subscribe).ToBinary()
63+
case utp.UNSUBSCRIBE:
64+
return pack.(*utp.Unsubscribe).ToBinary()
65+
case utp.PUBLISH:
66+
return pack.(*utp.Publish).ToBinary()
67+
case utp.FLOWCONTROL:
68+
return pack.(*utp.ControlMessage).ToBinary()
69+
default:
70+
return bytes.Buffer{}, fmt.Errorf("message::Encode: Invalid zero-length packet type %d", pack.Type())
71+
}
72+
}

internal/store/store.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66
"fmt"
77

88
adapter "github.com/unit-io/unitdb-go/internal/db"
9-
"github.com/unit-io/unitdb-go/internal/utp"
9+
lp "github.com/unit-io/unitdb-go/internal/net"
10+
"github.com/unit-io/unitdb/server/utp"
1011
)
1112

1213
var adp adapter.Adapter
@@ -95,13 +96,13 @@ type MessageLog struct{}
9596
var Log MessageLog
9697

9798
// handle which outgoing messages are stored
98-
func (l *MessageLog) PersistOutbound(blockID uint32, outMsg utp.Message) {
99+
func (l *MessageLog) PersistOutbound(blockID uint32, outMsg lp.MessagePack) {
99100
switch outMsg.(type) {
100101
case *utp.Publish, *utp.Subscribe, *utp.Unsubscribe:
101102
// Received a publish. store it in ibound
102103
// until ACKNOWLEDGE or COMPLETE is sent
103104
okey := uint64(blockID)<<32 + uint64(outMsg.Info().MessageID)
104-
m, err := utp.Encode(outMsg)
105+
m, err := lp.Encode(outMsg)
105106
if err != nil {
106107
fmt.Println(err)
107108
return
@@ -115,7 +116,7 @@ func (l *MessageLog) PersistOutbound(blockID uint32, outMsg utp.Message) {
115116
// Received a RECEIPT control message. store it in obound
116117
// until COMPLETE is received.
117118
okey := uint64(blockID)<<32 + uint64(outMsg.Info().MessageID)
118-
m, err := utp.Encode(outMsg)
119+
m, err := lp.Encode(outMsg)
119120
if err != nil {
120121
fmt.Println(err)
121122
return
@@ -126,13 +127,13 @@ func (l *MessageLog) PersistOutbound(blockID uint32, outMsg utp.Message) {
126127
}
127128

128129
// handle which incoming messages are stored
129-
func (l *MessageLog) PersistInbound(blockID uint32, inMsg utp.Message) {
130+
func (l *MessageLog) PersistInbound(blockID uint32, inMsg lp.MessagePack) {
130131
switch inMsg.(type) {
131132
case *utp.Publish:
132133
// Received a publish. store it in ibound
133134
// until COMPLETE sent
134135
ikey := uint64(blockID)<<32 + uint64(inMsg.Info().MessageID)
135-
m, err := utp.Encode(inMsg)
136+
m, err := lp.Encode(inMsg)
136137
if err != nil {
137138
fmt.Println(err)
138139
return
@@ -151,7 +152,7 @@ func (l *MessageLog) PersistInbound(blockID uint32, inMsg utp.Message) {
151152
// Sending RECEIPT. store in obound
152153
// until COMPLETE received
153154
ikey := uint64(blockID)<<32 + uint64(inMsg.Info().MessageID)
154-
m, err := utp.Encode(inMsg)
155+
m, err := lp.Encode(inMsg)
155156
if err != nil {
156157
fmt.Println(err)
157158
return
@@ -162,10 +163,10 @@ func (l *MessageLog) PersistInbound(blockID uint32, inMsg utp.Message) {
162163
}
163164

164165
// Get performs a query and attempts to fetch message for the given key
165-
func (l *MessageLog) Get(key uint64) utp.Message {
166+
func (l *MessageLog) Get(key uint64) lp.MessagePack {
166167
if raw, err := adp.GetMessage(key); raw != nil && err == nil {
167168
r := bytes.NewReader(raw)
168-
if msg, err := utp.Read(r); err == nil {
169+
if msg, err := lp.Read(r); err == nil {
169170
return msg
170171
}
171172
}

0 commit comments

Comments
 (0)