Skip to content

Commit e0ffddd

Browse files
Cleanup of RPC Project. Extracted Metadata encoder into separate file. Cleanup of Service sample code.
1 parent 15dcff7 commit e0ffddd

File tree

8 files changed

+36
-194
lines changed

8 files changed

+36
-194
lines changed

RSocket.RPC.Tests/RSocketServiceTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void ServerBasicTest()
3636

3737

3838
[System.Runtime.CompilerServices.CompilerGenerated]
39-
public class TestService : RSocketService<TestService>
39+
public class TestService : RSocketService
4040
{
4141
private const string SERVICE = nameof(TestService);
4242
public TestService(RSocketClient client) : base(client) { }

RSocket.RPC/IRSocketRPCStream.cs

Lines changed: 0 additions & 9 deletions
This file was deleted.

RSocket.RPC/RSocketRPCClient.cs renamed to RSocket.RPC/RSocketService.Metadata.cs

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,20 @@
11
using System;
22
using System.Buffers;
33
using System.Text;
4-
using System.Threading.Tasks;
5-
using RSocket;
64

75
namespace RSocket.RPC
86
{
9-
//TODO This has probably dissolved into the Service base class at this point.
10-
public class RSocketRPCClient
7+
partial class RSocketService
118
{
12-
readonly RSocketClient Client;
13-
14-
public RSocketRPCClient(RSocketClient client) { Client = client; }
15-
16-
public Task RequestChannel(IRSocketRPCStream stream, string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, int initial = RSocketClient.INITIALDEFAULT) => Client.RequestChannel(new RemoteProcedureCall(stream), data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing), initial);
17-
public Task RequestStream(IRSocketRPCStream stream, string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, int initial = RSocketClient.INITIALDEFAULT) => Client.RequestStream(new RemoteProcedureCall(stream), data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing), initial);
18-
public Task RequestFireAndForget(IRSocketRPCStream stream, string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) => Client.RequestFireAndForget(new RemoteProcedureCall(stream), data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing));
19-
public Task RequestResponse(IRSocketRPCStream stream, string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) => Client.RequestResponse(new RemoteProcedureCall(stream), data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing));
20-
}
21-
22-
public sealed class RemoteProcedureCall : IRSocketStream
23-
{
24-
private readonly IRSocketRPCStream Stream;
25-
26-
public RemoteProcedureCall(IRSocketRPCStream stream) { Stream = stream; }
27-
28-
public void OnCompleted() => Stream.OnCompleted();
29-
public void OnError(Exception error) => Stream.OnError(error);
30-
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
31-
{
32-
var rpc = new RemoteProcedureCallMetadata(value.metadata);
33-
Stream.OnNext((rpc.Service, rpc.Method, rpc.Metadata, value.data, rpc.Tracing));
34-
}
35-
36-
public ref struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
9+
ref struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
3710
{
3811
public const UInt16 VERSION = 1;
3912

4013
public string Service;
4114
public string Method;
4215
public ReadOnlySequence<byte> Tracing;
4316
public ReadOnlySequence<byte> Metadata;
44-
static public readonly Encoding DefaultEncoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); //TODO SPEC: Check on the Encoding.
17+
static public readonly Encoding DefaultEncoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
4518
public int Length => sizeof(UInt16) + sizeof(UInt16) + DefaultEncoding.GetByteCount(Service) + sizeof(UInt16) + DefaultEncoding.GetByteCount(Method) + sizeof(UInt16) + (int)Tracing.Length + (int)Metadata.Length;
4619

4720
public RemoteProcedureCallMetadata(string service, string method, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> tracing) { Service = service; Method = method; Metadata = metadata; Tracing = tracing; }

RSocket.RPC/RSocketService.cs

Lines changed: 13 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,47 +7,38 @@
77

88
namespace RSocket.RPC
99
{
10-
public abstract class RSocketService<T> : IRSocketStream
10+
public abstract partial class RSocketService
1111
{
1212
private readonly RSocketClient Client;
1313

1414
public RSocketService(RSocketClient client) { Client = client; }
1515

16-
protected void __RequestFireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }
17-
18-
protected async Task<TResult> __RequestFireAndForget<TMessage, TResult>(string service, string method, TMessage message, Func<TMessage, byte[]> intransform, Func<byte[], TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) =>
19-
outtransform((await __RequestFireAndForget(service, method, new ReadOnlySequence<byte>(intransform(message)), metadata, tracing)).ToArray());
20-
21-
protected async Task<TResult> __RequestFireAndForget<TMessage, TResult>(string service, string method, TMessage message, Func<TMessage, ReadOnlySequence<byte>> intransform, Func<ReadOnlySequence<byte>, TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default) =>
22-
outtransform(await __RequestFireAndForget(service, method, intransform(message), metadata, tracing));
23-
24-
protected async Task<ReadOnlySequence<byte>> __RequestFireAndForget(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
16+
protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, byte[]> intransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) => __RequestFireAndForget(new ReadOnlySequence<byte>(intransform(message)), metadata, tracing, service: service, method: method);
17+
protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, ReadOnlySequence<byte>> intransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) => __RequestFireAndForget(intransform(message), metadata, tracing, service: service, method: method);
18+
protected async Task __RequestFireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default)
2519
{
2620
var receiver = new Receiver();
27-
await Client.RequestResponse(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing));
28-
return await receiver.Task.ConfigureAwait(false);
21+
await Client.RequestFireAndForget(receiver, data, new RemoteProcedureCallMetadata(service, method, metadata, tracing)).ConfigureAwait(false);
22+
receiver.TrySetResult(default);
23+
await receiver.Awaitable;
2924
}
3025

3126

32-
protected async Task<TResult> __RequestResponse<TMessage, TResult>(TMessage message, Func<TMessage, byte[]> intransform, Func<byte[], TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) =>
33-
outtransform((await __RequestResponse(new ReadOnlySequence<byte>(intransform(message)), metadata, tracing, service: service, method: method)).ToArray());
34-
35-
protected async Task<TResult> __RequestResponse<TMessage, TResult>(TMessage message, Func<TMessage, ReadOnlySequence<byte>> intransform, Func<ReadOnlySequence<byte>, TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) =>
36-
outtransform(await __RequestResponse(intransform(message), metadata, tracing, service: service, method: method));
37-
27+
protected async Task<TResult> __RequestResponse<TMessage, TResult>(TMessage message, Func<TMessage, byte[]> intransform, Func<byte[], TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) => outtransform((await __RequestResponse(new ReadOnlySequence<byte>(intransform(message)), metadata, tracing, service: service, method: method)).ToArray());
28+
protected async Task<TResult> __RequestResponse<TMessage, TResult>(TMessage message, Func<TMessage, ReadOnlySequence<byte>> intransform, Func<ReadOnlySequence<byte>, TResult> outtransform, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default) => outtransform(await __RequestResponse(intransform(message), metadata, tracing, service: service, method: method));
3829
protected async Task<ReadOnlySequence<byte>> __RequestResponse(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default)
3930
{
4031
var receiver = new Receiver();
41-
await Client.RequestResponse(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing));
42-
return await receiver.Task.ConfigureAwait(false);
32+
await Client.RequestResponse(receiver, data, new RemoteProcedureCallMetadata(service, method, metadata, tracing));
33+
return await receiver.Awaitable;
4334
}
4435

4536
//TODO Ask about semantics of this - should it execute the server call before subscription?
4637

4738
protected async Task<ReadOnlySequence<byte>> __RequestStream<TResult>(string service, string method, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default)
4839
{
4940
var receiver = new Receiver();
50-
await Client.RequestStream(receiver, data, new RemoteProcedureCall.RemoteProcedureCallMetadata(service, method, metadata, tracing), initial: 3); //TODO Policy!!
41+
await Client.RequestStream(receiver, data, new RemoteProcedureCallMetadata(service, method, metadata, tracing), initial: 3); //TODO Policy!!
5142
return await receiver.Task.ConfigureAwait(false);
5243
}
5344

@@ -56,42 +47,12 @@ protected async Task<ReadOnlySequence<byte>> __RequestStream<TResult>(string ser
5647
//protected void RequestChannel(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestChannel(null, data, metadata); } //TODO Initial?
5748

5849

59-
//private class Receiver : Receiver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>, IRSocketStream { }
60-
6150
private class Receiver : TaskCompletionSource<ReadOnlySequence<byte>>, IRSocketStream
6251
{
6352
public void OnCompleted() { }
6453
public void OnError(Exception error) => base.SetException(error);
6554
public void OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value) => base.SetResult(value.data);
55+
public ConfiguredTaskAwaitable<ReadOnlySequence<byte>> Awaitable => base.Task.ConfigureAwait(false);
6656
}
67-
68-
69-
public void Dispatch()
70-
{
71-
}
72-
73-
//If a Completion arrives before the first dispatch, forward directly.
74-
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnCompleted()
75-
{
76-
throw new NotImplementedException();
77-
}
78-
79-
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnError(Exception error)
80-
{
81-
throw new NotImplementedException();
82-
}
83-
84-
void IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>.OnNext((ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data) value)
85-
{
86-
throw new NotImplementedException();
87-
}
88-
89-
90-
private Lazy<List<System.Reflection.MethodInfo>> Methods = new Lazy<List<System.Reflection.MethodInfo>>(() => GetMethods());
91-
92-
static public List<System.Reflection.MethodInfo> GetMethods() => (
93-
from method in typeof(T).GetMethods()
94-
where method.IsPublic && method.DeclaringType == typeof(T)
95-
select method).ToList();
9657
}
9758
}

RSocket.sln

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RSocket.RPC", "RSocket.RPC\
2020
EndProject
2121
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{E01A7BD9-16F7-4B6D-B273-F5FE5C319259}"
2222
ProjectSection(SolutionItems) = preProject
23+
.gitignore = .gitignore
2324
.travis.yml = .travis.yml
2425
EndProjectSection
2526
EndProject
2627
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RSocketRPCSample", "RSocketRPCSample\RSocketRPCSample.csproj", "{8B113782-0E9C-4E3F-A1F4-865F01EE12DD}"
2728
EndProject
28-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RSocket.RPC.Tests", "RSocket.RPC.Tests\RSocket.RPC.Tests.csproj", "{BD527265-1280-469C-9BF5-AC51E4474254}"
29+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RSocket.RPC.Tests", "RSocket.RPC.Tests\RSocket.RPC.Tests.csproj", "{BD527265-1280-469C-9BF5-AC51E4474254}"
2930
EndProject
3031
Global
3132
GlobalSection(SolutionConfigurationPlatforms) = preSolution

RSocketRPCSample/EchoService.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ interface IEchoService
2222
}
2323

2424
[System.Runtime.CompilerServices.CompilerGenerated]
25-
public class EchoService : RSocketService<EchoService>, IEchoService
25+
public class EchoService : RSocketService, IEchoService
2626
{
2727
private const string SERVICE = "io.rsocket.rpc.echo" + "." + nameof(EchoService);
2828

2929
public EchoService(RSocketClient client) : base(client) { }
3030

31+
public Task fireAndForget(Google.Protobuf.WellKnownTypes.BytesValue message, ReadOnlySequence<byte> metadata = default) => __RequestFireAndForget(message, Google.Protobuf.MessageExtensions.ToByteArray, metadata, service: SERVICE);
32+
3133
public Task<Google.Protobuf.WellKnownTypes.BytesValue> requestResponse(Google.Protobuf.WellKnownTypes.BytesValue message, ReadOnlySequence<byte> metadata = default) => __RequestResponse(message, Google.Protobuf.MessageExtensions.ToByteArray, Google.Protobuf.WellKnownTypes.BytesValue.Parser.ParseFrom, metadata, service: SERVICE);
3234

3335
//public void fireAndForget(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default) { Client.RequestFireAndForget(null, data, metadata); }

RSocketRPCSample/Program.cs

Lines changed: 13 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,96 +15,31 @@ namespace RSocketRPCSample
1515

1616
class Program
1717
{
18-
//TODO Connection Cleanup on Unsubscribe/failure/etc
19-
//TODO General Error handling -> OnError
20-
21-
private class ConsoleStream : IRSocketRPCStream
22-
{
23-
public void OnCompleted()
24-
{
25-
Console.WriteLine($"Request.OnCompleted");
26-
}
27-
28-
public void OnError(Exception error)
29-
{
30-
Console.WriteLine($"Request.OnError: {error.ToString()}");
31-
}
32-
33-
public void OnNext((string Service, string Method, ReadOnlySequence<byte> Metadata, ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Tracing) value)
34-
{
35-
var data = Encoding.UTF8.GetString(value.Data.ToArray());
36-
Console.WriteLine($"Request.OnNext {value.Service}.{value.Method}({data}) [{value.Metadata.Length}, {value.Tracing.Length}]");
37-
}
38-
}
39-
4018
static async Task Main(string[] args)
4119
{
42-
var methods = EchoService.GetMethods();
43-
methods.ForEach(method => Console.WriteLine($"Service Method: {method.Name}"));
44-
45-
//var client = new RSocketClient(new RSocketWebSocketClient("ws://rsocket-demo.herokuapp.com/ws")); //await client.RequestStream("peace", initial: 2);
46-
//var client = new RSocketClient(new RSocketWebSocketClient("ws://localhost:9092/"));
47-
20+
//Create a new Client.
4821
var client = new RSocketClient(
4922
new WebSocketTransport("ws://localhost:9092/"));
50-
//new SocketTransport("tcp://localhost:9091/"));
51-
52-
await client.ConnectAsync();
53-
54-
55-
var data = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("TEST VALUES!"));
23+
// new SocketTransport("tcp://localhost:9091/"));
5624

25+
//Bind a Service to this Client.
5726
var service = new EchoService(client);
5827

59-
var result = await service.requestResponse(new BytesValue() { Value = ByteString.CopyFromUtf8("TEST VALUES!!") });
60-
61-
Console.WriteLine($"Result: {result.ToString()}");
62-
63-
//var rpcclient = new RSocketRPCClient(client);
64-
65-
//await rpcclient.RequestStream(new ConsoleStream(), "EchoService", "requestStream", data);
66-
67-
Console.ReadKey();
68-
69-
//Console.WriteLine("Requesting Demo Stream...");
70-
71-
//var obj = new Person() { Id = 1234, Name = "Someone Person", Address = new Address() { Line1 = "123 Any Street", Line2 = "Somewhere, LOC" } };
72-
//var meta = new Person() { Id = 567, Name = "Meta Person", Address = new Address() { Line1 = "", Line2 = "" } };
73-
//var req = new ProtobufNetSerializer().Serialize(obj).ToArray(); // Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(obj));
74-
// //TODO req is awkward here, probably need to have incoming and return types...
75-
76-
////var rr = client.RequestStream<Person, Person, Person, Person>(data: obj);
77-
78-
//var personclient = client.Of<Person, Person>();
79-
//var stream = from data in personclient.RequestStream(obj, meta, initial: 3)
80-
// //where value.StartsWith("q")
81-
// select data.Data;
82-
83-
//using (stream.Subscribe(
84-
// onNext: value => Console.WriteLine($"Demo.OnNext===>{value}"), onCompleted: () => Console.WriteLine($"Demo.OnComplete!\n")))
85-
86-
//using (personclient.RequestChannel(obj).Subscribe(
87-
// onNext: value => Console.WriteLine($"RequestChannel.OnNext ===>{value}"), onCompleted: () => Console.WriteLine($"RequestChannel.OnComplete!\n")))
28+
//Connect to a Server and establish communications.
29+
await client.ConnectAsync();
8830

89-
//using (personclient.RequestStream(obj).Subscribe(
90-
// onNext: value => Console.WriteLine($"RequestStream.OnNext ===>{value}"), onCompleted: () => Console.WriteLine($"RequestStream.OnComplete!\n")))
31+
//Make a service method call with no return to the server.
32+
await service.fireAndForget(new BytesValue() { Value = ByteString.CopyFromUtf8($"{nameof(EchoService.fireAndForget)}: Calling service...") });
9133

92-
//using (personclient.RequestResponse(obj).Subscribe(
93-
// onNext: value => Console.WriteLine($"RequestResponse.OnNext ===>{value}"), onCompleted: () => Console.WriteLine($"RequestResponse.OnComplete!\n")))
34+
//Make a service method call returning a single value.
35+
var result = await service.requestResponse(new BytesValue() { Value = ByteString.CopyFromUtf8($"{nameof(EchoService.requestResponse)}: Calling service...") });
36+
Console.WriteLine($"Sample Result: {result.Value.ToStringUtf8()}");
9437

95-
//using (personclient.RequestFireAndForget(obj).Subscribe(
96-
// onNext: value => Console.WriteLine($"RequestFireAndForget.OnNext ===>{value}"), onCompleted: () => Console.WriteLine($"RequestFireAndForget.OnComplete!\n")))
97-
//{
98-
// Console.ReadKey();
99-
//}
10038

10139

102-
//var sender = from index in Observable.Interval(TimeSpan.FromSeconds(1)) select new Person() { Id = (int)index, Name = $"Person #{index:0000}" };
103-
//using (personclient.RequestChannel(obj).Subscribe(
104-
// onNext: value => Console.WriteLine($"RequestChannel.OnNext ===>{value}"), onCompleted: () => Console.WriteLine($"RequestChannel.OnComplete!")))
105-
//{
106-
// Console.ReadKey();
107-
//}
40+
//Wait for a keypress to end session.
41+
Console.WriteLine($"Press any key to continue...");
42+
Console.ReadKey();
10843
}
10944
}
11045
}

0 commit comments

Comments
 (0)