@@ -3,7 +3,7 @@ defmodule RealtimeWeb.RealtimeChannel do
33 Used for handling channels and subscriptions.
44 """
55 use RealtimeWeb , :channel
6- use Realtime.Logs
6+ use RealtimeWeb.RealtimeChannel.Logging
77
88 alias RealtimeWeb.SocketDisconnect
99 alias DBConnection.Backoff
@@ -23,7 +23,6 @@ defmodule RealtimeWeb.RealtimeChannel do
2323
2424 alias RealtimeWeb.ChannelsAuthorization
2525 alias RealtimeWeb.RealtimeChannel.BroadcastHandler
26- alias RealtimeWeb.RealtimeChannel.Logging
2726 alias RealtimeWeb.RealtimeChannel.MessageDispatcher
2827 alias RealtimeWeb.RealtimeChannel.PresenceHandler
2928 alias RealtimeWeb.RealtimeChannel.Tracker
@@ -32,7 +31,7 @@ defmodule RealtimeWeb.RealtimeChannel do
3231
3332 @ impl true
3433 def join ( "realtime:" , _params , socket ) do
35- Logging . log_error ( socket , "TopicNameRequired" , "You must provide a topic name" )
34+ log_error ( socket , "TopicNameRequired" , "You must provide a topic name" )
3635 end
3736
3837 def join ( "realtime:" <> sub_topic = topic , params , socket ) do
@@ -120,77 +119,77 @@ defmodule RealtimeWeb.RealtimeChannel do
120119 { :ok , state , assign ( socket , assigns ) }
121120 else
122121 { :error , :expired_token , msg } ->
123- Logging . maybe_log_warning ( socket , "InvalidJWTToken" , msg )
122+ maybe_log_warning ( socket , "InvalidJWTToken" , msg )
124123
125124 { :error , :missing_claims } ->
126125 msg = "Fields `role` and `exp` are required in JWT"
127- Logging . maybe_log_warning ( socket , "InvalidJWTToken" , msg )
126+ maybe_log_warning ( socket , "InvalidJWTToken" , msg )
128127
129128 { :error , :unauthorized , msg } ->
130- Logging . log_error ( socket , "Unauthorized" , msg )
129+ log_error ( socket , "Unauthorized" , msg )
131130
132131 { :error , :too_many_channels } ->
133132 msg = "Too many channels"
134- Logging . log_error ( socket , "ChannelRateLimitReached" , msg )
133+ log_error ( socket , "ChannelRateLimitReached" , msg )
135134
136135 { :error , :too_many_connections } ->
137136 msg = "Too many connected users"
138- Logging . log_error ( socket , "ConnectionRateLimitReached" , msg )
137+ log_error ( socket , "ConnectionRateLimitReached" , msg )
139138
140139 { :error , :too_many_joins } ->
141140 msg = "ClientJoinRateLimitReached: Too many joins per second"
142141 { :error , % { reason: msg } }
143142
144143 { :error , :increase_connection_pool } ->
145144 msg = "Please increase your connection pool size"
146- Logging . log_error ( socket , "IncreaseConnectionPool" , msg )
145+ log_error ( socket , "IncreaseConnectionPool" , msg )
147146
148147 { :error , :tenant_db_too_many_connections } ->
149148 msg = "Database can't accept more connections, Realtime won't connect"
150- Logging . log_error ( socket , "DatabaseLackOfConnections" , msg )
149+ log_error ( socket , "DatabaseLackOfConnections" , msg )
151150
152151 { :error , :unable_to_set_policies , error } ->
153- Logging . log_error ( socket , "UnableToSetPolicies" , error )
152+ log_error ( socket , "UnableToSetPolicies" , error )
154153 { :error , % { reason: "Realtime was unable to connect to the project database" } }
155154
156155 { :error , :tenant_database_unavailable } ->
157- Logging . log_error ( socket , "UnableToConnectToProject" , "Realtime was unable to connect to the project database" )
156+ log_error ( socket , "UnableToConnectToProject" , "Realtime was unable to connect to the project database" )
158157
159158 { :error , :rpc_error , :timeout } ->
160- Logging . log_error ( socket , "TimeoutOnRpcCall" , "Node request timeout" )
159+ log_error ( socket , "TimeoutOnRpcCall" , "Node request timeout" )
161160
162161 { :error , :rpc_error , reason } ->
163- Logging . log_error ( socket , "ErrorOnRpcCall" , "RPC call error: " <> inspect ( reason ) )
162+ log_error ( socket , "ErrorOnRpcCall" , "RPC call error: " <> inspect ( reason ) )
164163
165164 { :error , :initializing } ->
166- Logging . log_error ( socket , "InitializingProjectConnection" , "Realtime is initializing the project connection" )
165+ log_error ( socket , "InitializingProjectConnection" , "Realtime is initializing the project connection" )
167166
168167 { :error , :tenant_database_connection_initializing } ->
169- Logging . log_error ( socket , "InitializingProjectConnection" , "Connecting to the project database" )
168+ log_error ( socket , "InitializingProjectConnection" , "Connecting to the project database" )
170169
171170 { :error , :token_malformed , msg } ->
172- Logging . log_error ( socket , "MalformedJWT" , msg )
171+ log_error ( socket , "MalformedJWT" , msg )
173172
174173 { :error , invalid_exp } when is_integer ( invalid_exp ) and invalid_exp <= 0 ->
175- Logging . log_error ( socket , "InvalidJWTToken" , "Token expiration time is invalid" )
174+ log_error ( socket , "InvalidJWTToken" , "Token expiration time is invalid" )
176175
177176 { :error , :private_only } ->
178- Logging . log_error ( socket , "PrivateOnly" , "This project only allows private channels" )
177+ log_error ( socket , "PrivateOnly" , "This project only allows private channels" )
179178
180179 { :error , :tenant_not_found } ->
181- Logging . log_error ( socket , "TenantNotFound" , "Tenant with the given ID does not exist" )
180+ log_error ( socket , "TenantNotFound" , "Tenant with the given ID does not exist" )
182181
183182 { :error , :tenant_suspended } ->
184- Logging . log_error ( socket , "RealtimeDisabledForTenant" , "Realtime disabled for this tenant" )
183+ log_error ( socket , "RealtimeDisabledForTenant" , "Realtime disabled for this tenant" )
185184
186185 { :error , :signature_error } ->
187- Logging . log_error ( socket , "JwtSignatureError" , "Failed to validate JWT signature" )
186+ log_error ( socket , "JwtSignatureError" , "Failed to validate JWT signature" )
188187
189188 { :error , :shutdown_in_progress } ->
190- Logging . log_error ( socket , "RealtimeRestarting" , "Realtime is restarting, please standby" )
189+ log_error ( socket , "RealtimeRestarting" , "Realtime is restarting, please standby" )
191190
192191 { :error , error } ->
193- Logging . log_error ( socket , "UnknownErrorOnChannel" , error )
192+ log_error ( socket , "UnknownErrorOnChannel" , error )
194193 { :error , % { reason: "Unknown Error on Channel" } }
195194 end
196195 end
@@ -231,25 +230,16 @@ defmodule RealtimeWeb.RealtimeChannel do
231230 end
232231
233232 def handle_info ( % { event: "presence_diff" , payload: payload } = msg , socket ) do
234- % { presence_rate_counter: presence_rate_counter , limits: % { max_events_per_second: max } } = socket . assigns
235-
233+ % { presence_rate_counter: presence_rate_counter } = socket . assigns
236234 GenCounter . add ( presence_rate_counter . id )
237- { :ok , rate_counter } = RateCounter . get ( presence_rate_counter )
238-
239- # Let's just log for now
240- if rate_counter . avg > max do
241- message = "Too many presence messages per second"
242- log_warning ( "TooManyPresenceMessages" , message )
243- end
244-
245- Logging . maybe_log_info ( socket , msg )
235+ maybe_log_info ( socket , msg )
246236 push ( socket , "presence_diff" , payload )
247237 { :noreply , socket }
248238 end
249239
250240 def handle_info ( % { event: type , payload: payload } = msg , socket ) do
251241 count ( socket )
252- Logging . maybe_log_info ( socket , msg )
242+ maybe_log_info ( socket , msg )
253243 push ( socket , type , payload )
254244 { :noreply , socket }
255245 end
@@ -274,19 +264,19 @@ defmodule RealtimeWeb.RealtimeChannel do
274264 case PostgresCdc . after_connect ( module , response , postgres_extension , pg_change_params ) do
275265 { :ok , _response } ->
276266 message = "Subscribed to PostgreSQL"
277- Logging . maybe_log_info ( socket , message )
267+ maybe_log_info ( socket , message )
278268 push_system_message ( "postgres_changes" , socket , "ok" , message , channel_name )
279269 { :noreply , assign ( socket , :pg_sub_ref , nil ) }
280270
281271 error ->
282- Logging . maybe_log_warning ( socket , "RealtimeDisabledForConfiguration" , error )
272+ maybe_log_warning ( socket , "RealtimeDisabledForConfiguration" , error )
283273
284274 push_system_message ( "postgres_changes" , socket , "error" , error , channel_name )
285275 { :noreply , assign ( socket , :pg_sub_ref , postgres_subscribe ( 5 , 10 ) ) }
286276 end
287277
288278 nil ->
289- Logging . maybe_log_warning (
279+ maybe_log_warning (
290280 socket ,
291281 "ReconnectSubscribeToPostgres" ,
292282 "Re-connecting to PostgreSQL with params: " <> inspect ( pg_change_params )
@@ -295,13 +285,13 @@ defmodule RealtimeWeb.RealtimeChannel do
295285 { :noreply , assign ( socket , :pg_sub_ref , postgres_subscribe ( ) ) }
296286
297287 error ->
298- Logging . maybe_log_error ( socket , "UnableToSubscribeToPostgres" , error )
288+ maybe_log_error ( socket , "UnableToSubscribeToPostgres" , error )
299289 push_system_message ( "postgres_changes" , socket , "error" , error , channel_name )
300290 { :noreply , assign ( socket , :pg_sub_ref , postgres_subscribe ( 5 , 10 ) ) }
301291 end
302292 rescue
303293 error ->
304- log_warning ( "UnableToSubscribeToPostgres" , error )
294+ log_warning ( socket , "UnableToSubscribeToPostgres" , error )
305295 push_system_message ( "postgres_changes" , socket , "error" , error , channel_name )
306296 { :noreply , assign ( socket , :pg_sub_ref , postgres_subscribe ( 5 , 10 ) ) }
307297 end
@@ -319,7 +309,7 @@ defmodule RealtimeWeb.RealtimeChannel do
319309 shutdown_response ( socket , msg )
320310
321311 { :error , error } ->
322- shutdown_response ( socket , to_log ( error ) )
312+ shutdown_response ( socket , Realtime.Logs . to_log ( error ) )
323313 end
324314 end
325315
@@ -329,7 +319,16 @@ defmodule RealtimeWeb.RealtimeChannel do
329319 { :stop , :shutdown , socket }
330320 end
331321
332- def handle_info ( :sync_presence , % { assigns: % { presence_enabled?: true } } = socket ) , do: PresenceHandler . sync ( socket )
322+ def handle_info ( :sync_presence , % { assigns: % { presence_enabled?: true } } = socket ) do
323+ case PresenceHandler . sync ( socket ) do
324+ :ok ->
325+ { :noreply , socket }
326+
327+ { :error , :rate_limit_exceeded } ->
328+ shutdown_response ( socket , "Too many presence messages per second" )
329+ end
330+ end
331+
333332 def handle_info ( :sync_presence , socket ) , do: { :noreply , socket }
334333 def handle_info ( _ , socket ) , do: { :noreply , socket }
335334
@@ -341,7 +340,7 @@ defmodule RealtimeWeb.RealtimeChannel do
341340 BroadcastHandler . handle ( payload , db_conn , socket )
342341 else
343342 { :error , error } ->
344- log_error ( "UnableToHandleBroadcast" , error )
343+ log_error ( socket , "UnableToHandleBroadcast" , error )
345344 { :noreply , socket }
346345 end
347346 end
@@ -353,17 +352,30 @@ defmodule RealtimeWeb.RealtimeChannel do
353352 def handle_in ( "presence" , payload , % { assigns: % { private?: true } } = socket ) do
354353 % { tenant: tenant_id } = socket . assigns
355354
356- with { :ok , db_conn } <- Connect . lookup_or_start_connection ( tenant_id ) do
357- PresenceHandler . handle ( payload , db_conn , socket )
355+ with { :ok , db_conn } <- Connect . lookup_or_start_connection ( tenant_id ) ,
356+ { :ok , socket } <- PresenceHandler . handle ( payload , db_conn , socket ) do
357+ { :reply , :ok , socket }
358358 else
359+ { :error , :rate_limit_exceeded } ->
360+ shutdown_response ( socket , "Too many presence messages per second" )
361+
359362 { :error , error } ->
360- log_error ( "UnableToHandlePresence" , error )
361- { :noreply , socket }
363+ log_error ( socket , "UnableToHandlePresence" , error )
364+ { :reply , :error , socket }
362365 end
363366 end
364367
365368 def handle_in ( "presence" , payload , % { assigns: % { private?: false } } = socket ) do
366- PresenceHandler . handle ( payload , socket )
369+ with { :ok , socket } <- PresenceHandler . handle ( payload , socket ) do
370+ { :reply , :ok , socket }
371+ else
372+ { :error , :rate_limit_exceeded } ->
373+ shutdown_response ( socket , "Too many presence messages per second" )
374+
375+ { :error , error } ->
376+ log_error ( socket , "UnableToHandlePresence" , error )
377+ { :reply , :error , socket }
378+ end
367379 end
368380
369381 def handle_in ( "access_token" , % { "access_token" => "sb_" <> _ } , socket ) do
@@ -476,7 +488,7 @@ defmodule RealtimeWeb.RealtimeChannel do
476488 { :error , :too_many_joins }
477489
478490 error ->
479- Logging . log_error ( socket , "UnknownErrorOnCounter" , error )
491+ log_error ( socket , "UnknownErrorOnCounter" , error )
480492 { :error , error }
481493 end
482494 end
@@ -553,7 +565,7 @@ defmodule RealtimeWeb.RealtimeChannel do
553565 assign ( socket , :access_token , tenant_token )
554566 end
555567
556- defp confirm_token ( % { assigns: assigns } = socket ) do
568+ defp confirm_token ( % { assigns: assigns } ) do
557569 % { jwt_secret: jwt_secret , access_token: access_token } = assigns
558570
559571 jwt_jwks = Map . get ( assigns , :jwt_jwks )
@@ -586,7 +598,7 @@ defmodule RealtimeWeb.RealtimeChannel do
586598 defp shutdown_response ( socket , message ) when is_binary ( message ) do
587599 % { assigns: % { channel_name: channel_name } } = socket
588600 push_system_message ( "system" , socket , "error" , message , channel_name )
589- Logging . maybe_log_warning ( socket , "ChannelShutdown" , message )
601+ maybe_log_warning ( socket , "ChannelShutdown" , message )
590602 { :stop , :normal , socket }
591603 end
592604
@@ -725,7 +737,7 @@ defmodule RealtimeWeb.RealtimeChannel do
725737 { :error , :increase_connection_pool }
726738
727739 { :error , :rls_policy_error , error } ->
728- log_error ( "RlsPolicyError" , error )
740+ log_error ( socket , "RlsPolicyError" , error )
729741
730742 { :error , :unauthorized , "You do not have permissions to read from this Channel topic: #{ topic } " }
731743
0 commit comments