Skip to content

Commit 2f1be23

Browse files
authored
Add Exchange version (#277)
* Add Exchange version Implement #153 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent a9ee96a commit 2f1be23

File tree

6 files changed

+168
-4
lines changed

6 files changed

+168
-4
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,12 @@ public async Task<RouteQueryResponse> QueryRoute(string superStream, string rout
364364
new RouteQueryRequest(corr, superStream, routingKey)).ConfigureAwait(false);
365365
}
366366

367+
public async Task<CommandVersionsResponse> ExchangeVersions()
368+
{
369+
return await Request<CommandVersionsRequest, CommandVersionsResponse>(corr =>
370+
new CommandVersionsRequest(corr)).ConfigureAwait(false);
371+
}
372+
367373
private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSpan? timeout = null)
368374
where TIn : struct, ICommand where TOut : struct, ICommand
369375
{
@@ -553,6 +559,10 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
553559
RouteQueryResponse.Read(frame, out var routeQueryResponse);
554560
HandleCorrelatedResponse(routeQueryResponse);
555561
break;
562+
case CommandVersionsResponse.Key:
563+
CommandVersionsResponse.Read(frame, out var commandVersionsResponse);
564+
HandleCorrelatedResponse(commandVersionsResponse);
565+
break;
556566
default:
557567
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
558568
{
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
7+
namespace RabbitMQ.Stream.Client;
8+
9+
public readonly struct CommandVersionsRequest : ICommand
10+
{
11+
private const ushort Key = 0x001b;
12+
private readonly uint _correlationId;
13+
private readonly ICommandVersions[] _commands = { };
14+
15+
public CommandVersionsRequest(uint correlationId)
16+
{
17+
_correlationId = correlationId;
18+
}
19+
20+
public int SizeNeeded
21+
{
22+
get
23+
{
24+
var size = 2 + 2 + 4
25+
+ 4 + // _commands.Length
26+
_commands.Length * (2 + 2 + 2);
27+
return size;
28+
}
29+
}
30+
31+
public int Write(Span<byte> span)
32+
{
33+
var offset = WireFormatting.WriteUInt16(span, Key);
34+
offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version);
35+
offset += WireFormatting.WriteUInt32(span[offset..], _correlationId);
36+
offset += WireFormatting.WriteInt32(span[offset..], _commands.Length);
37+
38+
foreach (var iCommandVersions in _commands)
39+
{
40+
offset += WireFormatting.WriteUInt16(span[offset..], iCommandVersions.Command);
41+
offset += WireFormatting.WriteUInt16(span[offset..], iCommandVersions.MinVersion);
42+
offset += WireFormatting.WriteUInt16(span[offset..], iCommandVersions.MaxVersion);
43+
}
44+
45+
return offset;
46+
}
47+
}
48+
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.Collections.Generic;
8+
9+
namespace RabbitMQ.Stream.Client;
10+
11+
public struct CommandVersionsResponse : ICommand
12+
{
13+
internal const ushort Key = 0x001b;
14+
15+
private CommandVersionsResponse(uint correlationId, ResponseCode responseCode, List<ICommandVersions> commands)
16+
{
17+
CorrelationId = correlationId;
18+
ResponseCode = responseCode;
19+
Commands = commands;
20+
}
21+
22+
public int SizeNeeded { get => throw new NotImplementedException(); }
23+
public int Write(Span<byte> span) => throw new NotImplementedException();
24+
25+
public uint CorrelationId { get; }
26+
public ResponseCode ResponseCode { get; }
27+
28+
public List<ICommandVersions> Commands { get; }
29+
30+
internal static int Read(ReadOnlySequence<byte> frame, out CommandVersionsResponse command)
31+
{
32+
var offset = WireFormatting.ReadUInt16(frame, out _);
33+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _);
34+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var correlation);
35+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode);
36+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var commandLen);
37+
var commands = new List<ICommandVersions>();
38+
39+
for (var i = 0; i < commandLen; i++)
40+
{
41+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var commandKey);
42+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var minVersion);
43+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var maxVersion);
44+
commands.Add(new CommandVersions(commandKey, minVersion, maxVersion));
45+
}
46+
47+
command = new CommandVersionsResponse(correlation, (ResponseCode)responseCode, commands);
48+
return offset;
49+
}
50+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client;
6+
7+
public interface ICommandVersions
8+
{
9+
public ushort MaxVersion { get; }
10+
public ushort MinVersion { get; }
11+
public ushort Command { get; }
12+
}
13+
14+
public class CommandVersions : ICommandVersions
15+
{
16+
public CommandVersions(ushort command, ushort minVersion, ushort maxVersion)
17+
{
18+
Command = command;
19+
MinVersion = minVersion;
20+
MaxVersion = maxVersion;
21+
}
22+
23+
public ushort MaxVersion { get; }
24+
public ushort MinVersion { get; }
25+
public ushort Command { get; }
26+
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,33 @@ RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
99
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
1010
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
1111
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
12+
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
1213
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
1314
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
1415
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
1516
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
17+
RabbitMQ.Stream.Client.CommandVersions
18+
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
19+
RabbitMQ.Stream.Client.CommandVersions.CommandVersions(ushort command, ushort minVersion, ushort maxVersion) -> void
20+
RabbitMQ.Stream.Client.CommandVersions.MaxVersion.get -> ushort
21+
RabbitMQ.Stream.Client.CommandVersions.MinVersion.get -> ushort
22+
RabbitMQ.Stream.Client.CommandVersionsRequest
23+
RabbitMQ.Stream.Client.CommandVersionsRequest.CommandVersionsRequest() -> void
24+
RabbitMQ.Stream.Client.CommandVersionsRequest.CommandVersionsRequest(uint correlationId) -> void
25+
RabbitMQ.Stream.Client.CommandVersionsRequest.SizeNeeded.get -> int
26+
RabbitMQ.Stream.Client.CommandVersionsRequest.Write(System.Span<byte> span) -> int
27+
RabbitMQ.Stream.Client.CommandVersionsResponse
28+
RabbitMQ.Stream.Client.CommandVersionsResponse.Commands.get -> System.Collections.Generic.List<RabbitMQ.Stream.Client.ICommandVersions>
29+
RabbitMQ.Stream.Client.CommandVersionsResponse.CommandVersionsResponse() -> void
30+
RabbitMQ.Stream.Client.CommandVersionsResponse.CorrelationId.get -> uint
31+
RabbitMQ.Stream.Client.CommandVersionsResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
32+
RabbitMQ.Stream.Client.CommandVersionsResponse.SizeNeeded.get -> int
33+
RabbitMQ.Stream.Client.CommandVersionsResponse.Write(System.Span<byte> span) -> int
1634
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
35+
RabbitMQ.Stream.Client.ICommandVersions
36+
RabbitMQ.Stream.Client.ICommandVersions.Command.get -> ushort
37+
RabbitMQ.Stream.Client.ICommandVersions.MaxVersion.get -> ushort
38+
RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
1739
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
1840
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
1941
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>

Tests/ClientTests.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,7 @@ public async void PublishShouldError()
119119
await client.CreateStream(stream, new Dictionary<string, string>());
120120
var testPassed = new TaskCompletionSource<bool>();
121121

122-
Action<ReadOnlyMemory<ulong>> confirmed = (pubIds) =>
123-
{
124-
testPassed.SetResult(false);
125-
};
122+
Action<ReadOnlyMemory<ulong>> confirmed = (pubIds) => { testPassed.SetResult(false); };
126123

127124
Action<(ulong, ResponseCode)[]> errored = (errors) =>
128125
{
@@ -415,5 +412,16 @@ await Assert.ThrowsAsync<VirtualHostAccessFailureException>(
415412
async () => { await Client.Create(clientParameters); }
416413
);
417414
}
415+
416+
[Fact]
417+
public async void ExchangeVersionCommandsShouldNotBeEmpty()
418+
{
419+
var clientParameters = new ClientParameters { };
420+
var client = await Client.Create(clientParameters);
421+
var response = await client.ExchangeVersions();
422+
Assert.Equal(ResponseCode.Ok, response.ResponseCode);
423+
Assert.True(response.Commands.Count > 0);
424+
await client.Close("done");
425+
}
418426
}
419427
}

0 commit comments

Comments
 (0)