@@ -65,10 +65,15 @@ func Debug(ll *log.Logger) OptionFunc {
6565}
6666
6767// EchoInterval specifies an interval at which the Client will send
68- // echo RPCs to an OVSDB server to keep the connection alive. If this
69- // option is not used, the Client will not send any echo RPCs on its own.
68+ // echo RPCs to an OVSDB server to keep the connection alive. Note that the
69+ // OVSDB server may also send its own echo RPCs to the Client, and the Client
70+ // will always reply to those on behalf of the user.
7071//
71- // Specify a duration of 0 to disable sending background echo RPCs.
72+ // If this option is not used, the Client will only send echo RPCs when the
73+ // server sends an echo RPC to it.
74+ //
75+ // Specify a duration of 0 to disable sending background echo RPCs at a
76+ // fixed interval.
7277func EchoInterval (d time.Duration ) OptionFunc {
7378 return func (c * Client ) error {
7479 c .echoInterval = d
@@ -105,30 +110,40 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
105110 // Set up callbacks.
106111 client .callbacks = make (map [string ]callback )
107112
113+ // Set up echo loop statistics.
114+ var echoOK , echoFail int64
115+ client .echoOK = & echoOK
116+ client .echoFail = & echoFail
117+
118+ // Coordinates the sending of echo messages among multiple goroutines.
119+ echoC := make (chan struct {})
120+
108121 // Start up any background routines, and enable canceling them via context.
109122 ctx , cancel := context .WithCancel (context .Background ())
110123 client .cancel = cancel
111124
112125 var wg sync.WaitGroup
113- wg .Add (1 )
114-
115- // If configured, send echo RPCs in the background at a fixed interval.
116- var echoOK , echoFail int64
117- client .echoOK = & echoOK
118- client .echoFail = & echoFail
126+ wg .Add (2 )
119127
128+ // If configured, trigger echo RPCs in the background at a fixed interval.
120129 if d := client .echoInterval ; d != 0 {
121130 wg .Add (1 )
122131 go func () {
123132 defer wg .Done ()
124- client .echoLoop (ctx , d )
133+ client .echoTicker (ctx , d , echoC )
125134 }()
126135 }
127136
137+ // Send echo RPCs when triggered by channel.
138+ go func () {
139+ defer wg .Done ()
140+ client .echoLoop (ctx , echoC )
141+ }()
142+
128143 // Handle all incoming RPC responses and notifications.
129144 go func () {
130145 defer wg .Done ()
131- client .listen ()
146+ client .listen (ctx , echoC )
132147 }()
133148
134149 client .wg = & wg
@@ -175,7 +190,6 @@ type ClientStats struct {
175190 }
176191
177192 // Statistics about the Client's internal echo RPC loop.
178- // Note that all counters will be zero if the echo loop is disabled.
179193 EchoLoop struct {
180194 // The number of successful and failed echo RPCs in the loop.
181195 Success , Failure int
@@ -250,7 +264,7 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
250264
251265// listen starts an RPC receive loop that can return RPC results to
252266// clients via a callback.
253- func (c * Client ) listen () {
267+ func (c * Client ) listen (ctx context. Context , echoC chan <- struct {} ) {
254268 for {
255269 res , err := c .c .Receive ()
256270 if err != nil {
@@ -263,7 +277,20 @@ func (c *Client) listen() {
263277 continue
264278 }
265279
266- // TODO(mdlayher): deal with RPC notifications.
280+ // Handle any JSON-RPC notifications.
281+ // TODO(mdlayher): deal with other RPC notifications.
282+ switch res .Method {
283+ case "echo" :
284+ // OVSDB server wants us to send an echo to it, but will also send
285+ // us a response to that echo. Since this goroutine is the one that
286+ // needs to receive that response and issue the callback for it, we
287+ // ask the echo loop goroutine to send an echo on our behalf.
288+ select {
289+ case <- ctx .Done ():
290+ case echoC <- struct {}{}:
291+ }
292+ continue
293+ }
267294
268295 // Handle any JSON-RPC top-level errors.
269296 if err := res .Err (); err != nil {
@@ -280,13 +307,14 @@ func (c *Client) listen() {
280307 }
281308}
282309
283- // echoLoop starts a loop that sends echo RPCs at the interval defined by d.
284- func (c * Client ) echoLoop (ctx context.Context , d time.Duration ) {
310+ // echoTicker starts a loop that triggers echo RPCs via channel at a fixed
311+ // time interval.
312+ func (c * Client ) echoTicker (ctx context.Context , d time.Duration , echoC chan <- struct {}) {
285313 t := time .NewTicker (d )
286314 defer t .Stop ()
287315
288316 for {
289- // If context is canceled, we should exit this loop. If a tick is fired
317+ // If context is canceled, we should exit this loop. If a request is fired
290318 // and the context was already canceled, we exit there as well.
291319 select {
292320 case <- ctx .Done ():
@@ -297,6 +325,30 @@ func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
297325 }
298326 }
299327
328+ // Allow producer to stop if context closed instead of blocking if
329+ // the consumer is stopped.
330+ select {
331+ case <- ctx .Done ():
332+ return
333+ case echoC <- struct {}{}:
334+ }
335+ }
336+ }
337+
338+ // echoLoop starts a loop that sends echo RPCs when requested via channel.
339+ func (c * Client ) echoLoop (ctx context.Context , echoC <- chan struct {}) {
340+ for {
341+ // If context is canceled, we should exit this loop. If a request is fired
342+ // and the context was already canceled, we exit there as well.
343+ select {
344+ case <- ctx .Done ():
345+ return
346+ case <- echoC :
347+ if err := ctx .Err (); err != nil {
348+ return
349+ }
350+ }
351+
300352 // For the time being, we will track metrics about the number of successes
301353 // and failures while sending echo RPCs.
302354 // TODO(mdlayher): improve error handling for echo loop.
0 commit comments