Skip to content

Commit 4aa9da7

Browse files
authored
Allow multiplexing multiple monitor connections (#549)
1 parent b01c438 commit 4aa9da7

File tree

3 files changed

+269
-63
lines changed

3 files changed

+269
-63
lines changed

.licenses/arduino-router/NOTICE

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,102 @@ SOFTWARE.
2929

3030
Released under the [MIT license](LICENSE).
3131

32+
*****
33+
github.com/djherbis/buffer@v1.2.0
34+
35+
The MIT License (MIT)
36+
37+
Copyright (c) 2015 Dustin H
38+
39+
Permission is hereby granted, free of charge, to any person obtaining a copy of
40+
this software and associated documentation files (the "Software"), to deal in
41+
the Software without restriction, including without limitation the rights to
42+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
43+
the Software, and to permit persons to whom the Software is furnished to do so,
44+
subject to the following conditions:
45+
46+
The above copyright notice and this permission notice shall be included in all
47+
copies or substantial portions of the Software.
48+
49+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
51+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
52+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
53+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
54+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
55+
56+
*****
57+
github.com/djherbis/buffer/limio@v1.2.0
58+
59+
The MIT License (MIT)
60+
61+
Copyright (c) 2015 Dustin H
62+
63+
Permission is hereby granted, free of charge, to any person obtaining a copy of
64+
this software and associated documentation files (the "Software"), to deal in
65+
the Software without restriction, including without limitation the rights to
66+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
67+
the Software, and to permit persons to whom the Software is furnished to do so,
68+
subject to the following conditions:
69+
70+
The above copyright notice and this permission notice shall be included in all
71+
copies or substantial portions of the Software.
72+
73+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
74+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
75+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
76+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
77+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
78+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
79+
80+
*****
81+
github.com/djherbis/buffer/wrapio@v1.2.0
82+
83+
The MIT License (MIT)
84+
85+
Copyright (c) 2015 Dustin H
86+
87+
Permission is hereby granted, free of charge, to any person obtaining a copy of
88+
this software and associated documentation files (the "Software"), to deal in
89+
the Software without restriction, including without limitation the rights to
90+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
91+
the Software, and to permit persons to whom the Software is furnished to do so,
92+
subject to the following conditions:
93+
94+
The above copyright notice and this permission notice shall be included in all
95+
copies or substantial portions of the Software.
96+
97+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
98+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
99+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
100+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
101+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
102+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
103+
104+
*****
105+
github.com/djherbis/nio/v3@v3.0.1
106+
107+
The MIT License (MIT)
108+
109+
Copyright (c) 2015 Dustin H
110+
111+
Permission is hereby granted, free of charge, to any person obtaining a copy of
112+
this software and associated documentation files (the "Software"), to deal in
113+
the Software without restriction, including without limitation the rights to
114+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
115+
the Software, and to permit persons to whom the Software is furnished to do so,
116+
subject to the following conditions:
117+
118+
The above copyright notice and this permission notice shall be included in all
119+
copies or substantial portions of the Software.
120+
121+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
122+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
123+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
124+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
125+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
126+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
127+
32128
*****
33129
github.com/spf13/cobra@v1.9.1
34130

cmd/arduino-router/monitorapi/monitor-api.go

Lines changed: 77 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,35 @@ package monitorapi
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"log/slog"
87
"net"
9-
"os"
108
"sync"
9+
"sync/atomic"
1110
"time"
1211

12+
"github.com/djherbis/buffer"
13+
"github.com/djherbis/nio/v3"
14+
1315
"github.com/bcmi-labs/orchestrator/cmd/arduino-router/msgpackrouter"
1416
"github.com/bcmi-labs/orchestrator/cmd/arduino-router/msgpackrpc"
1517
)
1618

17-
var lock sync.RWMutex
18-
var socket net.Conn
19-
var monitorConnectionLost sync.Cond = *sync.NewCond(&lock)
19+
var socketsLock sync.RWMutex
20+
var sockets map[net.Conn]struct{}
21+
var monSendPipeRd *nio.PipeReader
22+
var monSendPipeWr *nio.PipeWriter
23+
var bytesInSendPipe atomic.Int64
2024

2125
// Register the Monitor API methods
2226
func Register(router *msgpackrouter.Router, addr string) error {
2327
listener, err := net.Listen("tcp", addr)
2428
if err != nil {
2529
return fmt.Errorf("failed to start listener: %w", err)
2630
}
31+
sockets = make(map[net.Conn]struct{})
32+
monSendPipeRd, monSendPipeWr = nio.Pipe(buffer.New(1024))
33+
2734
go connectionHandler(listener)
2835
_ = router.RegisterMethod("mon/connected", connected)
2936
_ = router.RegisterMethod("mon/read", read)
@@ -41,15 +48,26 @@ func connectionHandler(listener net.Listener) {
4148
}
4249

4350
slog.Info("Accepted monitor connection", "from", conn.RemoteAddr())
44-
lock.Lock()
45-
socket = conn
46-
lock.Unlock()
47-
48-
lock.Lock()
49-
for socket != nil {
50-
monitorConnectionLost.Wait()
51-
}
52-
lock.Unlock()
51+
socketsLock.Lock()
52+
sockets[conn] = struct{}{}
53+
socketsLock.Unlock()
54+
55+
go func() {
56+
defer close(conn)
57+
58+
// Read from the connection and write to the monitor send pipe
59+
buff := make([]byte, 1024)
60+
for {
61+
if n, err := conn.Read(buff); err != nil {
62+
// Connection closed from client
63+
return
64+
} else if written, err := monSendPipeWr.Write(buff[:n]); err != nil {
65+
return
66+
} else {
67+
bytesInSendPipe.Add(int64(written))
68+
}
69+
}
70+
}()
5371
}
5472
}
5573

@@ -58,9 +76,9 @@ func connected(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_
5876
return nil, []any{1, "Invalid number of parameters, expected no parameters"}
5977
}
6078

61-
lock.RLock()
62-
connected := socket != nil
63-
lock.RUnlock()
79+
socketsLock.RLock()
80+
connected := len(sockets) > 0
81+
socketsLock.RUnlock()
6482

6583
return connected, nil
6684
}
@@ -74,33 +92,18 @@ func read(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_resul
7492
return nil, []any{1, "Invalid parameter type, expected positive int for max bytes to read"}
7593
}
7694

77-
lock.RLock()
78-
conn := socket
79-
lock.RUnlock()
80-
81-
// No active connection, return empty slice
82-
if conn == nil {
95+
if bytesInSendPipe.Load() == 0 {
8396
return []byte{}, nil
8497
}
8598

8699
buffer := make([]byte, maxBytes)
87-
// It seems that the only way to make a non-blocking read is to set a read deadline.
88-
// BTW setting the read deadline to time.Now() will always returns an empty (zero bytes)
89-
// read, so we set it to a very short duration in the future.
90-
if err := conn.SetReadDeadline(time.Now().Add(time.Millisecond)); err != nil {
91-
return nil, []any{3, "Failed to set read timeout: " + err.Error()}
92-
}
93-
n, err := conn.Read(buffer)
94-
if errors.Is(err, os.ErrDeadlineExceeded) {
95-
// timeout
96-
} else if err != nil {
97-
// If we get an error other than timeout, we assume the connection is lost.
98-
slog.Error("Monitor connection lost, closing connection", "error", err)
99-
close()
100+
if readed, err := monSendPipeRd.Read(buffer); err != nil {
101+
slog.Error("Error reading monitor", "error", err)
100102
return nil, []any{3, "Failed to read from connection: " + err.Error()}
103+
} else {
104+
bytesInSendPipe.Add(int64(-readed))
105+
return buffer[:readed], nil
101106
}
102-
103-
return buffer[:n], nil
104107
}
105108

106109
func write(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
@@ -117,41 +120,52 @@ func write(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_resu
117120
}
118121
}
119122

120-
lock.RLock()
121-
conn := socket
122-
lock.RUnlock()
123-
124-
if conn == nil { // No active connection, drop the data
125-
return len(data), nil
123+
socketsLock.RLock()
124+
clients := make([]net.Conn, 0, len(sockets))
125+
for c := range sockets {
126+
clients = append(clients, c)
126127
}
128+
socketsLock.RUnlock()
127129

128-
n, err := conn.Write(data)
129-
if err != nil {
130-
// If we get an error, we assume the connection is lost.
131-
slog.Error("Monitor connection lost, closing connection", "error", err)
132-
close()
133-
134-
return nil, []any{3, "Failed to write to connection: " + err.Error()}
130+
for _, conn := range clients {
131+
if len(clients) > 1 {
132+
// If there are multiple clients, allow 500 ms for the data to
133+
// get through each one.
134+
_ = conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 500))
135+
} else {
136+
_ = conn.SetWriteDeadline(time.Time{})
137+
}
138+
if _, err := conn.Write(data); err != nil {
139+
// If we get an error, we assume the connection is lost.
140+
slog.Error("Monitor connection lost, closing connection", "error", err)
141+
close(conn)
142+
}
135143
}
136144

137-
return n, nil
145+
return len(data), nil
146+
}
147+
148+
func close(conn net.Conn) {
149+
socketsLock.Lock()
150+
delete(sockets, conn)
151+
socketsLock.Unlock()
152+
_ = conn.Close()
138153
}
139154

140155
func reset(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
141156
if len(params) != 0 {
142157
return nil, []any{1, "Invalid number of parameters, expected no parameters"}
143158
}
144-
close()
145-
slog.Info("Monitor connection reset")
146-
return true, nil
147-
}
148159

149-
func close() {
150-
lock.Lock()
151-
if socket != nil {
152-
_ = socket.Close()
160+
socketsLock.Lock()
161+
socketsToClose := sockets
162+
sockets = make(map[net.Conn]struct{})
163+
socketsLock.Unlock()
164+
165+
for c := range socketsToClose {
166+
_ = c.Close()
153167
}
154-
socket = nil
155-
monitorConnectionLost.Broadcast()
156-
lock.Unlock()
168+
169+
slog.Info("Monitor connection reset")
170+
return true, nil
157171
}

0 commit comments

Comments
 (0)