Skip to content

Commit df805be

Browse files
committed
fix #2507
- support multi-item MESSAGE broadcasts - implement CLIENT ID tracking (only on internal API for now, pending design)
1 parent 40c2918 commit df805be

File tree

9 files changed

+135
-14
lines changed

9 files changed

+135
-14
lines changed

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2338,5 +2338,8 @@ private Task[] QuitAllServers()
23382338
}
23392339
return quits;
23402340
}
2341+
2342+
long? IInternalConnectionMultiplexer.GetConnectionId(EndPoint endpoint, ConnectionType type)
2343+
=> GetServerEndPoint(endpoint)?.GetBridge(type)?.ConnectionId;
23412344
}
23422345
}

src/StackExchange.Redis/Interfaces/IConnectionMultiplexer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using StackExchange.Redis.Maintenance;
2+
using StackExchange.Redis.Profiling;
3+
using System;
34
using System.IO;
45
using System.Net;
56
using System.Threading.Tasks;
6-
using StackExchange.Redis.Maintenance;
7-
using StackExchange.Redis.Profiling;
87

98
namespace StackExchange.Redis
109
{
@@ -17,6 +16,8 @@ internal interface IInternalConnectionMultiplexer : IConnectionMultiplexer
1716
ReadOnlySpan<ServerEndPoint> GetServerSnapshot();
1817

1918
ConfigurationOptions RawConfig { get; }
19+
20+
long? GetConnectionId(EndPoint endPoint, ConnectionType type);
2021
}
2122

2223
/// <summary>

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ internal sealed class PhysicalBridge : IDisposable
5959

6060
private volatile int state = (int)State.Disconnected;
6161

62+
internal long? ConnectionId => physical?.ConnectionId;
63+
6264
#if NETCOREAPP
6365
private readonly SemaphoreSlim _singleWriterMutex = new(1,1);
6466
#else

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private static readonly Message
5555

5656
private long bytesLastResult;
5757
private long bytesInBuffer;
58+
internal long? ConnectionId { get; set; }
5859

5960
internal void GetBytes(out long sent, out long received)
6061
{
@@ -1581,10 +1582,19 @@ private void MatchResult(in RawResult result)
15811582
// invoke the handlers
15821583
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
15831584
Trace("MESSAGE: " + channel);
1584-
if (!channel.IsNull && TryGetPubSubPayload(items[2], out var payload))
1585+
if (!channel.IsNull)
15851586
{
1586-
_readStatus = ReadStatus.InvokePubSub;
1587-
muxer.OnMessage(channel, channel, payload);
1587+
if (TryGetPubSubPayload(items[2], out var payload))
1588+
{
1589+
_readStatus = ReadStatus.InvokePubSub;
1590+
muxer.OnMessage(channel, channel, payload);
1591+
}
1592+
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
1593+
else if (TryGetMultiPubSubPayload(items[2], out var payloads))
1594+
{
1595+
_readStatus = ReadStatus.InvokePubSub;
1596+
muxer.OnMessage(channel, channel, payloads);
1597+
}
15881598
}
15891599
return; // AND STOP PROCESSING!
15901600
}
@@ -1594,11 +1604,20 @@ private void MatchResult(in RawResult result)
15941604

15951605
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
15961606
Trace("PMESSAGE: " + channel);
1597-
if (!channel.IsNull && TryGetPubSubPayload(items[3], out var payload))
1607+
if (!channel.IsNull)
15981608
{
1599-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1600-
_readStatus = ReadStatus.InvokePubSub;
1601-
muxer.OnMessage(sub, channel, payload);
1609+
if (TryGetPubSubPayload(items[3], out var payload))
1610+
{
1611+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1612+
_readStatus = ReadStatus.InvokePubSub;
1613+
muxer.OnMessage(sub, channel, payload);
1614+
}
1615+
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
1616+
{
1617+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1618+
_readStatus = ReadStatus.InvokePubSub;
1619+
muxer.OnMessage(sub, channel, payloads);
1620+
}
16021621
}
16031622
return; // AND STOP PROCESSING!
16041623
}
@@ -1647,6 +1666,17 @@ static bool TryGetPubSubPayload(in RawResult value, out RedisValue parsed, bool
16471666
parsed = default;
16481667
return false;
16491668
}
1669+
1670+
static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence<RawResult> parsed)
1671+
{
1672+
if (value.Type == ResultType.MultiBulk && value.ItemsCount != 0)
1673+
{
1674+
parsed = value.GetItems();
1675+
return true;
1676+
}
1677+
parsed = default;
1678+
return false;
1679+
}
16501680
}
16511681

16521682
private volatile Message? _activeMessage;

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Pipelines.Sockets.Unofficial;
8+
using Pipelines.Sockets.Unofficial.Arenas;
89
using static StackExchange.Redis.ConnectionMultiplexer;
910

1011
namespace StackExchange.Redis
@@ -92,6 +93,24 @@ internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, i
9293
}
9394
}
9495

96+
internal void OnMessage(in RedisChannel subscription, in RedisChannel channel, Sequence<RawResult> payload)
97+
{
98+
if (payload.IsSingleSegment)
99+
{
100+
foreach (var message in payload.FirstSpan)
101+
{
102+
OnMessage(subscription, channel, message.AsRedisValue());
103+
}
104+
}
105+
else
106+
{
107+
foreach (var message in payload)
108+
{
109+
OnMessage(subscription, channel, message.AsRedisValue());
110+
}
111+
}
112+
}
113+
95114
/// <summary>
96115
/// Updates all subscriptions re-evaluating their state.
97116
/// This clears the current server if it's not connected, prepping them to reconnect.

src/StackExchange.Redis/ResultProcessor.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public static readonly MultiStreamProcessor
5656
public static readonly ResultProcessor<long>
5757
Int64 = new Int64Processor(),
5858
PubSubNumSub = new PubSubNumSubProcessor(),
59-
Int64DefaultNegativeOne = new Int64DefaultValueProcessor(-1);
59+
Int64DefaultNegativeOne = new Int64DefaultValueProcessor(-1),
60+
ClientId = new ClientIdProcessor();
6061

6162
public static readonly ResultProcessor<double?>
6263
NullableDouble = new NullableDoubleProcessor();
@@ -960,8 +961,7 @@ private sealed class ClusterNodesProcessor : ResultProcessor<ClusterConfiguratio
960961
{
961962
internal static ClusterConfiguration Parse(PhysicalConnection connection, string nodes)
962963
{
963-
var bridge = connection.BridgeCouldBeNull;
964-
if (bridge == null) throw new ObjectDisposedException(connection.ToString());
964+
var bridge = connection.BridgeCouldBeNull ?? throw new ObjectDisposedException(connection.ToString());
965965
var server = bridge.ServerEndPoint;
966966
var config = new ClusterConfiguration(bridge.Multiplexer.ServerSelectionStrategy, nodes, server.EndPoint);
967967
server.SetClusterConfiguration(config);
@@ -1218,6 +1218,28 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
12181218
}
12191219
}
12201220

1221+
private class ClientIdProcessor : ResultProcessor<long>
1222+
{
1223+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
1224+
{
1225+
switch (result.Type)
1226+
{
1227+
case ResultType.Integer:
1228+
case ResultType.SimpleString:
1229+
case ResultType.BulkString:
1230+
long i64;
1231+
if (result.TryGetInt64(out i64))
1232+
{
1233+
SetResult(message, i64);
1234+
connection.ConnectionId = i64;
1235+
return true;
1236+
}
1237+
break;
1238+
}
1239+
return false;
1240+
}
1241+
}
1242+
12211243
private class PubSubNumSubProcessor : Int64Processor
12221244
{
12231245
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,9 @@ private async Task HandshakeAsync(PhysicalConnection connection, LogProxy? log)
960960
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
961961
}
962962
}
963+
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.ID);
964+
msg.SetInternalCall();
965+
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.ClientId).ForAwait();
963966
}
964967

965968
var bridge = connection.BridgeCouldBeNull;

tests/StackExchange.Redis.Tests/Helpers/SharedConnectionFixture.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ public void ExportConfiguration(Stream destination, ExportOptions options = Expo
183183
=> _inner.ExportConfiguration(destination, options);
184184

185185
public override string ToString() => _inner.ToString();
186+
long? IInternalConnectionMultiplexer.GetConnectionId(EndPoint endPoint, ConnectionType type)
187+
=> _inner.GetConnectionId(endPoint, type);
186188
}
187189

188190
public void Dispose()
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace StackExchange.Redis.Tests.Issues
8+
{
9+
public class Issue2507 : TestBase
10+
{
11+
public Issue2507(ITestOutputHelper output, SharedConnectionFixture? fixture = null)
12+
: base(output, fixture) { }
13+
14+
[Fact]
15+
public async Task Execute()
16+
{
17+
using var conn = Create();
18+
var db = conn.GetDatabase();
19+
var pubsub = conn.GetSubscriber();
20+
var queue = await pubsub.SubscribeAsync(RedisChannel.Literal("__redis__:invalidate"));
21+
await Task.Delay(100);
22+
var connectionId = conn.GetConnectionId(conn.GetEndPoints().Single(), ConnectionType.Subscription);
23+
if (connectionId is null) Skip.Inconclusive("Connection id not available");
24+
await db.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { new("abc", "def"), new("ghi", "jkl"), new("mno", "pqr") });
25+
// this is not supported, but: we want it to at least not fail
26+
await db.ExecuteAsync("CLIENT", "TRACKING", "on", "REDIRECT", connectionId!.Value, "BCAST");
27+
await db.KeyDeleteAsync(new RedisKey[] { "abc", "ghi", "mno" });
28+
await Task.Delay(100);
29+
queue.Unsubscribe();
30+
Assert.True(queue.TryRead(out var message));
31+
Assert.Equal("abc", message.Message);
32+
Assert.True(queue.TryRead(out message));
33+
Assert.Equal("ghi", message.Message);
34+
Assert.True(queue.TryRead(out message));
35+
Assert.Equal("mno", message.Message);
36+
Assert.False(queue.TryRead(out message));
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)