Skip to content

Commit 4a56676

Browse files
committed
feat: 브로드 캐스트 구현
1 parent f16b34c commit 4a56676

File tree

3 files changed

+73
-11
lines changed

3 files changed

+73
-11
lines changed

ProjectVG.Application/Services/MessageBroker/DistributedMessageBroker.cs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using ProjectVG.Domain.Services.Server;
55
using ProjectVG.Application.Services.Session;
66
using StackExchange.Redis;
7+
using System.Collections.Concurrent;
78

89
namespace ProjectVG.Application.Services.MessageBroker
910
{
@@ -200,16 +201,15 @@ private async void OnUserMessageReceived(RedisChannel channel, RedisValue messag
200201
_logger.LogInformation("[분산브로커] 메시지 파싱 완료: TargetUserId={TargetUserId}, SourceServerId={SourceServerId}, MessageType={MessageType}",
201202
brokerMessage.TargetUserId, brokerMessage.SourceServerId, brokerMessage.MessageType);
202203

203-
// 로컬에서 해당 사용자가 연결되어 있는지 확인
204-
var isLocalActive = _connectionRegistry.IsConnected(brokerMessage.TargetUserId);
205-
_logger.LogInformation("[분산브로커] 로컬 세션 확인: TargetUserId={TargetUserId}, IsLocalActive={IsLocalActive}",
206-
brokerMessage.TargetUserId, isLocalActive);
207-
208-
if (isLocalActive)
204+
if (_connectionRegistry.TryGet(brokerMessage.TargetUserId, out var connection) && connection != null)
209205
{
210-
var payload = brokerMessage.DeserializePayload<object>();
211-
await SendLocalMessage(brokerMessage.TargetUserId, payload);
212-
206+
var payloadText = brokerMessage.Payload;
207+
if (string.IsNullOrEmpty(payloadText))
208+
{
209+
_logger.LogWarning("[분산브로커] 빈 Payload 수신: Channel={Channel}, TargetUserId={TargetUserId}", channel, brokerMessage.TargetUserId);
210+
return;
211+
}
212+
await connection.SendTextAsync(payloadText);
213213
_logger.LogInformation("[분산브로커] 분산 사용자 메시지 처리 완료: TargetUserId={TargetUserId}", brokerMessage.TargetUserId);
214214
}
215215
else
@@ -257,9 +257,45 @@ private async void OnBroadcastMessageReceived(RedisChannel channel, RedisValue m
257257
return;
258258
}
259259

260+
_logger.LogDebug("브로드캐스트 메시지 수신: {MessageType}, SourceServer: {SourceServerId}",
261+
brokerMessage.MessageType, brokerMessage.SourceServerId);
262+
260263
// 현재 서버에 연결된 모든 사용자에게 브로드캐스트
261-
// TODO: IConnectionRegistry에서 모든 활성 사용자 목록을 가져와서 전송
262-
_logger.LogDebug("브로드캐스트 메시지 수신: {MessageType}", brokerMessage.MessageType);
264+
var activeConnections = _connectionRegistry.GetAllActiveConnections().ToList();
265+
if (activeConnections.Count == 0)
266+
{
267+
_logger.LogDebug("브로드캐스트 대상 없음: 활성 연결 수 = 0");
268+
return;
269+
}
270+
271+
var broadcastTasks = new List<Task>();
272+
var successCount = 0;
273+
var failureCount = 0;
274+
275+
foreach (var (userId, connection) in activeConnections)
276+
{
277+
var task = Task.Run(async () =>
278+
{
279+
try
280+
{
281+
await connection.SendTextAsync(brokerMessage.Payload);
282+
Interlocked.Increment(ref successCount);
283+
_logger.LogTrace("브로드캐스트 전송 성공: UserId={UserId}", userId);
284+
}
285+
catch (Exception ex)
286+
{
287+
Interlocked.Increment(ref failureCount);
288+
_logger.LogWarning(ex, "브로드캐스트 전송 실패: UserId={UserId}", userId);
289+
}
290+
});
291+
broadcastTasks.Add(task);
292+
}
293+
294+
// 모든 전송 완료 대기 (타임아웃 5초)
295+
await Task.WhenAll(broadcastTasks).ConfigureAwait(false);
296+
297+
_logger.LogInformation("브로드캐스트 완료: 대상={TotalCount}, 성공={SuccessCount}, 실패={FailureCount}",
298+
activeConnections.Count, successCount, failureCount);
263299
}
264300
catch (Exception ex)
265301
{

ProjectVG.Application/Services/Session/ConnectionRegistry.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,22 @@ public int GetActiveConnectionCount()
6262
{
6363
return _connections.Count;
6464
}
65+
66+
/// <summary>
67+
/// 모든 활성 연결을 반환합니다
68+
/// </summary>
69+
public IEnumerable<KeyValuePair<string, IClientConnection>> GetAllActiveConnections()
70+
{
71+
return _connections.ToArray(); // Thread-safe snapshot
72+
}
73+
74+
/// <summary>
75+
/// 모든 활성 사용자 ID를 반환합니다
76+
/// </summary>
77+
public IEnumerable<string> GetAllActiveUserIds()
78+
{
79+
return _connections.Keys.ToArray(); // Thread-safe snapshot
80+
}
6581
}
6682
}
6783

ProjectVG.Application/Services/Session/IConnectionRegistry.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ public interface IConnectionRegistry
2929
/// 활성 연결 수를 반환합니다
3030
/// </summary>
3131
int GetActiveConnectionCount();
32+
33+
/// <summary>
34+
/// 모든 활성 연결을 반환합니다
35+
/// </summary>
36+
IEnumerable<KeyValuePair<string, IClientConnection>> GetAllActiveConnections();
37+
38+
/// <summary>
39+
/// 모든 활성 사용자 ID를 반환합니다
40+
/// </summary>
41+
IEnumerable<string> GetAllActiveUserIds();
3242
}
3343
}
3444

0 commit comments

Comments
 (0)