Skip to content

Commit 3ed2978

Browse files
authored
Handle too many heart missing with reconnection (#394)
Remove the update status from the Dispose() function. It is unnecessary since the disposal can be called only from the Client close() function where the status is set. Add a function to convert the connection close reason string to the Reliable.ChangeStatusReason Enum. This mapping should be improved to version 2.0, where we can introduce breaking changes. In version 1.x, we need the conversion function. --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent b648639 commit 3ed2978

File tree

10 files changed

+74
-39
lines changed

10 files changed

+74
-39
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ private async Task OnConnectionClosed(string reason)
194194
if (ConnectionClosed != null)
195195
{
196196
var t = ConnectionClosed?.Invoke(reason)!;
197-
await t.ConfigureAwait(false);
197+
if (t != null)
198+
await t.ConfigureAwait(false);
198199
}
199200
}
200201

@@ -765,7 +766,7 @@ private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffse
765766
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
766767
}
767768

768-
public async Task<CloseResponse> Close(string reason)
769+
private async Task<CloseResponse> Close(string reason, string closedStatus)
769770
{
770771
if (IsClosed)
771772
{
@@ -775,7 +776,7 @@ public async Task<CloseResponse> Close(string reason)
775776
InternalClose();
776777
try
777778
{
778-
_connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
779+
_connection.UpdateCloseStatus(closedStatus);
779780
var result =
780781
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
781782
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
@@ -799,6 +800,11 @@ public async Task<CloseResponse> Close(string reason)
799800
return new CloseResponse(0, ResponseCode.Ok);
800801
}
801802

803+
public async Task<CloseResponse> Close(string reason)
804+
{
805+
return await Close(reason, ConnectionClosedReason.Normal).ConfigureAwait(false);
806+
}
807+
802808
// _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
803809
// the MaybeClose can be called in different threads so we need to protect the pool
804810
// the pool itself is thread safe but we need to protect the flow to be sure that the

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ internal static class ConnectionClosedReason
1818
{
1919
public const string Normal = "TCP connection closed normal";
2020
public const string Unexpected = "TCP connection closed unexpected";
21+
public const string MissingHeartbeat = "TCP connection closed missing heartbeat";
22+
2123
}
2224

2325
public class Connection : IDisposable
@@ -240,7 +242,6 @@ public void Dispose()
240242
{
241243
try
242244
{
243-
UpdateCloseStatus(ConnectionClosedReason.Normal);
244245
if (!_cancelTokenSource.IsCancellationRequested)
245246
{
246247
_cancelTokenSource.Cancel();

RabbitMQ.Stream.Client/HeartBeatHandler.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ public class HeartBeatHandler
1919
private uint _missedHeartbeat;
2020

2121
private readonly Func<ValueTask<bool>> _sendHeartbeatFunc;
22-
private readonly Func<string, Task<CloseResponse>> _close;
22+
private readonly Func<string, string, Task<CloseResponse>> _close;
2323
private readonly int _heartbeat;
2424
private readonly ILogger<HeartBeatHandler> _logger;
2525

2626
public HeartBeatHandler(Func<ValueTask<bool>> sendHeartbeatFunc,
27-
Func<string, Task<CloseResponse>> close,
27+
Func<string, string, Task<CloseResponse>> close,
2828
int heartbeat,
2929
ILogger<HeartBeatHandler> logger = null
3030
)
@@ -77,7 +77,13 @@ private async Task PerformHeartBeatAsync()
7777
// client will be closed
7878
_logger.LogCritical("Too many heartbeats missed: {MissedHeartbeatCounter}", _missedHeartbeat);
7979
Close();
80-
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.").ConfigureAwait(false);
80+
// The heartbeat is missed for x times the client will be closed with the reason Unexpected
81+
// In this way the ReliableProducer / ReliableConsumer will be able to handle the close reason
82+
// and reconnect the client
83+
// Even it is not a perfect solution, it is a good way to handle the case to avoid to introduce breaking changes
84+
// we need to review all the status and the close reason on the version 2.0
85+
await _close($"Too many heartbeats missed: {_missedHeartbeat}. Client connection will be closed.",
86+
ConnectionClosedReason.MissingHeartbeat).ConfigureAwait(false);
8187
}
8288

8389
internal void UpdateHeartBeat()

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ RabbitMQ.Stream.Client.Hash.Murmur3.Seed.get -> uint
308308
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy
309309
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.HashRoutingMurmurStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor) -> void
310310
RabbitMQ.Stream.Client.HeartBeatHandler
311-
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
312311
RabbitMQ.Stream.Client.IClient
313312
RabbitMQ.Stream.Client.IClient.Close(string reason) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>
314313
RabbitMQ.Stream.Client.IClient.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
137137
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
138138
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
139139
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
140+
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
140141
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
141142
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
142143
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
@@ -243,6 +244,7 @@ RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
243244
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
244245
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
245246
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
247+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing = 6 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
246248
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
247249
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
248250
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
@@ -367,6 +369,7 @@ static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientPa
367369
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
368370
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
369371
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
372+
static RabbitMQ.Stream.Client.Reliable.ReliableBase.FromConnectionClosedReasonToStatusReason(string connectionClosedReason) -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
370373
static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task
371374
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
372375
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6363
return;
6464

6565
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
66-
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
66+
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
6767
},
6868
MetadataHandler = async _ =>
6969
{
@@ -131,8 +131,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
131131

132132
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
133133
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
134-
ChangeStatusReason.UnexpectedlyDisconnected)
135-
.ConfigureAwait(false);
134+
FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
136135
},
137136
MetadataHandler = async update =>
138137
{

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
5151
return;
5252
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
5353
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
54-
ChangeStatusReason.UnexpectedlyDisconnected)
54+
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason))
5555
.ConfigureAwait(false);
5656
},
5757
MetadataHandler = async update =>
@@ -116,7 +116,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
116116
return;
117117

118118
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
119-
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
119+
ReliableBase.FromConnectionClosedReasonToStatusReason(closeReason)).ConfigureAwait(false);
120120
},
121121
ConfirmHandler = confirmation =>
122122
{

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public enum ChangeStatusReason
3434
MetaDataUpdate,
3535
ClosedByUser,
3636
ClosedByStrategyPolicy,
37-
BoolFailure
37+
BoolFailure,
38+
DisconnectedByTooManyHeartbeatMissing,
3839
}
3940

4041
public record ReliableConfig
@@ -109,6 +110,23 @@ public abstract class ReliableBase
109110
protected abstract ILogger BaseLogger { get; }
110111
private ReliableConfig _reliableConfig;
111112

113+
/// <summary>
114+
/// The function to convert the string ConnectionClosedReason to the ChangeStatusReason enum
115+
///
116+
/// </summary>
117+
/// <param name="connectionClosedReason"></param>
118+
/// <returns></returns>
119+
/// <exception cref="ArgumentOutOfRangeException"></exception>
120+
protected static ChangeStatusReason FromConnectionClosedReasonToStatusReason(string connectionClosedReason)
121+
{
122+
// Can be removed on the version 2.0 when the ConnectionClosedReason will be an enum as well
123+
return connectionClosedReason switch
124+
{
125+
ConnectionClosedReason.MissingHeartbeat => ChangeStatusReason.DisconnectedByTooManyHeartbeatMissing,
126+
ConnectionClosedReason.Unexpected => ChangeStatusReason.UnexpectedlyDisconnected,
127+
_ => throw new ArgumentOutOfRangeException(nameof(connectionClosedReason), connectionClosedReason, null)
128+
};
129+
}
112130
protected static async Task RandomWait()
113131
{
114132
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);

Tests/UnitTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public void HeartBeatRaiseClose()
393393
var testPassed = new TaskCompletionSource<bool>();
394394
var hBeatHandler = new HeartBeatHandler(
395395
() => default,
396-
s =>
396+
(s, r) =>
397397
{
398398
testPassed.SetResult(true);
399399
return null;
@@ -412,7 +412,7 @@ public void HeartBeatZeroNotRaisesClose()
412412
// the HeartBeatHandler is disabled by default
413413
var hBeatHandler = new HeartBeatHandler(
414414
() => default,
415-
s => null,
415+
(s, r) => null,
416416
0);
417417
Assert.False(hBeatHandler.IsActive());
418418
}

0 commit comments

Comments
 (0)