Commit f077534

[Server] add linter error fixes
1 parent 74cb80a commit f077534

12 files changed: +280 additions, -381 deletions

go.mod

Lines changed: 0 additions & 1 deletion
@@ -8,7 +8,6 @@ require (
 	github.com/golang/snappy v0.0.3
 	github.com/gorilla/websocket v1.4.2
 	github.com/rs/zerolog v1.21.0
-	github.com/stretchr/testify v1.7.0
 	github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264
 	github.com/unit-io/unitdb-go v0.0.0-20210407101657-d9db9270f78d
 	golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2

server/internal/batch.go

Lines changed: 26 additions & 37 deletions
@@ -30,8 +30,6 @@ type (
 		count    int
 		size     int
 		msgs     []*lp.PublishMessage
-
-		timeRefs []timeID
 	}
 	batchManager struct {
 		mu sync.RWMutex
@@ -158,22 +156,18 @@ func (m *batchManager) publishLoop(interval time.Duration) {
 // dispatch handles publishing messages for the batches in queue.
 func (m *batchManager) dispatch(timeout time.Duration) {
 LOOP:
-	for {
-		select {
-		case b, ok := <-m.publishQueue:
-			if !ok {
-				close(m.send)
-				m.stopWg.Done()
-				return
-			}
+	b, ok := <-m.publishQueue
+	if !ok {
+		close(m.send)
+		m.stopWg.Done()
+		return
+	}
 
-			select {
-			case m.send <- b:
-			default:
-				// pool is full, let GC handle the batches
-				goto WAIT
-			}
-		}
+	select {
+	case m.send <- b:
+	default:
+		// pool is full, let GC handle the batches
+		goto WAIT
 	}
 
 WAIT:
@@ -189,26 +183,21 @@ func (m *batchManager) publish(c *_Conn, publishWaitTimeout time.Duration) {
 		case <-m.stop:
 			// run queued messges from the publish queue and
 			// process it until queue is empty.
-			for {
-				select {
-				case b, ok := <-m.send:
-					if !ok {
-						m.stopWg.Done()
-						return
-					}
-					pub := &lp.Publish{Messages: b.msgs}
-					pub.MessageID = c.MessageIds.NextID(lp.PUBLISH.Value())
-
-					// persist outbound
-					store.Log.PersistOutbound(c.adp, uint32(c.connID), pub)
-
-					select {
-					case c.pub <- pub:
-					case <-time.After(publishWaitTimeout):
-						// b.r.setError(errors.New("publish timeout error occurred"))
-					}
-				default:
-				}
+			b, ok := <-m.send
+			if !ok {
+				m.stopWg.Done()
+				return
+			}
+			pub := &lp.Publish{Messages: b.msgs}
+			pub.MessageID = c.MessageIds.NextID(lp.PUBLISH.Value())
+
+			// persist outbound
+			store.Log.PersistOutbound(c.adp, uint32(c.connID), pub)
+
+			select {
+			case c.pub <- pub:
+			case <-time.After(publishWaitTimeout):
+				// b.r.setError(errors.New("publish timeout error occurred"))
 			}
 		case b := <-m.send:
 			if b != nil {
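
Note: these rewrites follow staticcheck rule S1000 — a select statement with a single case is just a verbose channel send or receive, so the blocking operation can be written directly (the callAsync change in cluster.go below applies the same simplification). A minimal sketch of the rule, with hypothetical names:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 42

	// Flagged by staticcheck (S1000): a select with exactly one case
	// behaves identically to the plain channel operation.
	select {
	case v := <-ch:
		fmt.Println(v)
	}

	// Preferred form: a direct receive, as in dispatch and publish above.
	ch <- 43
	v := <-ch
	fmt.Println(v)
}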

server/internal/cluster.go

Lines changed: 17 additions & 20 deletions
@@ -20,6 +20,7 @@ import (
 	"encoding/gob"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net"
 	"net/rpc"
 	"sort"
@@ -225,21 +226,19 @@ func (n *ClusterNode) callAsync(proc string, msg, resp interface{}, done chan *rpc.Call)
 
 	myDone := make(chan *rpc.Call, 1)
 	go func() {
-		select {
-		case call := <-myDone:
-			if call.Error != nil {
-				n.lock.Lock()
-				if n.connected {
-					n.endpoint.Close()
-					n.connected = false
-					go n.reconnect()
-				}
-				n.lock.Unlock()
+		call := <-myDone
+		if call.Error != nil {
+			n.lock.Lock()
+			if n.connected {
+				n.endpoint.Close()
+				n.connected = false
+				go n.reconnect()
 			}
+			n.lock.Unlock()
+		}
 
-			if done != nil {
-				done <- call
-			}
+		if done != nil {
+			done <- call
 		}
 	}()
 
@@ -306,7 +305,7 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
 			return nil
 		}
 
-		log.Info("cluster.Master", "new connection request"+string(msg.Conn.ConnID))
+		log.Info("cluster.Master", "new connection request"+fmt.Sprint(msg.Conn.ConnID))
 		conn = Globals.Service.newRpcConn(node, msg.Conn.ConnID, msg.Conn.SessID, msg.Conn.ClientID)
 		go conn.rpcWriteLoop()
 	}
@@ -333,7 +332,7 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
 
 // Dispatch receives messages from the master node addressed to a specific local connection.
 func (Cluster) Proxy(resp *ClusterResp, unused *bool) error {
-	log.Info("cluster.Proxy", "response from Master for connection "+string(resp.FromConnID))
+	log.Info("cluster.Proxy", "response from Master for connection "+fmt.Sprint(resp.FromConnID))
 
 	// This cluster member received a response from topic owner to be forwarded to a connection
 	// Find appropriate connection, send the message to it
@@ -376,7 +375,7 @@ func (c *Cluster) isRemoteContract(contract string) bool {
 // Forward client message to the Master (cluster node which owns the topic)
 func (c *Cluster) routeToContract(msg lp.LineProtocol, topic *security.Topic, msgType uint8, m *message.Message, conn *_Conn) error {
 	// Find the cluster node which owns the topic, then forward to it.
-	n := c.nodeForContract(string(conn.clientID.Contract()))
+	n := c.nodeForContract(fmt.Sprint(conn.clientID.Contract()))
 	if n == nil {
 		return errors.New("cluster.routeToContract: attempt to route to non-existent node")
 	}
@@ -416,7 +415,7 @@
 			//RemoteAddr: conn.(),
 			Proto:    conn.proto,
 			ConnID:   conn.connID,
-			SessID: conn.sessID,
+			SessID:   conn.sessID,
 			ClientID: conn.clientID}})
 }
@@ -620,9 +619,7 @@ func (c *Cluster) rehash(nodes []string) []string {
 		}
 		ringKeys = append(ringKeys, c.thisNodeName)
 	} else {
-		for _, name := range nodes {
-			ringKeys = append(ringKeys, name)
-		}
+		ringKeys = append(ringKeys, nodes...)
 	}
 	ring.Add(ringKeys...)
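
Note: the string(msg.Conn.ConnID) → fmt.Sprint(msg.Conn.ConnID) changes fix go vet's stringintconv finding: converting an integer to string yields the rune with that code point, not the decimal text. A minimal sketch, assuming the IDs are integer types as the warning implies:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	var connID uint32 = 65

	// The old code's bug: string(65) is "A" (the rune with code point 65),
	// not the text "65". Written via rune() here so the sketch itself
	// passes go vet.
	fmt.Println("before: " + string(rune(connID))) // before: A

	// What the commit switches to: fmt.Sprint formats the number.
	fmt.Println("after:  " + fmt.Sprint(connID)) // after:  65

	// strconv is the more explicit alternative for plain integers.
	fmt.Println("also:   " + strconv.FormatUint(uint64(connID), 10))
}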

server/internal/conn.go

Lines changed: 8 additions & 6 deletions
@@ -19,6 +19,7 @@ package internal
 import (
 	"encoding/binary"
 	"encoding/json"
+	"fmt"
 	"net"
 	"runtime/debug"
 	"strconv"
@@ -37,7 +38,6 @@ import (
 
 type _Conn struct {
 	sync.Mutex
-	tracked uint32 // Whether the connection was already tracked or not.
 	proto   lp.Proto
 	adp     lp.ProtoAdapter
 	socket  net.Conn
@@ -187,17 +187,17 @@ func (c *_Conn) subscribe(msg lp.Subscribe, topic *security.Topic, sub *lp.Subscription)
 			return err
 		}
 	}
-	if exists := c.subs.Exist(key); exists && !msg.IsForwarded && Globals.Cluster.isRemoteContract(string(c.clientID.Contract())) {
+	if exists := c.subs.Exist(key); exists && !msg.IsForwarded && Globals.Cluster.isRemoteContract(fmt.Sprint(c.clientID.Contract())) {
 		// The contract is handled by a remote node. Forward message to it.
 		if err := Globals.Cluster.routeToContract(&msg, topic, message.SUBSCRIBE, &message.Message{}, c); err != nil {
 			log.ErrLogger.Err(err).Str("context", "conn.subscribe").Int64("connid", int64(c.connID)).Msg("unable to subscribe to remote topic")
 			return err
 		}
-		// Add the subscription to Counters
 	} else {
 		messageId, err := store.Subscription.NewID()
 		if err != nil {
 			log.ErrLogger.Err(err).Str("context", "conn.subscribe")
+			return err
 		}
 		if first := c.subs.Increment(topic.Topic[:topic.Size], key, messageId); first {
 			// Subscribe the subscriber
@@ -232,7 +232,7 @@ func (c *_Conn) unsubscribe(msg lp.Unsubscribe, topic *security.Topic) (err error) {
 		// Decrement the subscription counter
 		c.service.meter.Subscriptions.Dec(1)
 	}
-	if !msg.IsForwarded && Globals.Cluster.isRemoteContract(string(c.clientID.Contract())) {
+	if !msg.IsForwarded && Globals.Cluster.isRemoteContract(fmt.Sprint(c.clientID.Contract())) {
 		// The topic is handled by a remote node. Forward message to it.
 		if err := Globals.Cluster.routeToContract(&msg, topic, message.UNSUBSCRIBE, &message.Message{}, c); err != nil {
 			log.ErrLogger.Err(err).Str("context", "conn.unsubscribe").Int64("connid", int64(c.connID)).Msg("unable to unsubscribe to remote topic")
@@ -252,6 +252,7 @@ func (c *_Conn) publish(pkt lp.Publish, topic *security.Topic, m *lp.PublishMessage)
 	subscriptions, err := store.Subscription.Get(c.clientID.Contract(), topic.Topic)
 	if err != nil {
 		log.ErrLogger.Err(err).Str("context", "conn.publish")
+		return err
 	}
 	pubMsg := &message.Message{
 		MessageID: pkt.MessageID,
@@ -300,12 +301,13 @@ func (c *_Conn) publish(pkt lp.Publish, topic *security.Topic, m *lp.PublishMessage)
 	c.service.meter.OutMsgs.Inc(int64(msgCount))
 	c.service.meter.OutBytes.Inc(pubMsg.Size() * int64(msgCount))
 
-	if !pkt.IsForwarded && Globals.Cluster.isRemoteContract(string(c.clientID.Contract())) {
+	if !pkt.IsForwarded && Globals.Cluster.isRemoteContract(fmt.Sprint(c.clientID.Contract())) {
 		if err = Globals.Cluster.routeToContract(&pkt, topic, message.PUBLISH, pubMsg, c); err != nil {
 			log.ErrLogger.Err(err).Str("context", "conn.publish").Int64("connid", int64(c.connID)).Msg("unable to publish to remote topic")
+			return err
 		}
 	}
-	return err
+	return nil
 }
 
 // Load all stored messages and resend them to ensure DeliveryMode > 1,2 even after an application crash.
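
Note: the added return err lines fix real control flow, not just lint noise — previously publish logged a failed store.Subscription.Get, kept going on incomplete state, and only surfaced the error via the trailing return err after all the work was done. A minimal before/after sketch of the pattern, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

var errStore = errors.New("store unavailable")

func loadSubscriptions() ([]string, error) { return nil, errStore }

// before: the error is logged but execution continues on a nil slice,
// and whatever err holds at the end is returned.
func publishBefore() error {
	subs, err := loadSubscriptions()
	if err != nil {
		fmt.Println("conn.publish:", err)
	}
	fmt.Println("fanning out to", len(subs), "subscribers") // runs even on failure
	return err
}

// after: log, then return immediately; the happy path ends with an
// explicit return nil.
func publishAfter() error {
	subs, err := loadSubscriptions()
	if err != nil {
		fmt.Println("conn.publish:", err)
		return err
	}
	fmt.Println("fanning out to", len(subs), "subscribers")
	return nil
}

func main() {
	_ = publishBefore()
	_ = publishAfter()
}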

server/internal/hdl_conn.go

Lines changed: 11 additions & 5 deletions
@@ -22,6 +22,7 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/unit-io/unitdb/server/internal/message"
@@ -40,7 +41,7 @@ const (
 	requestKeygen = 812942072 // hash("keygen")
 )
 
-func (c *_Conn) readLoop(ctx context.Context) error {
+func (c *_Conn) readLoop(ctx context.Context) (err error) {
 	defer func() {
 		log.Info("conn.Handler", "closing...")
 		c.close()
@@ -63,7 +64,8 @@ func (c *_Conn) readLoop(ctx context.Context) error {
 		}
 
 		// Message handler
-		if err := c.handler(pkt); err != nil {
+		if err = c.handler(pkt); err != nil {
+			fmt.Println("read:: handler error", err)
 			return err
 		}
 	}
@@ -318,7 +320,9 @@ func (c *_Conn) onSubscribe(sub lp.Subscribe, subsc *lp.Subscription) *types.Error {
 		}
 	}
 
-	c.subscribe(sub, topic, subsc)
+	if err := c.subscribe(sub, topic, subsc); err != nil {
+		return types.ErrServerError
+	}
 
 	if subsc.Last != "" {
 		msgs, err := store.Message.Get(c.clientID.Contract(), topic.Topic, subsc.Last)
@@ -327,7 +331,7 @@
 			return types.ErrServerError
 		}
 
-		// Range over the messages in the channel and forward them
+		// Range over the messages from the store and forward them to subscribers
 		for _, m := range msgs {
 			msg := m // Copy message
 			c.SendMessage(&msg)
@@ -355,7 +359,9 @@ func (c *_Conn) onUnsubscribe(unsub lp.Unsubscribe, subsc *lp.Subscription) *types.Error {
 		}
 	}
 
-	c.unsubscribe(unsub, topic)
+	if err := c.unsubscribe(unsub, topic); err != nil {
+		return types.ErrServerError
+	}
 
 	return nil
 }
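
Note: onSubscribe and onUnsubscribe previously discarded the error returned by c.subscribe and c.unsubscribe — the classic errcheck finding — so a failed subscription was acknowledged to the client as a success. A minimal sketch of the fix, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

func subscribe(topic string) error {
	if topic == "" {
		return errors.New("empty topic")
	}
	return nil
}

func main() {
	// Flagged by errcheck: the return value is dropped, so the caller
	// proceeds as if the subscription succeeded.
	subscribe("")

	// Fixed: check the error and translate it for the client, as the
	// handlers above do with types.ErrServerError.
	if err := subscribe(""); err != nil {
		fmt.Println("subscribe failed:", err)
	}
}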
