1212use React \Promise \PromiseInterface ;
1313use stdClass ;
1414
15- class RedisClient implements ReplicationInterface
15+ class RedisClient extends LocalClient
1616{
1717 /**
1818 * The running loop.
@@ -90,49 +90,29 @@ public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInte
9090 }
9191
9292 /**
93- * Handle a message received from Redis on a specific channel .
93+ * Publish a message to a channel on behalf of a websocket user .
9494 *
95- * @param string $redisChannel
96- * @param string $payload
97- * @return void
95+ * @param string $appId
96+ * @param string $channel
97+ * @param stdClass $payload
98+ * @return bool
9899 */
99- protected function onMessage (string $ redisChannel , string $ payload )
100+ public function publish (string $ appId , string $ channel , stdClass $ payload ): bool
100101 {
101- $ payload = json_decode ($ payload );
102-
103- // Ignore messages sent by ourselves.
104- if (isset ($ payload ->serverId ) && $ this ->serverId === $ payload ->serverId ) {
105- return ;
106- }
107-
108- // Pull out the app ID. See RedisPusherBroadcaster
109- $ appId = $ payload ->appId ;
110-
111- // We need to put the channel name in the payload.
112- // We strip the app ID from the channel name, websocket clients
113- // expect the channel name to not include the app ID.
114- $ payload ->channel = Str::after ($ redisChannel , "{$ appId }: " );
115-
116- $ channelManager = app (ChannelManager::class);
117-
118- // Load the Channel instance to sync.
119- $ channel = $ channelManager ->find ($ appId , $ payload ->channel );
102+ $ payload ->appId = $ appId ;
103+ $ payload ->serverId = $ this ->getServerId ();
120104
121- // If no channel is found, none of our connections want to
122- // receive this message, so we ignore it.
123- if (! $ channel ) {
124- return ;
125- }
105+ $ payload = json_encode ($ payload );
126106
127- $ socket = $ payload -> socket ?? null ;
107+ $ this -> publishClient -> __call ( ' publish ' , [ " $ appId : $ channel " , $ payload ]) ;
128108
129- // Remove fields intended for internal use from the payload.
130- unset($ payload ->socket );
131- unset($ payload ->serverId );
132- unset($ payload ->appId );
109+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED , [
110+ 'channel ' => $ channel ,
111+ 'serverId ' => $ this ->getServerId (),
112+ 'payload ' => $ payload ,
113+ ]);
133114
134- // Push the message out to connected websocket clients.
135- $ channel ->broadcastToEveryoneExcept ($ payload , $ socket , $ appId , false );
115+ return true ;
136116 }
137117
138118 /**
@@ -153,7 +133,10 @@ public function subscribe(string $appId, string $channel): bool
153133 $ this ->subscribedChannels ["$ appId: $ channel " ]++;
154134 }
155135
156- DashboardLogger::replicatorSubscribed ($ appId , $ channel , $ this ->serverId );
136+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED , [
137+ 'channel ' => $ channel ,
138+ 'serverId ' => $ this ->getServerId (),
139+ ]);
157140
158141 return true ;
159142 }
@@ -181,25 +164,10 @@ public function unsubscribe(string $appId, string $channel): bool
181164 unset($ this ->subscribedChannels ["$ appId: $ channel " ]);
182165 }
183166
184- DashboardLogger::replicatorUnsubscribed ($ appId , $ channel , $ this ->serverId );
185-
186- return true ;
187- }
188-
189- /**
190- * Publish a message to a channel on behalf of a websocket user.
191- *
192- * @param string $appId
193- * @param string $channel
194- * @param stdClass $payload
195- * @return bool
196- */
197- public function publish (string $ appId , string $ channel , stdClass $ payload ): bool
198- {
199- $ payload ->appId = $ appId ;
200- $ payload ->serverId = $ this ->serverId ;
201-
202- $ this ->publishClient ->__call ('publish ' , ["$ appId: $ channel " , json_encode ($ payload )]);
167+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED , [
168+ 'channel ' => $ channel ,
169+ 'serverId ' => $ this ->getServerId (),
170+ ]);
203171
204172 return true ;
205173 }
@@ -217,6 +185,13 @@ public function publish(string $appId, string $channel, stdClass $payload): bool
217185 public function joinChannel (string $ appId , string $ channel , string $ socketId , string $ data )
218186 {
219187 $ this ->publishClient ->__call ('hset ' , ["$ appId: $ channel " , $ socketId , $ data ]);
188+
189+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL , [
190+ 'channel ' => $ channel ,
191+ 'serverId ' => $ this ->getServerId (),
192+ 'socketId ' => $ socketId ,
193+ 'data ' => $ data ,
194+ ]);
220195 }
221196
222197 /**
@@ -231,6 +206,12 @@ public function joinChannel(string $appId, string $channel, string $socketId, st
231206 public function leaveChannel (string $ appId , string $ channel , string $ socketId )
232207 {
233208 $ this ->publishClient ->__call ('hdel ' , ["$ appId: $ channel " , $ socketId ]);
209+
210+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL , [
211+ 'channel ' => $ channel ,
212+ 'serverId ' => $ this ->getServerId (),
213+ 'socketId ' => $ socketId ,
214+ ]);
234215 }
235216
236217 /**
@@ -272,6 +253,62 @@ public function channelMemberCounts(string $appId, array $channelNames): Promise
272253 });
273254 }
274255
256+ /**
257+ * Handle a message received from Redis on a specific channel.
258+ *
259+ * @param string $redisChannel
260+ * @param string $payload
261+ * @return void
262+ */
263+ protected function onMessage (string $ redisChannel , string $ payload )
264+ {
265+ $ payload = json_decode ($ payload );
266+
267+ // Ignore messages sent by ourselves.
268+ if (isset ($ payload ->serverId ) && $ this ->getServerId () === $ payload ->serverId ) {
269+ return ;
270+ }
271+
272+ // Pull out the app ID. See RedisPusherBroadcaster
273+ $ appId = $ payload ->appId ;
274+
275+ // We need to put the channel name in the payload.
276+ // We strip the app ID from the channel name, websocket clients
277+ // expect the channel name to not include the app ID.
278+ $ payload ->channel = Str::after ($ redisChannel , "{$ appId }: " );
279+
280+ $ channelManager = app (ChannelManager::class);
281+
282+ // Load the Channel instance to sync.
283+ $ channel = $ channelManager ->find ($ appId , $ payload ->channel );
284+
285+ // If no channel is found, none of our connections want to
286+ // receive this message, so we ignore it.
287+ if (! $ channel ) {
288+ return ;
289+ }
290+
291+ $ socket = $ payload ->socket ?? null ;
292+ $ serverId = $ payload ->serverId ?? null ;
293+
294+ // Remove fields intended for internal use from the payload.
295+ unset($ payload ->socket );
296+ unset($ payload ->serverId );
297+ unset($ payload ->appId );
298+
299+ // Push the message out to connected websocket clients.
300+ $ channel ->broadcastToEveryoneExcept ($ payload , $ socket , $ appId , false );
301+
302+ DashboardLogger::log ($ appId , DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED , [
303+ 'channel ' => $ channel ->getChannelName (),
304+ 'redisChannel ' => $ redisChannel ,
305+ 'serverId ' => $ this ->getServer (),
306+ 'incomingServerId ' => $ serverId ,
307+ 'incomingSocketId ' => $ socket ,
308+ 'payload ' => $ payload ,
309+ ]);
310+ }
311+
275312 /**
276313 * Build the Redis connection URL from Laravel database config.
277314 *
0 commit comments