Skip to content

Commit 060c3ec

Browse files
committed
[server] add pubsub relay to uTP protocol to retrieve persisted messages from server
1 parent f077534 commit 060c3ec

File tree

19 files changed

+466
-230
lines changed

19 files changed

+466
-230
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ Make use of the client by importing it in your Go client source code. For exampl
3333
3434
Unitdb supports Get, Put, Delete operations. It also supports encryption, batch operations, and writing to wildcard topics. See [usage guide](https://github.com/unit-io/unitdb/tree/master/docs/usage.md).
3535

36-
Samples are available in the examples directory for reference.
36+
Samples are available in the examples directory for reference. To build unitdb client from latest source code use "replace" in go.mod to point to your local module.
37+
38+
```golang
39+
go mod edit -replace github.com/unit-io/unitdb-go=$GOPATH/src/github.com/unit-io/unitdb-go
40+
```
3741

3842
## Clustering
3943
To bring up the Unitdb cluster start 2 or more nodes. For fault tolerance 3 nodes or more are recommended.

docs/utp.md

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
+ [Message Types](#Message-Types)
2525
- [CONNECT](#CONNECT---Connection-Request)
2626
- [PUBLISH](#PUBLISH---Publish-message)
27+
- [RELAY](#RELAY---Relay-request)
2728
- [SUBSCRIBE](#SUBSCRIBE---Subscribe-request)
2829
- [UNSUBSCRIBE](#UNSUBSCRIBE---Unsubscribe-request)
2930
- [PINGREQ](#PINGREQ---PING-request)
@@ -44,13 +45,15 @@ An application transport the data by uTP across network, it contains payload dat
4445
### Client
4546
A Client opens the network connection to the Server using TCP/IP, WebSocket, GRPC or other bi-direction network protocols.
4647
- Pubslihes Application Mesasges to a topic that other Clients subscribes to.
48+
- Sends a relay request to retrieve persisted Application Messages from Server.
4749
- Subscribes to a topic to receive Application Messages.
4850
- Unsubcribe to remove a topic subscription.
4951
- Closes the network connection to the Server.
5052

5153
### Server
5254
- Accepts the network connections from Clients.
5355
- Recieves and store Application Messages published by Clients.
56+
- Relays pesisted Application Messages that matches the relay requests from Clients.
5457
- Processes topic subscription requests from Clients.
5558
- Route Application Messages that match Client subscriptions.
5659
- Closes the network connection from the Client.
@@ -75,24 +78,25 @@ Represented as enum value, the values are shown below.
7578
| RERSERVED | 0 | Forbidden |
7679
| CONNECT | 1 | Client to Server |
7780
| PUBLISH | 2 | Client to Server or Server to Client |
78-
| SUBSCRIBE | 3 | Client to Server |
79-
| UNSUBSCRIBE | 4 | Client to Server |
80-
| PINGREQ | 5 | Client to Server |
81-
| DISCONNECT | 6 | Client to Server or Server to Client |
81+
| RELAY | 3 | Client to Server |
82+
| SUBSCRIBE | 4 | Client to Server |
83+
| UNSUBSCRIBE | 5 | Client to Server |
84+
| PINGREQ | 6 | Client to Server |
85+
| DISCONNECT | 7 | Client to Server or Server to Client |
8286

8387
### Flow Control
8488
Flow Control is Control Message sent in response to a uTP Message Type or another Control Message.
8589

86-
Client will send one of Message Type CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PINGREQ then Server will reponse with ACKNOWLEDGE Control Message. PUBLISH Message is sent from a Client to a Server or from a Server to a Client and if Delivery Mode is Express Delivery then the receiver will respond with ACKNOWLEDGE Control Message.
90+
Client will send one of Message Type CONNECT, RELAY, SUBSCRIBE, UNSUBSCRIBE, or PINGREQ then Server will reponse with ACKNOWLEDGE Control Message. PUBLISH Message is sent from a Client to a Server or from a Server to a Client and if Delivery Mode is Express Delivery then the receiver will respond with ACKNOWLEDGE Control Message.
8791

8892
| Name | Value | Direction of Flow |
8993
| :--- | :--- | :--- |
9094
| None | 0 | None |
9195
| ACKNOWLEDGE | 1 | Client to Server or Server to Client |
9296
| NOTIFY | 2 | Server to Client |
9397
| RECEIVE | 3 | Client to Server |
94-
| RECEIPT | 4 | Client to Server or Server to Client |
95-
| COMPLETE | 5 | Client to Server or Server to Client |
98+
| RECEIPT | 4 | Client to Server |
99+
| COMPLETE | 5 | Server to Client |
96100

97101
### Message Length
98102
The Message Length represents number of bytes within the current Message.
@@ -106,7 +110,8 @@ In order to implement a Reliable Message Delivery flow the Client and Server nee
106110
The Server persists an unique Session State per Client Identifier. Client can also specify Sesson Key during Connection to persist multiple Session States per Client Identifier. The Session can continue after network re-connection until Client specify Clean Session during a new Connection.
107111

108112
The Client Session pesist following States:
109-
- Reliable and Batch Delivery messages sent to the Server, but have not been completely acknowledged.
113+
- Reliable and Batch Delivery messages sent to the Server, but have not been acknowledged.
114+
- Relay request sent to the Server, but have not been acknowledged.
110115
- Reliable and Batch Delivery messages which have been received from the Server, but have not been completely acknowledged.
111116

112117
The Server Session persiste following states:
@@ -278,6 +283,26 @@ The PublishMessage contains folowing fields:
278283
### TTL
279284
A publisher can specify time to-live (TTL) when publishing an Application Message.
280285

286+
### RELAY - Relay request
287+
The RELAY Message is sent from the Client to the Server to get persisted Application Messages from server for one or more topics. Each Relay request pairs the topics with last durations. The Server sends PUBLISH Messages to the Client to forward Application Messages that were persisted by the Server for the Topics that match these Relay requests. The RELAY Message also specifies (for each request) the Last duration for which the Server can send persisted Application Messages to the Client.
288+
289+
The payload contains following fields.
290+
291+
| Name | Type |
292+
| :--- | :--- |
293+
| MessageID | int32 |
294+
| RelayRequests | repeated RelayRequest |
295+
296+
The RelayRequest contains folowing fields:
297+
298+
| Name | Type |
299+
| :--- | :--- |
300+
| Topic | string |
301+
| Last | string |
302+
303+
#### Last
304+
A Client can specify Last duration (for example "1h") to retrive persisted Application Messages published to the Topic.
305+
281306
### SUBSCRIBE - Subscribe request
282307
The SUBSCRIBE Message is sent from the Client to the Server to create one or more Subscriptions. Each Subscription registers one or more Topics for a Client. The Server sends PUBLISH Messages to the Client to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE Message also specifies (for each Subscription) the Delivery Mode with which the Server can send Application Messages to the Client.
283308

@@ -295,10 +320,7 @@ The Subscription contains folowing fields:
295320
| DeliveryMode | int32 |
296321
| Delay | int32 |
297322
| Topic | string |
298-
| Last | string |
299323

300-
#### Last
301-
A Subscriber can spcify Last duration (for example "1h") to retrive persisted Application Messages published to the subscribing Topic.
302324

303325
### UNSUBSCRIBE - Unsubscribe request
304326
An UNSUBSCRIBE Message is sent by the Client to the Server, to unsubscribe from topics.
@@ -320,7 +342,7 @@ The DISCONNECT Message is the final uTP Message sent from the Client or the Serv
320342

321343
## Control Message
322344
### ACKNOWLEDGE - acknowledgement
323-
The ACKNOWLEDGE Control Message is sent by the Server in response to CONNECT, SUBSCRIBE, UNSUBSCRIBE, PUBLISH or PINGREQ Message from a Client.
345+
The ACKNOWLEDGE Control Message is sent by the Server in response to CONNECT, RELAY, SUBSCRIBE, UNSUBSCRIBE, PUBLISH or PINGREQ Message from a Client.
324346

325347
The ACKNOWLEGE Message on CONNECT is sent with Message in the form of binary data that contains below Payload inforation.
326348

@@ -358,11 +380,9 @@ The Server must send a NOTIFY Control Message containing a new Message Identifie
358380
The Client must send a RECEIVE Control Message to respond to the NOTIFY Control Message to tell Server it is ready to receive the message. It must contain the same Message Identifier from NOTIFY Control Message.
359381

360382
### RECEIPT - Publish receipt
361-
The Server must respond with RECEIPT Control Message if it receive a PUBLISH Message with Delivery Mode as RELIABLE or BATCH Delivery.
362-
363383
The Client must respond to a PUBLISH Message with a RECEIPT Control Message if it has a matching subscription with Delivery Mode RELIABLE or BATCH Delivery and sender has sent a PUBLISH Message with RELIABLE or BATCH Delivery Mode.
364384

365385
The RECEIPT Control Message must contain the same Message Identifier from PUBLISH Message.
366386

367387
### COMPLETE - Publish complete
368-
The Client or the Server must respond with COMPLETE Control Message if it receives a RECEIPT Control Message to mark the Publish complete. The COMPLETE Control Message must contain the same Message Identifier from RECEIPT Control Message.
388+
The Server must respond with COMPLETE Control Message if it receives a RECEIPT Control Message from Client to mark the Publish complete. The COMPLETE Control Message must contain the same Message Identifier from RECEIPT Control Message.

examples/pubsub/main.go

Lines changed: 0 additions & 63 deletions
This file was deleted.

examples/sample/main.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ func main() {
1818
defer db.Close()
1919

2020
// Use Entry.WithPayload() method to bulk store messages as topic is parsed one time on first request.
21-
topic := []byte("teams.alpha.ch1.u1")
21+
topic := []byte("teams.alpha.ch1.r1")
2222
entry := &unitdb.Entry{Topic: topic}
2323
for j := 0; j < 50; j++ {
24-
db.PutEntry(entry.WithPayload([]byte(fmt.Sprintf("msg for team alpha channel1 receiver1 #%2d", j))))
24+
db.PutEntry(entry.WithPayload([]byte(fmt.Sprintf("msg for team alpha channel1 recipient1 #%2d", j))))
2525
}
2626

27-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u1?last=1h")).WithLimit(100)); err == nil {
27+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r1?last=1h")).WithLimit(100)); err == nil {
2828
for _, msg := range msgs {
2929
log.Printf("%s ", msg)
3030
}
@@ -33,42 +33,42 @@ func main() {
3333
// Writing to single topic in a batch
3434
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
3535
topic := []byte("teams.alpha.ch1.*?ttl=1h")
36-
b.Put(topic, []byte("msg for team alpha channel1 all receivers"))
36+
b.Put(topic, []byte("msg for team alpha channel1 all recipients"))
3737
return nil
3838
})
3939
if err != nil {
4040
log.Fatal(err)
4141
return
4242
}
4343

44-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u2?last=1h")).WithLimit(10)); err == nil {
44+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r2?last=1h")).WithLimit(10)); err == nil {
4545
for _, msg := range msgs {
4646
log.Printf("%s ", msg)
4747
}
4848
}
49-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u3?last=1h")).WithLimit(10)); err == nil {
49+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r3?last=1h")).WithLimit(10)); err == nil {
5050
for _, msg := range msgs {
5151
log.Printf("%s ", msg)
5252
}
5353
}
5454

5555
// Writing to multiple topics in a batch
5656
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
57-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u2"), []byte("msg for team alpha channel1 receiver2")))
58-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u3"), []byte("msg for team alpha channel1 receiver3")))
57+
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r2"), []byte("msg for team alpha channel1 recipient2")))
58+
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r3"), []byte("msg for team alpha channel1 recipient3")))
5959
return nil
6060
})
6161
if err != nil {
6262
log.Fatal(err)
6363
return
6464
}
6565

66-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u2?last=1h")).WithLimit(10)); err == nil {
66+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r2?last=1h")).WithLimit(10)); err == nil {
6767
for _, msg := range msgs {
6868
log.Printf("%s ", msg)
6969
}
7070
}
71-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u3?last=1h")).WithLimit(10)); err == nil {
71+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r3?last=1h")).WithLimit(10)); err == nil {
7272
for _, msg := range msgs {
7373
log.Printf("%s ", msg)
7474
}
@@ -82,19 +82,19 @@ func main() {
8282
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
8383
b.SetOptions(unitdb.WithBatchContract(contract))
8484
topic := []byte("teams.alpha.ch1.*?ttl=1h")
85-
b.Put(topic, []byte("msg for team alpha channel1 all receivers #1"))
86-
b.Put(topic, []byte("msg for team alpha channel1 all receivers #2"))
87-
b.Put(topic, []byte("msg for team alpha channel1 all receivers #3"))
85+
b.Put(topic, []byte("msg for team alpha channel1 all recipients #1"))
86+
b.Put(topic, []byte("msg for team alpha channel1 all recipients #2"))
87+
b.Put(topic, []byte("msg for team alpha channel1 all recipients #3"))
8888
return nil
8989
})
9090

9191
// Writing to multiple topics in a batch
9292
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
9393
b.SetOptions(unitdb.WithBatchContract(contract))
94-
b.PutEntry(unitdb.NewEntry([]byte("teams.*.ch1"), []byte("msg for any team channel1")))
95-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.*"), []byte("msg for team alpha all channels")))
94+
b.PutEntry(unitdb.NewEntry([]byte("teams.*.ch1"), []byte("msg for channel1 in any teams")))
95+
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.*"), []byte("msg for all channels in team alpha")))
9696
b.PutEntry(unitdb.NewEntry([]byte("teams..."), []byte("msg for all teams and all channels")))
97-
b.PutEntry(unitdb.NewEntry([]byte("..."), []byte("msg broadcast to all receivers of all teams all channels")))
97+
b.PutEntry(unitdb.NewEntry([]byte("..."), []byte("msg broadcast to all recipients of all channels in all teams")))
9898
return nil
9999
})
100100
if err != nil {
@@ -116,8 +116,8 @@ func main() {
116116
}
117117
}
118118

119-
// Get message for team beta channel2 receiver1
120-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.beta.ch2.u1?last=1h")).WithLimit(10)); err == nil {
119+
// Get message for team beta channel2 recipient11
120+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.beta.ch2.r1?last=1h")).WithLimit(10)); err == nil {
121121
for _, msg := range msgs {
122122
log.Printf("%s ", msg)
123123
}
@@ -127,8 +127,8 @@ func main() {
127127
// Note, encryption can also be set on entire database using DB.Open() and set encryption flag in options parameter.
128128
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
129129
b.SetOptions(unitdb.WithBatchEncryption())
130-
topic := []byte("teams.alpha.ch1.u1?ttl=1h")
131-
b.Put(topic, []byte("msg for team alpha channel1 receiver1"))
130+
topic := []byte("teams.alpha.ch1.r1?ttl=1h")
131+
b.Put(topic, []byte("msg for team alpha channel1 recipient1"))
132132
return nil
133133
})
134134

0 commit comments

Comments
 (0)