Skip to content

Commit 99fe662

Browse files
committed
First step to transform pending closures
1 parent 95446b1 commit 99fe662

File tree

3 files changed

+72
-77
lines changed

3 files changed

+72
-77
lines changed

Sources/SWIMNIOExample/NIOPeer.swift

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,19 @@ public extension SWIM {
4141
) async throws -> PingResponse<SWIM.NIOPeer, SWIM.NIOPeer> {
4242
try await withCheckedThrowingContinuation { continuation in
4343
let message = SWIM.Message.ping(replyTo: origin, payload: payload, sequenceNumber: sequenceNumber)
44-
let command = SWIMNIOWriteCommand(message: message, to: self.swimNode, replyTimeout: timeout.toNIO, replyCallback: { reply in
44+
let command = SWIMNIOWriteCommand(message: message, to: self.swimNode, replyTimeout: timeout.toNIO) { reply in
4545
switch reply {
46-
case .success(.response(.nack(_, _))):
46+
case .success(.nack(_, _)):
4747
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected .nack reply to .ping message! Was: \(reply)"))
48-
49-
case .success(.response(let pingResponse)):
48+
49+
case .success(let pingResponse):
5050
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
5151
continuation.resume(returning: pingResponse)
52-
52+
5353
case .failure(let error):
5454
continuation.resume(throwing: error)
55-
56-
case .success(let other):
57-
continuation.resume(throwing:
58-
SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)"))
5955
}
60-
})
56+
}
6157

6258
self.channel.writeAndFlush(command, promise: nil)
6359
}
@@ -72,19 +68,16 @@ public extension SWIM {
7268
) async throws -> PingResponse<SWIM.NIOPeer, SWIM.NIOPeer> {
7369
try await withCheckedThrowingContinuation { continuation in
7470
let message = SWIM.Message.pingRequest(target: target, replyTo: origin, payload: payload, sequenceNumber: sequenceNumber)
75-
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
71+
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO) { reply in
7672
switch reply {
77-
case .success(.response(let pingResponse)):
73+
case .success(let pingResponse):
7874
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
7975
continuation.resume(returning: pingResponse)
8076

8177
case .failure(let error):
8278
continuation.resume(throwing: error)
83-
84-
case .success(let other):
85-
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: \(other) while expected \(PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)"))
8679
}
87-
})
80+
}
8881

8982
self.channel.writeAndFlush(command, promise: nil)
9083
}

Sources/SWIMNIOExample/SWIMNIOHandler.swift

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,6 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
4646
get { self._metrics.withLock { $0 } }
4747
set { self._metrics.withLock { $0 = newValue } }
4848
}
49-
50-
private let _pendingReplyCallbacks: Mutex<[PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.Message, Error>) -> Void)]> = .init([:])
51-
var pendingReplyCallbacks: [PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.Message, Error>) -> Void)] {
52-
get { self._pendingReplyCallbacks.withLock { $0 } }
53-
set { self._pendingReplyCallbacks.withLock { $0 = newValue } }
54-
}
5549

5650
public init(settings: SWIMNIO.Settings) {
5751
self.settings = settings
@@ -116,7 +110,7 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
116110
#endif
117111

118112
let timeoutTask = context.eventLoop.scheduleTask(in: writeCommand.replyTimeout) {
119-
if let callback = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) {
113+
if let callback = self.shell.pendingReplyCallbacks.removeValue(forKey: callbackKey) {
120114
callback(.failure(
121115
SWIMNIOTimeoutError(
122116
timeout: writeCommand.replyTimeout,
@@ -128,9 +122,9 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
128122

129123
self.log.trace("Store callback: \(callbackKey)", metadata: [
130124
"message": "\(writeCommand.message)",
131-
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
125+
"pending/callbacks": Logger.MetadataValue.array(self.shell.pendingReplyCallbacks.map { "\($0)" }),
132126
])
133-
self.pendingReplyCallbacks[callbackKey] = { reply in
127+
self.shell.pendingReplyCallbacks[callbackKey] = { @Sendable reply in
134128
timeoutTask.cancel() // when we trigger the callback, we should also cancel the timeout task
135129
replyCallback(reply) // successful reply received
136130
}
@@ -164,33 +158,11 @@ public final class SWIMNIOHandler: ChannelDuplexHandler, Sendable {
164158
"swim/message/type": "\(message.messageCaseDescription)",
165159
"swim/message": "\(message)",
166160
])
167-
168-
if message.isResponse {
169-
// if it's a reply, invoke the pending callback ------
170-
// TODO: move into the shell: https://github.com/apple/swift-cluster-membership/issues/41
171-
#if DEBUG
172-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber, inResponseTo: nil)
173-
#else
174-
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber)
175-
#endif
176-
177-
if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) {
178-
let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index)
179-
// TODO: UIDs of nodes matter
180-
self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [
181-
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
182-
])
183-
self.metrics?.pingResponseTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds)
184-
callback(.success(message))
185-
} else {
186-
self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [
187-
"pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
188-
])
189-
}
190-
} else {
191-
// deliver to the shell ------------------------------
192-
self.shell.receiveMessage(message: message)
193-
}
161+
// deliver to the shell ------------------------------
162+
self.shell.receiveMessage(
163+
message: message,
164+
from: remoteAddress
165+
)
194166
} catch {
195167
self.log.error("Read failed: \(error)", metadata: [
196168
"remoteAddress": "\(remoteAddress)",
@@ -255,10 +227,10 @@ public struct SWIMNIOWriteCommand: Sendable {
255227
/// If the `replyCallback` is set, what timeout should be set for a reply to come back from the peer.
256228
public let replyTimeout: NIO.TimeAmount
257229
/// Callback to be invoked (calling into the SWIMNIOShell) when a reply to this message arrives.
258-
public let replyCallback: (@Sendable (Result<SWIM.Message, Error>) -> Void)?
230+
public let replyCallback: (@Sendable (Result<SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>, Error>) -> Void)?
259231

260232
/// Create a write command.
261-
public init(message: SWIM.Message, to recipient: Node, replyTimeout: TimeAmount, replyCallback: (@Sendable (Result<SWIM.Message, Error>) -> Void)?) {
233+
public init(message: SWIM.Message, to recipient: Node, replyTimeout: TimeAmount, replyCallback: (@Sendable (Result<SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>, Error>) -> Void)?) {
262234
self.message = message
263235
self.recipient = try! .init(ipAddress: recipient.host, port: recipient.port) // try!-safe since the host/port is always safe
264236
self.replyTimeout = replyTimeout
@@ -303,6 +275,18 @@ struct PendingResponseCallbackIdentifier: Sendable, Hashable, CustomStringConver
303275
func nanosecondsSinceCallbackStored(now: ContinuousClock.Instant = .now) -> Duration {
304276
storedAt.duration(to: now)
305277
}
278+
279+
init(peerAddress: SocketAddress, sequenceNumber: SWIM.SequenceNumber, inResponseTo: SWIM.Message?) {
280+
self.peerAddress = peerAddress
281+
self.sequenceNumber = sequenceNumber
282+
self.inResponseTo = inResponseTo
283+
}
284+
285+
init(peer: Node, sequenceNumber: SWIM.SequenceNumber, inResponseTo: SWIM.Message?) {
286+
self.peerAddress = try! .init(ipAddress: peer.host, port: peer.port) // try!-safe since the host/port is always safe
287+
self.sequenceNumber = sequenceNumber
288+
self.inResponseTo = inResponseTo
289+
}
306290
}
307291

308292
// ==== ----------------------------------------------------------------------------------------------------------------

Sources/SWIMNIOExample/SWIMNIOShell.swift

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public final class SWIMNIOShell: Sendable {
5757
set { _nextPeriodicTickCancellable.withLock { $0 = newValue } }
5858
}
5959

60+
private let _pendingReplyCallbacks: Mutex<[PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>, Error>) -> Void)]> = .init([:])
61+
var pendingReplyCallbacks: [PendingResponseCallbackIdentifier: (@Sendable (Result<SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>, Error>) -> Void)] {
62+
get { self._pendingReplyCallbacks.withLock { $0 } }
63+
set { self._pendingReplyCallbacks.withLock { $0 = newValue } }
64+
}
65+
6066
internal init(
6167
node: Node,
6268
settings: SWIMNIO.Settings,
@@ -125,10 +131,10 @@ public final class SWIMNIOShell: Sendable {
125131
// ==== ------------------------------------------------------------------------------------------------------------
126132
// MARK: Receiving messages
127133

128-
public func receiveMessage(message: SWIM.Message) {
134+
public func receiveMessage(message: SWIM.Message, from address: SocketAddress) {
129135
guard self.eventLoop.inEventLoop else {
130136
return self.eventLoop.execute {
131-
self.receiveMessage(message: message)
137+
self.receiveMessage(message: message, from: address)
132138
}
133139
}
134140

@@ -141,8 +147,25 @@ public final class SWIMNIOShell: Sendable {
141147
case .pingRequest(let target, let pingRequestOrigin, let payload, let sequenceNumber):
142148
self.receivePingRequest(target: target, pingRequestOrigin: pingRequestOrigin, payload: payload, sequenceNumber: sequenceNumber)
143149

144-
case .response(let pingResponse):
145-
self.receivePingResponse(response: pingResponse, pingRequestOriginPeer: nil, pingRequestSequenceNumber: nil)
150+
case .response(let response):
151+
#if DEBUG
152+
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: address, sequenceNumber: response.sequenceNumber, inResponseTo: nil)
153+
#else
154+
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: address, sequenceNumber: response.sequenceNumber)
155+
#endif
156+
if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) {
157+
let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index)
158+
// TODO: UIDs of nodes matter
159+
self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [
160+
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
161+
])
162+
self.swim.metrics.shell.pingResponseTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds)
163+
callback(.success(response))
164+
} else {
165+
self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [
166+
"pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
167+
])
168+
}
146169
}
147170
}
148171

@@ -241,8 +264,8 @@ public final class SWIMNIOShell: Sendable {
241264
/// - pingRequestOrigin: is set only when the ping that this is a reply to was originated as a `pingRequest`.
242265
func receivePingResponse(
243266
response: SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>,
244-
pingRequestOriginPeer: SWIM.NIOPeer?,
245-
pingRequestSequenceNumber: SWIM.SequenceNumber?
267+
pingRequestOriginPeer: SWIM.NIOPeer? = .none,
268+
pingRequestSequenceNumber: SWIM.SequenceNumber? = .none
246269
) {
247270
guard self.eventLoop.inEventLoop else {
248271
return self.eventLoop.execute {
@@ -257,7 +280,11 @@ public final class SWIMNIOShell: Sendable {
257280
"swim/response/sequenceNumber": "\(response.sequenceNumber)",
258281
]))
259282

260-
let directives = self.swim.onPingResponse(response: response, pingRequestOrigin: pingRequestOriginPeer, pingRequestSequenceNumber: pingRequestSequenceNumber)
283+
let directives = self.swim.onPingResponse(
284+
response: response,
285+
pingRequestOrigin: pingRequestOriginPeer,
286+
pingRequestSequenceNumber: pingRequestSequenceNumber
287+
)
261288
// optionally debug log all directives here
262289
directives.forEach { directive in
263290
switch directive {
@@ -394,7 +421,6 @@ public final class SWIMNIOShell: Sendable {
394421
// We are only interested in successful pings, as a single success tells us the node is
395422
// still alive. Therefore we propagate only the first success, but no failures.
396423
// The failure case is handled through the timeout of the whole operation.
397-
let firstSuccessPromise = self.eventLoop.makePromise(of: SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)
398424
let pingTimeout = directive.timeout
399425
let target = directive.target
400426
let startedSendingPingRequestsSentAt: ContinuousClock.Instant = .now
@@ -415,7 +441,8 @@ public final class SWIMNIOShell: Sendable {
415441
target: target,
416442
payload: payload,
417443
from: self.peer,
418-
timeout: pingTimeout, sequenceNumber: sequenceNumber
444+
timeout: pingTimeout,
445+
sequenceNumber: sequenceNumber
419446
)
420447

421448
// we only record successes
@@ -428,7 +455,11 @@ public final class SWIMNIOShell: Sendable {
428455
// While this has a slight timing implication on time timeout of the pings -- the node that is last
429456
// in the list that we ping, has slightly less time to fulfil the "total ping timeout"; as we set a total timeout on the entire `firstSuccess`.
430457
// In practice those timeouts will be relatively large (seconds) and the few millis here should not have a large impact on correctness.
431-
firstSuccessPromise.succeed(response)
458+
self.eventLoop.execute {
459+
self.swim.metrics.shell.pingRequestResponseTimeFirst.record(duration: startedSendingPingRequestsSentAt.duration(to: .now))
460+
self.receivePingRequestResponse(result: response, pingedPeer: target)
461+
}
462+
432463
}
433464
} catch {
434465
self.receiveEveryPingRequestResponse(result: .timeout(target: target, pingRequestOrigin: self.myself, timeout: pingTimeout, sequenceNumber: sequenceNumber), pingedPeer: target)
@@ -443,19 +474,6 @@ public final class SWIMNIOShell: Sendable {
443474
}
444475
}
445476
}
446-
447-
// guaranteed to be on "our" EL
448-
firstSuccessPromise.futureResult.whenComplete { result in
449-
switch result {
450-
case .success(let response):
451-
self.swim.metrics.shell.pingRequestResponseTimeFirst.record(duration: startedSendingPingRequestsSentAt.duration(to: .now))
452-
self.receivePingRequestResponse(result: response, pingedPeer: target)
453-
454-
case .failure(let error):
455-
self.log.debug("Failed to pingRequest via \(directive.requestDetails.count) peers", metadata: ["pingRequest/target": "\(target)", "error": "\(error)"])
456-
self.receivePingRequestResponse(result: .timeout(target: target, pingRequestOrigin: nil, timeout: pingTimeout, sequenceNumber: 0), pingedPeer: target) // sequence number does not matter
457-
}
458-
}
459477
}
460478

461479
// ==== ------------------------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)