Skip to content

Commit eebe427

Browse files
committed
add pubsub relay to uTP protocol to retrieve persisted messages from server
1 parent 31d6a76 commit eebe427

File tree

21 files changed

+498
-217
lines changed

21 files changed

+498
-217
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ The Unitdb is a real-time messaging system for microservices, and real-tme inter
77
## Quick Start
88
To build [unitdb](https://github.com/unit-io/unitdb) from source code use go get command and copy unitdb.conf to the path unitdb binary is placed.
99

10-
> go get -u github.com/unit-io/unitdb
10+
> go get -u github.com/unit-io/unitdb/server
1111
1212
### Usage
1313
Detailed API documentation is available using the [godoc.org](https://godoc.org/github.com/unit-io/unitdb-go) service.
@@ -16,7 +16,11 @@ Make use of the client by importing it in your Go client source code. For exampl
1616

1717
import "github.com/unit-io/unitdb-go"
1818

19-
Samples are available in the cmd directory for reference.
19+
Samples are available in the cmd directory for reference. To build unitdb server from latest source code use "replace" in go.mod to point to your local module.
20+
21+
```golang
22+
go mod edit -replace github.com/unit-io/unitdb=$GOPATH/src/github.com/unit-io/unitdb
23+
```
2024

2125
## Contributing
2226
If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are welcome.

batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66
"time"
77

8-
"github.com/unit-io/unitdb-go/utp"
8+
"github.com/unit-io/unitdb-go/internal/utp"
99
)
1010

1111
const (

client.go

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
"time"
1212

1313
"github.com/golang/protobuf/proto"
14-
"github.com/unit-io/unitdb-go/store"
15-
"github.com/unit-io/unitdb-go/utp"
14+
"github.com/unit-io/unitdb-go/internal/store"
15+
"github.com/unit-io/unitdb-go/internal/utp"
1616
"github.com/unit-io/unitdb/server/common"
1717
pbx "github.com/unit-io/unitdb/server/proto"
1818
"google.golang.org/grpc"
1919

2020
// Database store
21-
_ "github.com/unit-io/unitdb-go/db/unitdb"
21+
_ "github.com/unit-io/unitdb-go/internal/db/unitdb"
2222
)
2323

2424
type Client interface {
@@ -39,23 +39,24 @@ type Client interface {
3939
Publish(topic string, payload []byte, pubOpts ...PubOptions) Result
4040
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
4141
// a message is published on the topic provided, or nil for the default handler.
42+
// Relay sends a relay request to server. Provide a MessageHandler to be executed when
43+
// a message is published on the topic provided, or nil for the default handler.
44+
Relay(topic string, relOpts ...RelOptions) Result
4245
Subscribe(topic string, subOpts ...SubOptions) Result
4346
// Unsubscribe will end the subscription from each of the topics provided.
4447
// Messages published to those topics from other clients will no longer be
4548
// received.
4649
Unsubscribe(topics ...string) Result
4750
}
4851
type client struct {
49-
mu sync.Mutex // mutex for the connection
5052
opts *options
5153
context context.Context // context for the client
5254
cancel context.CancelFunc // cancellation function
5355
messageIds // local identifier of messages
5456
connID int32 // The unique id of the connection.
55-
sessID uint32
56-
epoch uint32 // The session ID of the connection.
57-
conn net.Conn // the network connection
58-
stream grpc.Stream
57+
sessID uint32
58+
epoch uint32 // The session ID of the connection.
59+
conn net.Conn // the network connection
5960
send chan *MessageAndResult
6061
recv chan utp.Message
6162
pub chan *utp.Publish
@@ -180,18 +181,19 @@ func (c *client) ConnectContext(ctx context.Context) error {
180181
c.updateLastTouched()
181182
go c.keepalive(ctx)
182183
}
184+
// c.closeW.Add(3)
183185
go c.readLoop(ctx) // process incoming messages
184186
go c.writeLoop(ctx) // send messages to servers
185187
go c.dispatcher(ctx) // dispatch messages to client
186188

187-
// Take care of any messages in the store
188-
var sessKey uint32
189-
if c.opts.sessionKey != 0 {
190-
sessKey = c.opts.sessionKey
191-
} else {
192-
sessKey = c.epoch
193-
}
194-
189+
// Take care of any messages in the store
190+
var sessKey uint32
191+
if c.opts.sessionKey != 0 {
192+
sessKey = c.opts.sessionKey
193+
} else {
194+
sessKey = c.epoch
195+
}
196+
195197
sessID := c.connID
196198
if rawSess, err := store.Session.Get(uint64(sessKey)); err == nil {
197199
sessID := binary.LittleEndian.Uint32(rawSess[:4])
@@ -275,7 +277,7 @@ func (c *client) DisconnectContext(ctx context.Context) error {
275277
// Disconnect() called but not connected
276278
return nil
277279
}
278-
280+
279281
defer c.close()
280282
m := &utp.Disconnect{}
281283
r := &DisconnectResult{result: result{complete: make(chan struct{})}}
@@ -354,6 +356,42 @@ func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Re
354356
return r
355357
}
356358

359+
// Relay send a new relay request. Provide a MessageHandler to be executed when
360+
// a message is published on the topic provided.
361+
func (c *client) Relay(topic string, relOpts ...RelOptions) Result {
362+
r := &RelayResult{result: result{complete: make(chan struct{})}}
363+
if err := c.ok(); err != nil {
364+
r.setError(errors.New("error not connected"))
365+
return r
366+
}
367+
opts := new(relOptions)
368+
for _, opt := range relOpts {
369+
opt.set(opts)
370+
}
371+
372+
rel := &utp.Relay{}
373+
rel.RelayRequests = append(rel.RelayRequests, &utp.RelayRequest{Topic: topic, Last: opts.last})
374+
375+
if rel.MessageID == 0 {
376+
mID := c.nextID(r)
377+
rel.MessageID = c.outboundID(mID)
378+
}
379+
relayWaitTimeout := c.opts.writeTimeout
380+
if relayWaitTimeout == 0 {
381+
relayWaitTimeout = time.Second * 30
382+
}
383+
// persist outbound
384+
c.storeOutbound(rel)
385+
select {
386+
case c.send <- &MessageAndResult{m: rel, r: r}:
387+
case <-time.After(relayWaitTimeout):
388+
r.setError(errors.New("relay request timeout error occurred"))
389+
return r
390+
}
391+
392+
return r
393+
}
394+
357395
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
358396
// a message is published on the topic provided.
359397
func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
@@ -368,10 +406,7 @@ func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
368406
}
369407

370408
sub := &utp.Subscribe{}
371-
sub.Subscriptions = append(sub.Subscriptions, &utp.Subscription{DeliveryMode: opts.deliveryMode, Delay: opts.delay, Topic: topic, Last: opts.last})
372-
373-
if opts.callback != nil {
374-
}
409+
sub.Subscriptions = append(sub.Subscriptions, &utp.Subscription{DeliveryMode: opts.deliveryMode, Delay: opts.delay, Topic: topic})
375410

376411
if sub.MessageID == 0 {
377412
mID := c.nextID(r)
@@ -435,20 +470,35 @@ func (c *client) resume(prefix uint32, subscription bool) {
435470
// isKeyOutbound
436471
if (k & (1 << 4)) == 0 {
437472
switch msg.Type() {
473+
case utp.RELAY:
474+
p := msg.(*utp.Relay)
475+
r := &RelayResult{result: result{complete: make(chan struct{})}}
476+
r.messageID = msg.Info().MessageID
477+
c.messageIds.resumeID(MID(r.messageID))
478+
// var topics []RelayRequest
479+
// for _, req := range p.RelayRequests {
480+
// var t RelayRequest
481+
// t.Topic = req.Topic
482+
// t.Last = req.Last
483+
// topics = append(topics, t)
484+
// }
485+
r.reqs = p.RelayRequests
486+
c.send <- &MessageAndResult{m: msg, r: r}
438487
case utp.SUBSCRIBE:
439488
if subscription {
440489
p := msg.(*utp.Subscribe)
441490
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
442491
r.messageID = msg.Info().MessageID
443492
c.messageIds.resumeID(MID(r.messageID))
444-
var topics []Subscription
445-
for _, sub := range p.Subscriptions {
446-
var t Subscription
447-
t.Topic = sub.Topic
448-
t.DeliveryMode = sub.DeliveryMode
449-
topics = append(topics, t)
450-
}
451-
r.subs = append(r.subs, topics...)
493+
// var topics []Subscription
494+
// for _, sub := range p.Subscriptions {
495+
// var t Subscription
496+
// t.Topic = sub.Topic
497+
// t.DeliveryMode = sub.DeliveryMode
498+
// topics = append(topics, t)
499+
// }
500+
// r.subs = append(r.subs, topics...)
501+
r.subs = p.Subscriptions
452502
c.send <- &MessageAndResult{m: msg, r: r}
453503
}
454504
case utp.UNSUBSCRIBE:

cmd/sample/main.go

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ Options:
2828
*/
2929

3030
func main() {
31-
topic := flag.String("topic", "AbYANcEGXRTLC/teams.alpha.user1", "The topic name to/from which to publish/subscribe")
31+
topic := flag.String("topic", "DDcBReFJBDFKe/groups.private.673651407196578720.message", "The topic name to/from which to publish/subscribe")
3232
server := flag.String("server", "grpc://localhost:6080", "The server URI. ex: grpc://127.0.0.1:6080")
3333
password := flag.String("password", "", "The password (optional)")
3434
user := flag.String("user", "", "The User (optional)")
3535
id := flag.String("id", "UCBFDONCNJLaKMCAIeJBaOVfbAXUZHNPLDKKLDKLHZHKYIZLCDPQ", "The ClientID (optional)")
3636
num := flag.Int("number", 1, "The number of messages to publish or subscribe (default 1)")
3737
payload := flag.String("message", "Hello team alpha channel1!", "The message text to publish (default empty)")
38-
action := flag.String("action", "sub", "Action publish or subscribe (required)")
38+
action := flag.String("action", "relay", "Action publish, relay or subscribe (required)")
3939
flag.Parse()
4040

41-
if *action != "pub" && *action != "sub" && *action != "unsub" && *action != "keygen" {
41+
if *action != "pub" && *action != "relay" && *action != "sub" && *action != "unsub" && *action != "keygen" {
4242
fmt.Println("Invalid setting for -action, must be pub or sub")
4343
return
4444
}
@@ -86,12 +86,13 @@ func main() {
8686
log.Fatalf("err: %s", err)
8787
}
8888
fmt.Println("Keygen Started")
89-
req := struct {
89+
req := []struct {
9090
Topic string `json:"topic"`
9191
Type string `json:"type"`
92-
}{
92+
}{{
9393
*topic,
9494
"rw",
95+
},
9596
}
9697
keyReq, err := json.Marshal(req)
9798
if err != nil {
@@ -114,7 +115,57 @@ func main() {
114115
}
115116
}
116117

118+
if *action == "sub" {
119+
recv := make(chan [2][]byte)
120+
121+
client, err := unitdb.NewClient(
122+
*server,
123+
*id,
124+
// unitdb.WithInsecure(),
125+
unitdb.WithSessionKey(2339641922),
126+
unitdb.WithUserNamePassword(*user, []byte(*password)),
127+
// unitdb.WithCleanSession(),
128+
unitdb.WithKeepAlive(2*time.Second),
129+
unitdb.WithPingTimeout(1*time.Second),
130+
unitdb.WithConnectionLostHandler(func(client unitdb.Client, err error) {
131+
if err != nil {
132+
log.Fatal(err)
133+
}
134+
close(recv)
135+
}),
136+
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, msg unitdb.Message) {
137+
recv <- [2][]byte{[]byte(msg.Topic()), msg.Payload()}
138+
}),
139+
unitdb.WithBatchDuration(10*time.Second),
140+
)
141+
if err != nil {
142+
log.Fatalf("err: %s", err)
143+
}
144+
ctx := context.Background()
145+
err = client.ConnectContext(ctx)
146+
if err != nil {
147+
log.Fatalf("err: %s", err)
148+
}
149+
r := client.Subscribe(*topic, unitdb.WithSubDeliveryMode(1) /*, unitdb.WithSubDelay(1*time.Second)*/)
150+
if _, err := r.Get(ctx, 1*time.Second); err != nil {
151+
fmt.Println(err)
152+
os.Exit(1)
153+
}
154+
155+
for {
156+
select {
157+
case <-ctx.Done():
158+
client.DisconnectContext(ctx)
159+
fmt.Println("Subscriber Disconnected")
160+
return
161+
case incoming := <-recv:
162+
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
163+
}
164+
}
165+
}
166+
117167
if *action == "pub" {
168+
118169
client, err := unitdb.NewClient(
119170
*server,
120171
*id,
@@ -147,6 +198,7 @@ func main() {
147198
time.Sleep(1 * time.Second)
148199
client.DisconnectContext(ctx)
149200
fmt.Println("Publisher Disconnected")
201+
150202
} else {
151203
recv := make(chan [2][]byte)
152204

@@ -178,7 +230,7 @@ func main() {
178230
if err != nil {
179231
log.Fatalf("err: %s", err)
180232
}
181-
r := client.Subscribe(*topic, unitdb.WithSubDeliveryMode(1)/*, unitdb.WithSubDelay(1*time.Second)*/)
233+
r := client.Relay(*topic, unitdb.WithLast("1m"))
182234
if _, err := r.Get(ctx, 1*time.Second); err != nil {
183235
fmt.Println(err)
184236
os.Exit(1)
@@ -188,10 +240,13 @@ func main() {
188240
select {
189241
case <-ctx.Done():
190242
client.DisconnectContext(ctx)
191-
fmt.Println("Subscriber Disconnected")
243+
fmt.Println("Client Disconnected")
192244
return
193245
case incoming := <-recv:
194246
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
247+
client.DisconnectContext(ctx)
248+
fmt.Println("Client Disconnected")
249+
return
195250
}
196251
}
197252
}

cmd/simple/main.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,31 @@ func main() {
3737
log.Fatalf("err: %s", err)
3838
}
3939

40-
r := client.Subscribe("teams.alpha.user1", unitdb.WithLast("1m"), unitdb.WithSubDeliveryMode(0))
40+
var r unitdb.Result
41+
42+
r = client.Relay("ADcABeFRBDJKe/groups.private.673651407196578720.message", unitdb.WithLast("10m"))
43+
if _, err := r.Get(ctx, 1*time.Second); err != nil {
44+
fmt.Println(err)
45+
os.Exit(1)
46+
}
47+
48+
r = client.Subscribe("ADcABeFRBDJKe/groups.private.673651407196578720.message", unitdb.WithSubDeliveryMode(0))
4149
if _, err := r.Get(ctx, 1*time.Second); err != nil {
4250
fmt.Println(err)
4351
os.Exit(1)
4452
}
4553

4654
for i := 0; i < 2; i++ {
4755
msg := fmt.Sprintf("Hi #%d time!", i)
48-
r := client.Publish("teams.alpha.user1", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(0))
56+
r := client.Publish("ADcABeFRBDJKe/groups.private.673651407196578720.message", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(0))
4957
if _, err := r.Get(ctx, 1*time.Second); err != nil {
5058
log.Fatalf("err: %s", err)
5159
}
5260
}
5361

5462
wait := time.NewTicker(5 * time.Second)
5563
<-wait.C
56-
r = client.Unsubscribe("teams.alpha.user1")
64+
r = client.Unsubscribe("ADcABeFRBDJKe/groups.private.673651407196578720.message")
5765
if _, err := r.Get(ctx, 1*time.Second); err != nil {
5866
fmt.Println(err)
5967
os.Exit(1)

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ go 1.16
44

55
require (
66
github.com/golang/protobuf v1.5.2
7-
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264 // indirect
8-
github.com/unit-io/unitdb v0.1.0
9-
google.golang.org/grpc v1.37.0
7+
github.com/unit-io/unitdb v0.1.1
8+
google.golang.org/grpc v1.39.0
109
)
10+
11+
replace github.com/unit-io/unitdb => /src/github.com/unit-io/unitdb

0 commit comments

Comments
 (0)