Skip to content

Commit 6c6c566

Browse files
committed
First attempt to improve NIO example
1 parent 95446b1 commit 6c6c566

File tree

7 files changed

+198
-183
lines changed

7 files changed

+198
-183
lines changed

Sources/SWIMNIOExample/Coding.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ extension SWIM.NIOPeer: Codable {
3333
guard let channel = decoder.userInfo[.channelUserInfoKey] as? Channel else {
3434
fatalError("Expected channelUserInfoKey to be present in userInfo, unable to decode SWIM.NIOPeer!")
3535
}
36-
self.init(node: node, channel: channel)
36+
try self.init(node: node, channel: channel)
3737
}
3838

3939
public nonisolated func encode(to encoder: Encoder) throws {

Sources/SWIMNIOExample/NIOPeer.swift

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,20 @@ public extension SWIM {
2626
self.swimNode
2727
}
2828

29-
internal let channel: Channel
29+
internal let channel: NIOAsyncChannel<SWIM.Message, SWIMNIOWriteCommand>
3030

31-
public init(node: Node, channel: Channel) {
31+
public init(
32+
node: Node,
33+
channel: any Channel
34+
) throws {
35+
self.swimNode = node
36+
self.channel = try .init(wrappingChannelSynchronously: channel)
37+
}
38+
39+
public init(
40+
node: Node,
41+
channel: NIOAsyncChannel<SWIM.Message, SWIMNIOWriteCommand>
42+
) {
3243
self.swimNode = node
3344
self.channel = channel
3445
}
@@ -39,27 +50,22 @@ public extension SWIM {
3950
timeout: Swift.Duration,
4051
sequenceNumber: SWIM.SequenceNumber
4152
) async throws -> PingResponse<SWIM.NIOPeer, SWIM.NIOPeer> {
42-
try await withCheckedThrowingContinuation { continuation in
53+
try await channel.executeThenClose { [swimNode] inbound, outbound in
4354
let message = SWIM.Message.ping(replyTo: origin, payload: payload, sequenceNumber: sequenceNumber)
44-
let command = SWIMNIOWriteCommand(message: message, to: self.swimNode, replyTimeout: timeout.toNIO, replyCallback: { reply in
45-
switch reply {
46-
case .success(.response(.nack(_, _))):
47-
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected .nack reply to .ping message! Was: \(reply)"))
48-
49-
case .success(.response(let pingResponse)):
55+
let command = SWIMNIOWriteCommand(message: message, to: swimNode, replyTimeout: timeout)
56+
try await outbound.write(command)
57+
for try await message in inbound {
58+
switch message {
59+
case .response(.nack(_, _)):
60+
throw SWIMNIOIllegalMessageTypeError("Unexpected .nack reply to .ping message! Was: \(message)")
61+
case .response(let pingResponse):
5062
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
51-
continuation.resume(returning: pingResponse)
52-
53-
case .failure(let error):
54-
continuation.resume(throwing: error)
55-
56-
case .success(let other):
57-
continuation.resume(throwing:
58-
SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)"))
63+
return pingResponse
64+
default:
65+
throw SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(message)]:\(reflecting: type(of: message)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)")
5966
}
60-
})
61-
62-
self.channel.writeAndFlush(command, promise: nil)
67+
}
68+
throw SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(message)]:\(reflecting: type(of: message)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)")
6369
}
6470
}
6571

@@ -70,23 +76,20 @@ public extension SWIM {
7076
timeout: Duration,
7177
sequenceNumber: SWIM.SequenceNumber
7278
) async throws -> PingResponse<SWIM.NIOPeer, SWIM.NIOPeer> {
73-
try await withCheckedThrowingContinuation { continuation in
79+
try await channel.executeThenClose { [node] inbound, outbound in
7480
let message = SWIM.Message.pingRequest(target: target, replyTo: origin, payload: payload, sequenceNumber: sequenceNumber)
75-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
76-
switch reply {
77-
case .success(.response(let pingResponse)):
81+
let command = SWIMNIOWriteCommand(message: message, to: node, replyTimeout: timeout)
82+
try await outbound.write(command)
83+
for try await message in inbound {
84+
switch message {
85+
case .response(let pingResponse):
7886
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
79-
continuation.resume(returning: pingResponse)
80-
81-
case .failure(let error):
82-
continuation.resume(throwing: error)
83-
84-
case .success(let other):
85-
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: \(other) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)"))
87+
return pingResponse
88+
default:
89+
throw SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(message)]:\(reflecting: type(of: message)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)")
8690
}
87-
})
88-
89-
self.channel.writeAndFlush(command, promise: nil)
91+
}
92+
throw SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(message)]:\(reflecting: type(of: message)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)")
9093
}
9194
}
9295

@@ -97,19 +100,27 @@ public extension SWIM {
97100
payload: GossipPayload<SWIM.NIOPeer>?
98101
) {
99102
let message = SWIM.Message.response(.ack(target: target, incarnation: incarnation, payload: payload, sequenceNumber: sequenceNumber))
100-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: .seconds(0), replyCallback: nil)
103+
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: .seconds(0))
101104

102-
self.channel.writeAndFlush(command, promise: nil)
105+
Task {
106+
try await channel.executeThenClose { inbound, outbound in
107+
try await outbound.write(command)
108+
}
109+
}
103110
}
104111

105112
public func nack(
106113
acknowledging sequenceNumber: SWIM.SequenceNumber,
107114
target: SWIM.NIOPeer
108115
) {
109116
let message = SWIM.Message.response(.nack(target: target, sequenceNumber: sequenceNumber))
110-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: .seconds(0), replyCallback: nil)
117+
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: .seconds(0))
111118

112-
self.channel.writeAndFlush(command, promise: nil)
119+
Task {
120+
try await channel.executeThenClose { inbound, outbound in
121+
try await outbound.write(command)
122+
}
123+
}
113124
}
114125

115126
public nonisolated var description: String {

Sources/SWIMNIOExample/SWIMNIOHandler.swift

Lines changed: 61 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
4747
set { self._metrics.withLock { $0 = newValue } }
4848
}
4949

50-
private let _pendingReplyCallbacks: Mutex<[PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.Message, Error>) -> Void)]> = .init([:])
51-
var pendingReplyCallbacks: [PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.Message, Error>) -> Void)] {
50+
private let _pendingReplyCallbacks: Mutex<[PendingResponseCallbackIdentifier: Task<SWIM.Message, Never>]> = .init([:])
51+
var pendingReplyCallbacks: [PendingResponseCallbackIdentifier: Task<SWIM.Message, Never>] {
5252
get { self._pendingReplyCallbacks.withLock { $0 } }
5353
set { self._pendingReplyCallbacks.withLock { $0 = newValue } }
5454
}
@@ -68,7 +68,7 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
6868
var settings = self.settings
6969
let node = self.settings.swim.node ?? Node(protocol: "udp", host: hostIP, port: hostPort, uid: .random(in: 0 ..< UInt64.max))
7070
settings.swim.node = node
71-
self.shell = SWIMNIOShell(
71+
self.shell = try! SWIMNIOShell(
7272
node: node,
7373
settings: settings,
7474
channel: context.channel,
@@ -107,34 +107,33 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
107107
// TODO: note that this impl does not handle "new node on same host/port" yet
108108

109109
// register and manage reply callback ------------------------------
110-
if let replyCallback = writeCommand.replyCallback {
111-
let sequenceNumber = writeCommand.message.sequenceNumber
112-
#if DEBUG
113-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: writeCommand.recipient, sequenceNumber: sequenceNumber, inResponseTo: writeCommand.message)
114-
#else
115-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: writeCommand.recipient, sequenceNumber: sequenceNumber)
116-
#endif
117-
118-
let timeoutTask = context.eventLoop.scheduleTask(in: writeCommand.replyTimeout) {
119-
if let callback = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) {
120-
callback(.failure(
121-
SWIMNIOTimeoutError(
122-
timeout: writeCommand.replyTimeout,
123-
message: "Timeout of [\(callbackKey)], no reply to [\(writeCommand.message.messageCaseDescription)] after \(writeCommand.replyTimeout.prettyDescription())"
124-
)
125-
))
126-
} // else, task fired already (should have been removed)
127-
}
128-
129-
self.log.trace("Store callback: \(callbackKey)", metadata: [
130-
"message": "\(writeCommand.message)",
131-
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
132-
])
133-
self.pendingReplyCallbacks[callbackKey] = { reply in
134-
timeoutTask.cancel() // when we trigger the callback, we should also cancel the timeout task
135-
replyCallback(reply) // successful reply received
136-
}
137-
}
110+
// if let replyCallback = writeCommand.replyCallback {
111+
// let sequenceNumber = writeCommand.message.sequenceNumber
112+
// #if DEBUG
113+
// let callbackKey = PendingResponseCallbackIdentifier(peerAddress: writeCommand.recipient, sequenceNumber: sequenceNumber, inResponseTo: writeCommand.message)
114+
// #else
115+
// let callbackKey = PendingResponseCallbackIdentifier(peerAddress: writeCommand.recipient, sequenceNumber: sequenceNumber)
116+
// #endif
117+
//
118+
// let timeoutTask = Task {
119+
// try await Task.sleep(for: writeCommand.replyTimeout)
120+
// if let task = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) {
121+
// throw SWIMNIOTimeoutError(
122+
// timeout: writeCommand.replyTimeout,
123+
// message: "Timeout of [\(callbackKey)], no reply to [\(writeCommand.message.messageCaseDescription)] after \(writeCommand.replyTimeout.prettyDescription())"
124+
// )
125+
// } // else, task fired already (should have been removed)
126+
// }
127+
//
128+
// self.log.trace("Store callback: \(callbackKey)", metadata: [
129+
// "message": "\(writeCommand.message)",
130+
// "pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
131+
// ])
132+
// self.pendingReplyCallbacks[callbackKey] = Task { reply in
133+
// timeoutTask.cancel()
134+
// try await replyCallback(reply) // successful reply received
135+
// }
136+
// }
138137

139138
// serialize & send message ----------------------------------------
140139
let buffer = try self.serialize(message: writeCommand.message, using: context.channel.allocator)
@@ -167,26 +166,26 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
167166

168167
if message.isResponse {
169168
// if it's a reply, invoke the pending callback ------
170-
// TODO: move into the shell: https://github.com/apple/swift-cluster-membership/issues/41
171-
#if DEBUG
172-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber, inResponseTo: nil)
173-
#else
174-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber)
175-
#endif
176-
177-
if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) {
178-
let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index)
179-
// TODO: UIDs of nodes matter
180-
self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [
181-
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
182-
])
183-
self.metrics?.pingResponseTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds)
184-
callback(.success(message))
185-
} else {
186-
self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [
187-
"pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
188-
])
189-
}
169+
// // TODO: move into the shell: https://github.com/apple/swift-cluster-membership/issues/41
170+
// #if DEBUG
171+
// let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber, inResponseTo: nil)
172+
// #else
173+
// let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber)
174+
// #endif
175+
//
176+
// if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) {
177+
// let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index)
178+
// // TODO: UIDs of nodes matter
179+
// self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [
180+
// "pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
181+
// ])
182+
// self.metrics?.pingResponseTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds)
183+
// callback(.success(message))
184+
// } else {
185+
// self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [
186+
// "pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
187+
// ])
188+
// }
190189
} else {
191190
// deliver to the shell ------------------------------
192191
self.shell.receiveMessage(message: message)
@@ -252,17 +251,22 @@ public struct SWIMNIOWriteCommand: Sendable {
252251
/// Address of recipient peer where the message should be written to.
253252
public let recipient: SocketAddress
254253

255-
/// If the `replyCallback` is set, what timeout should be set for a reply to come back from the peer.
256-
public let replyTimeout: NIO.TimeAmount
257-
/// Callback to be invoked (calling into the SWIMNIOShell) when a reply to this message arrives.
258-
public let replyCallback: (@Sendable (Result<SWIM.Message, Error>) -> Void)?
254+
// /// If the `replyCallback` is set, what timeout should be set for a reply to come back from the peer.
255+
public let replyTimeout: Duration
256+
// /// Callback to be invoked (calling into the SWIMNIOShell) when a reply to this message arrives.
257+
// public let replyCallback: (@Sendable (SWIM.Message) async throws -> Void)?
259258

260259
/// Create a write command.
261-
public init(message: SWIM.Message, to recipient: Node, replyTimeout: TimeAmount, replyCallback: (@Sendable (Result<SWIM.Message, Error>) -> Void)?) {
260+
public init(
261+
message: SWIM.Message,
262+
to recipient: Node,
263+
replyTimeout: Duration
264+
// replyCallback: (@Sendable (SWIM.Message) async throws -> Void)?
265+
) {
262266
self.message = message
263267
self.recipient = try! .init(ipAddress: recipient.host, port: recipient.port) // try!-safe since the host/port is always safe
264268
self.replyTimeout = replyTimeout
265-
self.replyCallback = replyCallback
269+
// self.replyCallback = replyCallback
266270
}
267271
}
268272

0 commit comments

Comments
 (0)