55using ProjectVG . Application . Services . Session ;
66using StackExchange . Redis ;
77using System . Collections . Concurrent ;
8+ using System . Text . Json ;
89
910namespace ProjectVG . Application . Services . MessageBroker
1011{
@@ -32,35 +33,58 @@ public DistributedMessageBroker(
3233 IServerRegistrationService serverRegistration ,
3334 ILogger < DistributedMessageBroker > logger )
3435 {
35- _redis = redis ;
36- _subscriber = redis . GetSubscriber ( ) ;
37- _connectionManager = connectionManager ;
38- _serverRegistration = serverRegistration ;
39- _logger = logger ;
40- _serverId = serverRegistration . GetServerId ( ) ;
41-
42- InitializeSubscriptions ( ) ;
36+ _redis = redis ?? throw new ArgumentNullException ( nameof ( redis ) ) ;
37+ _connectionManager = connectionManager ?? throw new ArgumentNullException ( nameof ( connectionManager ) ) ;
38+ _serverRegistration = serverRegistration ?? throw new ArgumentNullException ( nameof ( serverRegistration ) ) ;
39+ _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
40+
41+ _logger . LogInformation ( "[분산브로커] DistributedMessageBroker 생성자 시작" ) ;
42+
43+ try
44+ {
45+ _subscriber = redis . GetSubscriber ( ) ;
46+ _serverId = serverRegistration . GetServerId ( ) ;
47+
48+ _logger . LogInformation ( "[분산브로커] Redis 연결 상태: IsConnected={IsConnected}, ServerId={ServerId}" ,
49+ redis . IsConnected , _serverId ) ;
50+
51+ InitializeSubscriptions ( ) ;
52+
53+ _logger . LogInformation ( "[분산브로커] DistributedMessageBroker 생성 완료: ServerId={ServerId}" , _serverId ) ;
54+ }
55+ catch ( Exception ex )
56+ {
57+ _logger . LogError ( ex , "[분산브로커] DistributedMessageBroker 생성자 실패" ) ;
58+ throw ;
59+ }
4360 }
4461
4562 private void InitializeSubscriptions ( )
4663 {
4764 try
4865 {
66+ _logger . LogInformation ( "[분산브로커] Redis 구독 초기화 시작: ServerId={ServerId}" , _serverId ) ;
67+
4968 // 이 서버로 오는 메시지 구독
5069 var serverChannel = $ "{ SERVER_CHANNEL_PREFIX } :{ _serverId } ";
70+ _logger . LogInformation ( "[분산브로커] 서버 채널 구독 시작: Channel={Channel}" , serverChannel ) ;
5171 _subscriber . Subscribe ( serverChannel , OnServerMessageReceived ) ;
72+ _logger . LogInformation ( "[분산브로커] 서버 채널 구독 완료: Channel={Channel}" , serverChannel ) ;
5273
5374 // 브로드캐스트 메시지 구독
75+ _logger . LogInformation ( "[분산브로커] 브로드캐스트 채널 구독 시작: Channel={Channel}" , BROADCAST_CHANNEL ) ;
5476 _subscriber . Subscribe ( BROADCAST_CHANNEL , OnBroadcastMessageReceived ) ;
77+ _logger . LogInformation ( "[분산브로커] 브로드캐스트 채널 구독 완료: Channel={Channel}" , BROADCAST_CHANNEL ) ;
5578
5679 // 사용자별 메시지 패턴 구독 (현재 서버에 연결된 사용자들만)
5780 // 사용자가 연결될 때 동적으로 구독하도록 변경 예정
5881
59- _logger . LogInformation ( "분산 메시지 브로커 구독 초기화 완료: 서버 {ServerId}" , _serverId ) ;
82+ _logger . LogInformation ( "[분산브로커] 분산 메시지 브로커 구독 초기화 완료: ServerId= {ServerId}" , _serverId ) ;
6083 }
6184 catch ( Exception ex )
6285 {
63- _logger . LogError ( ex , "분산 메시지 브로커 구독 초기화 실패" ) ;
86+ _logger . LogError ( ex , "[분산브로커] 분산 메시지 브로커 구독 초기화 실패: ServerId={ServerId}" , _serverId ) ;
87+ throw ;
6488 }
6589 }
6690
@@ -92,14 +116,14 @@ public async Task SendToUserAsync(string userId, object message)
92116 return ;
93117 }
94118
95- // 3. 해당 서버로 메시지 전송
119+ // 3. 해당 서버로 메시지 전송 (서버별 채널 사용)
96120 var brokerMessage = BrokerMessage . CreateUserMessage ( userId , message , _serverId ) ;
97- var userChannel = $ "{ USER_CHANNEL_PREFIX } :{ userId } ";
121+ var serverChannel = $ "{ SERVER_CHANNEL_PREFIX } :{ targetServerId } ";
98122
99123 _logger . LogInformation ( "[분산브로커] Redis Pub 시작: Channel={Channel}, TargetServerId={TargetServerId}, SourceServerId={SourceServerId}" ,
100- userChannel , targetServerId , _serverId ) ;
124+ serverChannel , targetServerId , _serverId ) ;
101125
102- await _subscriber . PublishAsync ( userChannel , brokerMessage . ToJson ( ) ) ;
126+ await _subscriber . PublishAsync ( serverChannel , brokerMessage . ToJson ( ) ) ;
103127
104128 _logger . LogInformation ( "[분산브로커] Redis Pub 완료: UserId={UserId}, TargetServerId={TargetServerId}" , userId , targetServerId ) ;
105129 }
@@ -245,9 +269,41 @@ private async void OnServerMessageReceived(RedisChannel channel, RedisValue mess
245269 }
246270
247271 // 서버별 메시지 처리 로직
248- _logger . LogDebug ( "서버 메시지 수신: {MessageType}" , brokerMessage . MessageType ) ;
272+ _logger . LogInformation ( "[분산브로커] 서버 메시지 수신: MessageType={MessageType}, SourceServerId={SourceServerId}" ,
273+ brokerMessage . MessageType , brokerMessage . SourceServerId ) ;
249274
250- // TODO: 서버별 메시지 타입에 따른 처리 로직 구현
275+ // 사용자 메시지 처리
276+ if ( brokerMessage . MessageType == "user_message" && ! string . IsNullOrEmpty ( brokerMessage . TargetUserId ) )
277+ {
278+ _logger . LogInformation ( "[분산브로커] 사용자 메시지 처리 시작: TargetUserId={TargetUserId}" ,
279+ brokerMessage . TargetUserId ) ;
280+
281+ // 해당 사용자가 이 서버에 연결되어 있는지 확인
282+ if ( _connectionManager . HasLocalConnection ( brokerMessage . TargetUserId ) )
283+ {
284+ _logger . LogInformation ( "[분산브로커] 원본 Payload JSON: {PayloadJson}" , brokerMessage . Payload ) ;
285+
286+ // 원본 JSON 문자열을 직접 사용하여 메시지 전달
287+ await SendLocalMessageAsJson ( brokerMessage . TargetUserId , brokerMessage . Payload ) ;
288+
289+ _logger . LogInformation ( "[분산브로커] 사용자 메시지 전달 완료: TargetUserId={TargetUserId}" ,
290+ brokerMessage . TargetUserId ) ;
291+ }
292+ else
293+ {
294+ _logger . LogWarning ( "[분산브로커] 대상 사용자가 이 서버에 연결되어 있지 않음: TargetUserId={TargetUserId}, ServerId={ServerId}" ,
295+ brokerMessage . TargetUserId , _serverId ) ;
296+ }
297+ }
298+ else if ( brokerMessage . MessageType == "server_message" )
299+ {
300+ // 다른 서버별 메시지 타입 처리 (향후 확장)
301+ _logger . LogDebug ( "[분산브로커] 서버 메시지 처리: {MessageType}" , brokerMessage . MessageType ) ;
302+ }
303+ else
304+ {
305+ _logger . LogWarning ( "[분산브로커] 알 수 없는 메시지 타입: {MessageType}" , brokerMessage . MessageType ) ;
306+ }
251307 }
252308 catch ( Exception ex )
253309 {
@@ -320,6 +376,64 @@ private async void OnBroadcastMessageReceived(RedisChannel channel, RedisValue m
320376 }
321377 }
322378
379+ private async Task SendLocalMessageAsJson ( string userId , string payloadJson )
380+ {
381+ if ( string . IsNullOrEmpty ( payloadJson ) )
382+ {
383+ _logger . LogWarning ( "[분산브로커] SendLocalMessageAsJson: Payload가 비어있습니다. UserId={UserId}" , userId ) ;
384+ return ;
385+ }
386+
387+ _logger . LogInformation ( "[분산브로커] SendLocalMessageAsJson 시작: UserId={UserId}, PayloadLength={PayloadLength}" ,
388+ userId , payloadJson . Length ) ;
389+
390+ try
391+ {
392+ // 원본 JSON이 이미 WebSocketMessage 형태인지 확인
393+ using var document = JsonDocument . Parse ( payloadJson ) ;
394+ var root = document . RootElement ;
395+
396+ string messageText ;
397+
398+ // WebSocketMessage 구조인지 확인 (type과 data 필드가 있는지)
399+ if ( root . TryGetProperty ( "type" , out var typeProperty ) &&
400+ root . TryGetProperty ( "data" , out var dataProperty ) )
401+ {
402+ // 이미 WebSocketMessage 형태이므로 그대로 사용
403+ messageText = payloadJson ;
404+ _logger . LogInformation ( "[분산브로커] WebSocketMessage 형태 감지: Type={Type}" , typeProperty . GetString ( ) ) ;
405+ }
406+ else
407+ {
408+ // 일반 객체이므로 WebSocketMessage로 래핑 (예상되지 않는 케이스)
409+ _logger . LogWarning ( "[분산브로커] 예상하지 못한 JSON 구조, WebSocketMessage로 래핑: UserId={UserId}" , userId ) ;
410+ var wrappedMessage = new WebSocketMessage ( "message" , root ) ;
411+ messageText = System . Text . Json . JsonSerializer . Serialize ( wrappedMessage ) ;
412+ }
413+
414+ _logger . LogInformation ( "[분산브로커] 최종 전송 메시지: {MessageText}" , messageText ) ;
415+
416+ var success = await _connectionManager . SendTextAsync ( userId , messageText ) ;
417+ if ( success )
418+ {
419+ _logger . LogInformation ( "[분산브로커] JSON 메시지 전송 완료: UserId={UserId}" , userId ) ;
420+ }
421+ else
422+ {
423+ _logger . LogWarning ( "[분산브로커] JSON 메시지 전송 실패: UserId={UserId}" , userId ) ;
424+ }
425+ }
426+ catch ( JsonException ex )
427+ {
428+ _logger . LogError ( ex , "[분산브로커] JSON 파싱 실패: UserId={UserId}, Payload={Payload}" , userId , payloadJson ) ;
429+ }
430+ catch ( Exception ex )
431+ {
432+ _logger . LogError ( ex , "[분산브로커] SendLocalMessageAsJson 실패: UserId={UserId}" , userId ) ;
433+ throw ;
434+ }
435+ }
436+
323437 private async Task SendLocalMessage ( string userId , object ? message )
324438 {
325439 if ( message == null )
@@ -335,12 +449,18 @@ private async Task SendLocalMessage(string userId, object? message)
335449 {
336450 string messageText ;
337451
452+ // WebSocketMessage는 이미 올바른 형태이므로 그대로 직렬화
338453 if ( message is WebSocketMessage wsMessage )
339454 {
340455 messageText = System . Text . Json . JsonSerializer . Serialize ( wsMessage ) ;
456+ _logger . LogInformation ( "[분산브로커] WebSocketMessage 직렬화: UserId={UserId}, Type={Type}" ,
457+ userId , wsMessage . Type ) ;
341458 }
342459 else
343460 {
461+ // 다른 객체는 WebSocketMessage로 래핑 (하지만 ChatSuccessHandler에서는 이미 래핑됨)
462+ _logger . LogWarning ( "[분산브로커] 예상하지 못한 객체 타입: {MessageType}, UserId={UserId}" ,
463+ message . GetType ( ) . Name , userId ) ;
344464 var wrappedMessage = new WebSocketMessage ( "message" , message ) ;
345465 messageText = System . Text . Json . JsonSerializer . Serialize ( wrappedMessage ) ;
346466 }
0 commit comments