@@ -15,8 +15,28 @@ import (
1515
1616// chatServer enables broadcasting to a set of subscribers.
1717type chatServer struct {
18+ registerOnce sync.Once
19+ m http.ServeMux
20+
1821 subscribersMu sync.RWMutex
19- subscribers map [chan <- []byte ]struct {}
22+ subscribers map [* subscriber ]struct {}
23+ }
24+
25+ // subscriber represents a subscriber.
26+ // Messages are sent on the msgs channel and if the client
27+ // cannot keep up with the messages, closeSlow is called.
28+ type subscriber struct {
29+ msgs chan []byte
30+ closeSlow func ()
31+ }
32+
33+ func (cs * chatServer ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
34+ cs .registerOnce .Do (func () {
35+ cs .m .Handle ("/" , http .FileServer (http .Dir ("." )))
36+ cs .m .HandleFunc ("/subscribe" , cs .subscribeHandler )
37+ cs .m .HandleFunc ("/publish" , cs .publishHandler )
38+ })
39+ cs .m .ServeHTTP (w , r )
2040}
2141
2242// subscribeHandler accepts the WebSocket connection and then subscribes
@@ -57,11 +77,13 @@ func (cs *chatServer) publishHandler(w http.ResponseWriter, r *http.Request) {
5777 }
5878
5979 cs .publish (msg )
80+
81+ w .WriteHeader (http .StatusAccepted )
6082}
6183
6284// subscribe subscribes the given WebSocket to all broadcast messages.
63- // It creates a msgs chan with a buffer of 16 to give some room to slower
64- // connections and then registers it . It then listens for all messages
85+ // It creates a subscriber with a buffered msgs chan to give some room to slower
86+ // connections and then registers the subscriber . It then listens for all messages
6587// and writes them to the WebSocket. If the context is cancelled or
6688// an error occurs, it returns and deletes the subscription.
6789//
@@ -70,13 +92,18 @@ func (cs *chatServer) publishHandler(w http.ResponseWriter, r *http.Request) {
7092func (cs * chatServer ) subscribe (ctx context.Context , c * websocket.Conn ) error {
7193 ctx = c .CloseRead (ctx )
7294
73- msgs := make (chan []byte , 16 )
74- cs .addSubscriber (msgs )
75- defer cs .deleteSubscriber (msgs )
95+ s := & subscriber {
96+ msgs : make (chan []byte , 16 ),
97+ closeSlow : func () {
98+ c .Close (websocket .StatusPolicyViolation , "connection too slow to keep up with messages" )
99+ },
100+ }
101+ cs .addSubscriber (s )
102+ defer cs .deleteSubscriber (s )
76103
77104 for {
78105 select {
79- case msg := <- msgs :
106+ case msg := <- s . msgs :
80107 err := writeTimeout (ctx , time .Second * 5 , c , msg )
81108 if err != nil {
82109 return err
@@ -94,29 +121,29 @@ func (cs *chatServer) publish(msg []byte) {
94121 cs .subscribersMu .RLock ()
95122 defer cs .subscribersMu .RUnlock ()
96123
97- for c := range cs .subscribers {
124+ for s := range cs .subscribers {
98125 select {
99- case c <- msg :
126+ case s . msgs <- msg :
100127 default :
128+ go s .closeSlow ()
101129 }
102130 }
103131}
104132
105- // addSubscriber registers a subscriber with a channel
106- // on which to send messages.
107- func (cs * chatServer ) addSubscriber (msgs chan <- []byte ) {
133+ // addSubscriber registers a subscriber.
134+ func (cs * chatServer ) addSubscriber (s * subscriber ) {
108135 cs .subscribersMu .Lock ()
109136 if cs .subscribers == nil {
110- cs .subscribers = make (map [chan <- [] byte ]struct {})
137+ cs .subscribers = make (map [* subscriber ]struct {})
111138 }
112- cs .subscribers [msgs ] = struct {}{}
139+ cs .subscribers [s ] = struct {}{}
113140 cs .subscribersMu .Unlock ()
114141}
115142
116- // deleteSubscriber deletes the subscriber with the given msgs channel .
117- func (cs * chatServer ) deleteSubscriber (msgs chan [] byte ) {
143+ // deleteSubscriber deletes the given subscriber .
144+ func (cs * chatServer ) deleteSubscriber (s * subscriber ) {
118145 cs .subscribersMu .Lock ()
119- delete (cs .subscribers , msgs )
146+ delete (cs .subscribers , s )
120147 cs .subscribersMu .Unlock ()
121148}
122149
0 commit comments