Skip to content
This repository was archived by the owner on Nov 7, 2023. It is now read-only.

Commit 65dfd15

Browse files
authored
Merge pull request #250 from sergefdrv/termination
Protocol instance termination
2 parents eadb1f6 + 10dcb91 commit 65dfd15

File tree

17 files changed

+410
-195
lines changed

17 files changed

+410
-195
lines changed

client/client.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@ type Stack interface {
4242
// Request requests execution of the supplied operation on the
4343
// replicated state machine and returns a channel to receive the
4444
// result of execution from.
45+
//
46+
// Terminate causes the instance to stop all activity so that the
47+
// garbage collector can reclaim any resources allocated for it.
4548
type Client interface {
4649
Request(operation []byte) (resultChan <-chan []byte)
50+
Terminate()
4751
}
4852

4953
type options struct {
@@ -72,6 +76,11 @@ func WithLogger(logger commonLogger.Logger) Option {
7276
}
7377
}
7478

79+
type client struct {
80+
handleRequest requestHandler
81+
stopChan chan struct{}
82+
}
83+
7584
// New creates an instance of Client given a client ID, total number
7685
// of replica nodes n, number of tolerated faulty replica nodes f, and
7786
// a stack of external interfaces.
@@ -80,18 +89,24 @@ func New(id uint32, n, f uint32, stack Stack, opts ...Option) (Client, error) {
8089
return nil, fmt.Errorf("insufficient number of replica nodes")
8190
}
8291

92+
stopChan := make(chan struct{})
8393
buf := requestbuffer.New()
8494
logger := makeDefaultOptions(id, opts...).logger
8595

86-
if err := startReplicaConnections(id, n, buf, stack, logger); err != nil {
96+
if err := startReplicaConnections(id, n, buf, stack, stopChan, logger); err != nil {
8797
return nil, fmt.Errorf("failed to initiate connections to replicas: %s", err)
8898
}
8999

90100
seq := makeSequenceGenerator()
91-
return makeRequestHandler(id, seq, stack, buf, f, logger), nil
101+
handleRequest := makeRequestHandler(id, seq, stack, buf, f, stopChan, logger)
102+
103+
return &client{handleRequest, stopChan}, nil
104+
}
105+
106+
func (c *client) Request(operation []byte) <-chan []byte {
107+
return c.handleRequest(operation)
92108
}
93109

94-
// Request implements Client interface on requestHandler
95-
func (handler requestHandler) Request(operation []byte) <-chan []byte {
96-
return handler(operation)
110+
func (c *client) Terminate() {
111+
close(c.stopChan)
97112
}

client/internal/requestbuffer/requestbuffer.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,21 @@ func New() *T {
5656
// AddReply. The returned channel is closed when RemoveRequest is
5757
// invoked for the Request message. The returned boolean value
5858
// indicates if the message was accepted.
59-
func (rb *T) AddRequest(msg messages.Request) (<-chan messages.Reply, bool) {
59+
func (rb *T) AddRequest(msg messages.Request, cancel <-chan struct{}) (<-chan messages.Reply, bool) {
6060
rb.lock.Lock()
61-
defer rb.lock.Unlock()
62-
6361
for rb.request != nil {
62+
seq := rb.lastSeq
6463
req := rb.request
64+
6565
rb.lock.Unlock()
66-
<-req.removed
66+
select {
67+
case <-req.removed:
68+
case <-cancel:
69+
rb.RemoveRequest(seq)
70+
}
6771
rb.lock.Lock()
6872
}
73+
defer rb.lock.Unlock()
6974

7075
if msg.Sequence() <= rb.lastSeq {
7176
return nil, false
@@ -81,8 +86,8 @@ func (rb *T) AddRequest(msg messages.Request) (<-chan messages.Reply, bool) {
8186
}
8287
}
8388

84-
replyChannel := make(chan messages.Reply)
85-
go rb.request.supplyReplies(replyChannel)
89+
replyChannel := make(chan messages.Reply, 1)
90+
go rb.request.supplyReplies(replyChannel, cancel)
8691

8792
return replyChannel, true
8893
}
@@ -192,8 +197,16 @@ func (r *request) addReply(msg messages.Reply) bool {
192197
return r.replySet.addReply(msg)
193198
}
194199

195-
func (r *request) supplyReplies(ch chan<- messages.Reply) {
196-
r.replySet.supplyReplies(ch, r.removed)
200+
func (r *request) supplyReplies(ch chan<- messages.Reply, cancel <-chan struct{}) {
201+
done := make(chan struct{})
202+
go func() {
203+
defer close(done)
204+
select {
205+
case <-r.removed:
206+
case <-cancel:
207+
}
208+
}()
209+
r.replySet.supplyReplies(ch, done)
197210
}
198211

199212
type replySet struct {

client/internal/requestbuffer/requestbuffer_test.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,29 @@ func TestAddRequest(t *testing.T) {
3535

3636
seq0 := uint64(0)
3737
req0 := makeRequest(seq0)
38-
_, ok := rb.AddRequest(req0)
38+
_, ok := rb.AddRequest(req0, nil)
3939
assert.False(t, ok, "Request with zero sequence ID must be rejected")
4040

4141
seq1 := seq0 + 1
4242
req1 := makeRequest(seq1)
43-
ch, ok := rb.AddRequest(req1)
43+
ch, ok := rb.AddRequest(req1, nil)
4444
require.True(t, ok)
4545
assert.NotNil(t, ch)
4646

4747
rb.RemoveRequest(seq1)
4848

49-
_, ok = rb.AddRequest(req1)
49+
_, ok = rb.AddRequest(req1, nil)
5050
assert.False(t, ok, "Subsequent Request sequence ID cannot be the same")
5151

5252
seq2 := seq1 + 1
5353
req2 := makeRequest(seq2)
54-
ch, ok = rb.AddRequest(req2)
54+
ch, ok = rb.AddRequest(req2, nil)
5555
require.True(t, ok)
5656
assert.NotNil(t, ch)
5757

5858
rb.RemoveRequest(seq2)
5959

60-
_, ok = rb.AddRequest(req1)
60+
_, ok = rb.AddRequest(req1, nil)
6161
assert.False(t, ok, "Subsequent Request sequence ID cannot decrease")
6262
}
6363

@@ -68,7 +68,7 @@ func TestRemoveRequest(t *testing.T) {
6868
rb.RemoveRequest(seq1) // no panic
6969

7070
req1 := makeRequest(seq1)
71-
ch1, ok := rb.AddRequest(req1)
71+
ch1, ok := rb.AddRequest(req1, nil)
7272
require.True(t, ok)
7373

7474
seq2 := seq1 + 1
@@ -93,7 +93,7 @@ func TestAddReply(t *testing.T) {
9393
assert.NotNil(t, ok, "Must drop Reply with empty buffer")
9494

9595
req1 := makeRequest(seq1)
96-
ch, ok := rb.AddRequest(req1)
96+
ch, ok := rb.AddRequest(req1, nil)
9797
require.True(t, ok)
9898

9999
seq2 := seq1 + 1
@@ -120,6 +120,14 @@ func TestAddReply(t *testing.T) {
120120

121121
ok = rb.AddReply(rly1r0)
122122
assert.NotNil(t, ok, "Must drop Reply after Request removal")
123+
124+
req2 := makeRequest(seq2)
125+
cancel := make(chan struct{})
126+
ch, ok = rb.AddRequest(req2, cancel)
127+
require.True(t, ok)
128+
129+
close(cancel)
130+
assert.Nil(t, <-ch)
123131
}
124132

125133
func TestRequestStream(t *testing.T) {
@@ -136,7 +144,7 @@ func TestRequestStream(t *testing.T) {
136144
close(doneChan)
137145
}()
138146

139-
_, ok := rb.AddRequest(req)
147+
_, ok := rb.AddRequest(req, nil)
140148
require.True(t, ok)
141149

142150
<-doneChan
@@ -170,7 +178,7 @@ func TestConcurrent(t *testing.T) {
170178
wg.Add(nrRequests)
171179
for seq := uint64(1); seq <= nrRequests; seq++ {
172180
req := makeRequest(seq)
173-
ch, ok := rb.AddRequest(req)
181+
ch, ok := rb.AddRequest(req, nil)
174182
assert.True(t, ok)
175183
assert.NotNil(t, ch)
176184

@@ -220,7 +228,7 @@ func TestWithFaulty(t *testing.T) {
220228
wg.Add(nrRequests)
221229
for seq := uint64(1); seq <= nrRequests; seq++ {
222230
req := makeRequest(seq)
223-
ch, ok := rb.AddRequest(req)
231+
ch, ok := rb.AddRequest(req, nil)
224232
require.True(t, ok)
225233

226234
go func(seq uint64) {

client/message-handling.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ import (
3030
// starts message exchange with them given a total number of replicas,
3131
// request buffer to add/fetch messages to/from and a stack of
3232
// interfaces to external modules.
33-
func startReplicaConnections(clientID, n uint32, buf *requestbuffer.T, stack Stack, logger logger.Logger) error {
34-
outHandler := makeOutgoingMessageHandler(buf)
33+
func startReplicaConnections(clientID, n uint32, buf *requestbuffer.T, stack Stack, stop <-chan struct{}, logger logger.Logger) error {
34+
outHandler := makeOutgoingMessageHandler(buf, stop)
3535
authenticator := makeReplyAuthenticator(clientID, stack)
3636
consumer := makeReplyConsumer(buf)
3737
handleReply := makeReplyMessageHandler(consumer, authenticator, logger)
3838

3939
for i := uint32(0); i < n; i++ {
4040
connector := makeReplicaConnector(i, stack)
41-
inHandler := makeIncomingMessageHandler(i, handleReply, logger)
41+
inHandler := makeIncomingMessageHandler(i, handleReply, stop, logger)
4242
if err := startReplicaConnection(outHandler, inHandler, connector); err != nil {
4343
return fmt.Errorf("error connecting to replica %d: %s", i, err)
4444
}
@@ -68,43 +68,59 @@ func startReplicaConnection(outHandler outgoingMessageHandler, inHandler incomin
6868
return err
6969
}
7070

71-
go outHandler(out)
71+
go func() {
72+
defer close(out)
73+
outHandler(out)
74+
}()
7275
go inHandler(in)
7376

7477
return nil
7578
}
7679

7780
// makeOutgoingMessageHandler construct an outgoingMessageHandler
7881
// using the supplied request buffer as a source of outgoing messages.
79-
func makeOutgoingMessageHandler(buf *requestbuffer.T) outgoingMessageHandler {
82+
func makeOutgoingMessageHandler(buf *requestbuffer.T, stop <-chan struct{}) outgoingMessageHandler {
8083
return func(out chan<- []byte) {
81-
for req := range buf.RequestStream(nil) {
84+
for req := range buf.RequestStream(stop) {
8285
mBytes, err := req.MarshalBinary()
8386
if err != nil {
8487
panic(err)
8588
}
86-
out <- mBytes
89+
select {
90+
case out <- mBytes:
91+
case <-stop:
92+
return
93+
}
8794
}
8895
}
8996
}
9097

9198
// makeIncomingMessageHandler constructs an incomingMessageHandler
9299
// using replicaID as the ID of replica which supplies incoming
93100
// messages, and the passed abstraction to handle Reply messages.
94-
func makeIncomingMessageHandler(replicaID uint32, handleReply replyMessageHandler, logger logger.Logger) incomingMessageHandler {
101+
func makeIncomingMessageHandler(replicaID uint32, handleReply replyMessageHandler, stop <-chan struct{}, logger logger.Logger) incomingMessageHandler {
95102
return func(in <-chan []byte) {
96-
for msgBytes := range in {
97-
msg, err := messageImpl.NewFromBinary(msgBytes)
98-
if err != nil {
99-
logger.Warningf("Error unmarshaling message from replica %d: %v", replicaID, err)
100-
continue
101-
}
102-
103-
switch msg := msg.(type) {
104-
case messages.Reply:
105-
handleReply(msg)
106-
default:
107-
logger.Warningf("Received unknown message from replica %d", replicaID)
103+
for {
104+
select {
105+
case msgBytes, ok := <-in:
106+
if !ok {
107+
return
108+
}
109+
110+
msg, err := messageImpl.NewFromBinary(msgBytes)
111+
if err != nil {
112+
logger.Warningf("Error unmarshaling message from replica %d: %v", replicaID, err)
113+
continue
114+
}
115+
116+
switch msg := msg.(type) {
117+
case messages.Reply:
118+
handleReply(msg)
119+
default:
120+
logger.Warningf("Received unknown message from replica %d", replicaID)
121+
}
122+
case <-stop:
123+
return
108124
}
109125
}
110126
}

client/request.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ type requestHandler func(operation []byte) <-chan []byte
3535
// makeRequestHandler constructs a requestHandler uisng the supplied
3636
// clientID, sequence number generator, authenticator, request buffer,
3737
// and number of tolerated faulty replicas.
38-
func makeRequestHandler(clientID uint32, seq sequenceGenerator, authen api.Authenticator, buf *requestbuffer.T, f uint32, logger logger.Logger) requestHandler {
39-
submitter := makeRequestSubmitter(clientID, seq, authen, buf, logger)
38+
func makeRequestHandler(clientID uint32, seq sequenceGenerator, authen api.Authenticator, buf *requestbuffer.T, f uint32, stop <-chan struct{}, logger logger.Logger) requestHandler {
39+
submitter := makeRequestSubmitter(clientID, seq, authen, buf, stop, logger)
4040
collector := makeReplyCollector(f, buf)
4141
return func(operation []byte) <-chan []byte {
4242
return handleRequest(operation, submitter, collector)
@@ -59,7 +59,10 @@ type replyCollector func(in <-chan messages.Reply, out chan<- []byte)
5959
func handleRequest(operation []byte, submitter requestSubmitter, collector replyCollector) <-chan []byte {
6060
resultChan := make(chan []byte, 1)
6161
replyChan := submitter(operation)
62-
go collector(replyChan, resultChan)
62+
go func() {
63+
defer close(resultChan)
64+
collector(replyChan, resultChan)
65+
}()
6366
return resultChan
6467
}
6568

@@ -92,7 +95,7 @@ func collectReplies(f uint32, replyChan <-chan messages.Reply, remover requestRe
9295
if matchingResults[hash] > f {
9396
remover(reply.Sequence())
9497
resultChan <- result
95-
break
98+
return
9699
}
97100
}
98101
}
@@ -112,9 +115,9 @@ type sequenceGenerator func() uint64
112115
// makeRequestSubmitter constructs a requestSubmitter using the
113116
// supplied client ID, sequence number generator, authenticator and
114117
// request buffer to add the produced Request message to.
115-
func makeRequestSubmitter(clientID uint32, seq sequenceGenerator, authen api.Authenticator, buf *requestbuffer.T, logger logger.Logger) requestSubmitter {
118+
func makeRequestSubmitter(clientID uint32, seq sequenceGenerator, authen api.Authenticator, buf *requestbuffer.T, stop <-chan struct{}, logger logger.Logger) requestSubmitter {
116119
preparer := makeRequestPreparer(clientID, authen, seq, logger)
117-
consumer := makeRequestConsumer(buf)
120+
consumer := makeRequestConsumer(buf, stop)
118121
return func(operation []byte) <-chan messages.Reply {
119122
return submitRequest(operation, preparer, consumer)
120123
}
@@ -157,9 +160,9 @@ func makeRequestPreparer(clientID uint32, authenticator api.Authenticator, seq s
157160
// makeRequestConsumer constructs a requestConsumer using the supplied
158161
// request buffer to add the Request message to and retrieve
159162
// corresponding Reply messages from
160-
func makeRequestConsumer(buf *requestbuffer.T) requestConsumer {
163+
func makeRequestConsumer(buf *requestbuffer.T, stop <-chan struct{}) requestConsumer {
161164
return func(request messages.Request) (<-chan messages.Reply, bool) {
162-
return buf.AddRequest(request)
165+
return buf.AddRequest(request, stop)
163166
}
164167
}
165168

core/integration_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type testClientStack struct {
5757
}
5858

5959
var (
60-
replicas []api.Replica
60+
replicas []minbft.Replica
6161
replicaStacks []*testReplicaStack
6262

6363
clients []cl.Client
@@ -142,6 +142,15 @@ func initTestnetPeers(numReplica int, numClient int) {
142142
makeClients(numClient, testKeys, cfg)
143143
}
144144

145+
func teardownTestnet() {
146+
for _, client := range clients {
147+
client.Terminate()
148+
}
149+
for _, replica := range replicas {
150+
replica.Terminate()
151+
}
152+
}
153+
145154
// Initialize a given number of replica instances.
146155
func makeReplicas(numReplica int, testKeys []byte, cfg api.Configer) {
147156
// replica stubs
@@ -207,6 +216,7 @@ func testAcceptOneRequest(t *testing.T) {
207216
for _, stack := range replicaStacks {
208217
assert.Equal(t, uint64(1), stack.SimpleLedger.GetLength())
209218
}
219+
210220
}
211221

212222
func TestIntegration(t *testing.T) {

0 commit comments

Comments
 (0)