@@ -23,12 +23,15 @@ import (
2323 "strings"
2424 "sync"
2525 "sync/atomic"
26+ "time"
2627
2728 "github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2829)
2930
30- // A Client is an OVSDB client.
31+ // A Client is an OVSDB client. Clients can be customized by using OptionFuncs
32+ // in the Dial and New functions.
3133type Client struct {
34+ // The RPC connection, and its logger.
3235 c * jsonrpc.Conn
3336 ll * log.Logger
3437
@@ -39,7 +42,14 @@ type Client struct {
3942 cbMu sync.RWMutex
4043 callbacks map [int ]callback
4144
42- wg * sync.WaitGroup
45+ // Interval at which echo RPCs should occur in the background, and statistics
46+ // about the echo loop.
47+ echoInterval time.Duration
48+ echoOK , echoFail * int64
49+
50+ // Track and clean up background goroutines.
51+ cancel func ()
52+ wg * sync.WaitGroup
4353}
4454
4555// An OptionFunc is a function which can configure a Client.
@@ -53,6 +63,18 @@ func Debug(ll *log.Logger) OptionFunc {
5363 }
5464}
5565
66+ // EchoInterval specifies an interval at which the Client will send
67+ // echo RPCs to an OVSDB server to keep the connection alive. If this
68+ // option is not used, the Client will not send any echo RPCs on its own.
69+ //
70+ // Specify a duration of 0 to disable sending background echo RPCs.
71+ func EchoInterval (d time.Duration ) OptionFunc {
72+ return func (c * Client ) error {
73+ c .echoInterval = d
74+ return nil
75+ }
76+ }
77+
5678// Dial dials a connection to an OVSDB server and returns a Client.
5779func Dial (network , addr string , options ... OptionFunc ) (* Client , error ) {
5880 conn , err := net .Dial (network , addr )
@@ -82,10 +104,26 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
82104 // Set up callbacks.
83105 client .callbacks = make (map [int ]callback )
84106
85- // Start up any background routines.
107+ // Start up any background routines, and enable canceling them via context.
108+ ctx , cancel := context .WithCancel (context .Background ())
109+ client .cancel = cancel
110+
86111 var wg sync.WaitGroup
87112 wg .Add (1 )
88113
114+ // If configured, send echo RPCs in the background at a fixed interval.
115+ var echoOK , echoFail int64
116+ client .echoOK = & echoOK
117+ client .echoFail = & echoFail
118+
119+ if d := client .echoInterval ; d != 0 {
120+ wg .Add (1 )
121+ go func () {
122+ defer wg .Done ()
123+ client .echoLoop (ctx , d )
124+ }()
125+ }
126+
89127 // Handle all incoming RPC responses and notifications.
90128 go func () {
91129 defer wg .Done ()
@@ -102,8 +140,9 @@ func (c *Client) requestID() int {
102140 return int (atomic .AddInt64 (c .rpcID , 1 ))
103141}
104142
105- // Close closes a Client's connection.
143+ // Close closes a Client's connection and cleans up its resources .
106144func (c * Client ) Close () error {
145+ c .cancel ()
107146 err := c .c .Close ()
108147 c .wg .Wait ()
109148 return err
@@ -114,9 +153,11 @@ func (c *Client) Stats() ClientStats {
114153 var s ClientStats
115154
116155 c .cbMu .RLock ()
117- defer c .cbMu .RUnlock ()
118-
119156 s .Callbacks .Current = len (c .callbacks )
157+ c .cbMu .RUnlock ()
158+
159+ s .EchoLoop .Success = int (atomic .LoadInt64 (c .echoOK ))
160+ s .EchoLoop .Failure = int (atomic .LoadInt64 (c .echoFail ))
120161
121162 return s
122163}
@@ -129,6 +170,13 @@ type ClientStats struct {
129170 // for RPC responses.
130171 Current int
131172 }
173+
174+ // Statistics about the Client's internal echo RPC loop.
175+ // Note that all counters will be zero if the echo loop is disabled.
176+ EchoLoop struct {
177+ // The number of successful and failed echo RPCs in the loop.
178+ Success , Failure int
179+ }
132180}
133181
134182// rpc performs a single RPC request, and checks the response for errors.
@@ -204,7 +252,7 @@ func (c *Client) listen() {
204252 res , err := c .c .Receive ()
205253 if err != nil {
206254 // EOF or closed connection means time to stop serving.
207- if err == io .EOF || strings . Contains (err . Error (), "use of closed network" ) {
255+ if err == io .EOF || isClosedNetwork (err ) {
208256 return
209257 }
210258
@@ -229,6 +277,43 @@ func (c *Client) listen() {
229277 }
230278}
231279
280+ // echoLoop starts a loop that sends echo RPCs at the interval defined by d.
281+ func (c * Client ) echoLoop (ctx context.Context , d time.Duration ) {
282+ t := time .NewTicker (d )
283+ defer t .Stop ()
284+
285+ for {
286+ // If context is canceled, we should exit this loop. If a tick is fired
287+ // and the context was already canceled, we exit there as well.
288+ select {
289+ case <- ctx .Done ():
290+ return
291+ case <- t .C :
292+ if err := ctx .Err (); err != nil {
293+ return
294+ }
295+ }
296+
297+ // For the time being, we will track metrics about the number of successes
298+ // and failures while sending echo RPCs.
299+ // TODO(mdlayher): improve error handling for echo loop.
300+ if err := c .Echo (ctx ); err != nil {
301+ if isClosedNetwork (err ) {
302+ // Our socket was closed, which means the context should be canceled
303+ // and we should terminate on the next loop. No need to increment
304+ // errors counter.
305+ continue
306+ }
307+
308+ // Count other errors as failures.
309+ atomic .AddInt64 (c .echoFail , 1 )
310+ continue
311+ }
312+
313+ atomic .AddInt64 (c .echoOK , 1 )
314+ }
315+ }
316+
232317// A callback can be used to send a message back to a caller, or
233318// allow the caller to cancel waiting for a message.
234319type callback struct {
@@ -275,6 +360,16 @@ func (c *Client) doCallback(id int, res rpcResponse) {
275360 delete (c .callbacks , id )
276361}
277362
// isClosedNetwork reports whether err was caused by using a network
// connection that has already been closed. A nil error is never considered
// a closed-network error.
func isClosedNetwork(err error) bool {
	// Matching on the message text is not an awesome solution, but see:
	// https://github.com/golang/go/issues/4373.
	const closedMsg = "use of closed network connection"

	return err != nil && strings.Contains(err.Error(), closedMsg)
}
372+
// panicf builds a message from format and its arguments in the manner of
// fmt.Sprintf, then panics with that message as a string value.
func panicf(format string, a ...interface{}) {
	msg := fmt.Sprintf(format, a...)
	panic(msg)
}
0 commit comments