@@ -51,8 +51,8 @@ public final class SWIMNIOShell: Sendable {
5151 }
5252
5353 /// Cancellable of the periodicPingTimer (if it was kicked off)
54- private let _nextPeriodicTickCancellable : Mutex < SWIMCancellable ? > = . init( . none)
55- private var nextPeriodicTickCancellable : SWIMCancellable ? {
54+ private let _nextPeriodicTickCancellable : Mutex < Task < Void , any Error > ? > = . init( . none)
55+ private var nextPeriodicTickCancellable : Task < Void , any Error > ? {
5656 get { _nextPeriodicTickCancellable. withLock { $0 } }
5757 set { _nextPeriodicTickCancellable. withLock { $0 = newValue } }
5858 }
@@ -66,7 +66,6 @@ public final class SWIMNIOShell: Sendable {
6666 self . settings = settings
6767
6868 self . channel = try . init( wrappingChannelSynchronously: channel)
69- // self.eventLoop = channel.
7069
7170 let myself = try SWIM . NIOPeer ( node: node, channel: channel)
7271 self . myself = myself
@@ -97,12 +96,6 @@ public final class SWIMNIOShell: Sendable {
9796 /// Upon shutdown the myself member is marked as `.dead`, although it should not be expected to spread this
9897 /// information to other nodes. It technically could, but it is not expected not required to.
9998 public func receiveShutdown( ) {
100- // guard self.eventLoop.inEventLoop else {
101- // return self.eventLoop.execute {
102- // self.receiveShutdown()
103- // }
104- // }
105-
10699 self . nextPeriodicTickCancellable? . cancel ( )
107100 switch self . swim. confirmDead ( peer: self . peer) {
108101 case . applied( let change) :
@@ -112,26 +105,10 @@ public final class SWIMNIOShell: Sendable {
112105 ( ) // ok
113106 }
114107 }
115-
116- /// Start a *single* timer, to run the passed task after given delay.
117- // @discardableResult
118- // private func schedule(delay: Duration, _ task: @Sendable @escaping () -> Void) -> SWIMCancellable {
119- // self.eventLoop.assertInEventLoop()
120-
121- // let scheduled: Scheduled<Void> = self.eventLoop.scheduleTask(in: delay.toNIO) { () in task() }
122- // return SWIMCancellable { scheduled.cancel() }
123- // }
124-
125108 // ==== ------------------------------------------------------------------------------------------------------------
126109 // MARK: Receiving messages
127110
128111 public func receiveMessage( message: SWIM . Message ) {
129- // guard self.eventLoop.inEventLoop else {
130- // return self.eventLoop.execute {
131- // self.receiveMessage(message: message)
132- // }
133- // }
134-
135112 self . tracelog ( . receive, message: " \( message) " )
136113
137114 switch message {
@@ -148,12 +125,6 @@ public final class SWIMNIOShell: Sendable {
148125
149126 /// Allows for typical local interactions with the shell
150127 public func receiveLocalMessage( message: SWIM . LocalMessage ) {
151- // guard self.eventLoop.inEventLoop else {
152- // return self.eventLoop.execute {
153- // self.receiveLocalMessage(message: message)
154- // }
155- // }
156-
157128 self . tracelog ( . receive, message: " \( message) " )
158129
159130 switch message {
@@ -166,12 +137,6 @@ public final class SWIMNIOShell: Sendable {
166137 }
167138
168139 private func receivePing( pingOrigin: SWIM . NIOPeer , payload: SWIM . GossipPayload < SWIM . NIOPeer > ? , sequenceNumber: SWIM . SequenceNumber ) {
169- // guard self.eventLoop.inEventLoop else {
170- // return self.eventLoop.execute {
171- // self.receivePing(pingOrigin: pingOrigin, payload: payload, sequenceNumber: sequenceNumber)
172- // }
173- // }
174-
175140 self . log. trace ( " Received ping@ \( sequenceNumber) " , metadata: self . swim. metadata ( [
176141 " swim/ping/pingOrigin " : " \( pingOrigin. swimNode) " ,
177142 " swim/ping/payload " : " \( String ( describing: payload) ) " ,
@@ -199,12 +164,6 @@ public final class SWIMNIOShell: Sendable {
199164 payload: SWIM . GossipPayload < SWIM . NIOPeer > ? ,
200165 sequenceNumber: SWIM . SequenceNumber
201166 ) {
202- // guard self.eventLoop.inEventLoop else {
203- // return self.eventLoop.execute {
204- // self.receivePingRequest(target: target, pingRequestOrigin: pingRequestOrigin, payload: payload, sequenceNumber: sequenceNumber)
205- // }
206- // }
207-
208167 self . log. trace ( " Received pingRequest " , metadata: [
209168 " swim/pingRequest/origin " : " \( pingRequestOrigin. node) " ,
210169 " swim/pingRequest/sequenceNumber " : " \( sequenceNumber) " ,
@@ -244,12 +203,6 @@ public final class SWIMNIOShell: Sendable {
244203 pingRequestOriginPeer: SWIM . NIOPeer ? ,
245204 pingRequestSequenceNumber: SWIM . SequenceNumber ?
246205 ) {
247- // guard self.eventLoop.inEventLoop else {
248- // return self.eventLoop.execute {
249- // self.receivePingResponse(response: response, pingRequestOriginPeer: pingRequestOriginPeer, pingRequestSequenceNumber: pingRequestSequenceNumber)
250- // }
251- // }
252-
253206 self . log. trace ( " Receive ping response: \( response) " , metadata: self . swim. metadata ( [
254207 " swim/pingRequest/origin " : " \( pingRequestOriginPeer, orElse: " nil " ) " ,
255208 " swim/pingRequest/sequenceNumber " : " \( pingRequestSequenceNumber, orElse: " nil " ) " ,
@@ -283,11 +236,6 @@ public final class SWIMNIOShell: Sendable {
283236 }
284237
285238 func receiveEveryPingRequestResponse( result: SWIM . PingResponse < SWIM . NIOPeer , SWIM . NIOPeer > , pingedPeer: SWIM . NIOPeer ) {
286- // guard self.eventLoop.inEventLoop else {
287- // return self.eventLoop.execute {
288- // self.receiveEveryPingRequestResponse(result: result, pingedPeer: pingedPeer)
289- // }
290- // }
291239 self . tracelog ( . receive( pinged: pingedPeer) , message: " \( result) " )
292240 let directives = self . swim. onEveryPingRequestResponse ( result, pinged: pingedPeer)
293241 if !directives. isEmpty {
@@ -301,12 +249,6 @@ public final class SWIMNIOShell: Sendable {
301249 }
302250
303251 func receivePingRequestResponse( result: SWIM . PingResponse < SWIM . NIOPeer , SWIM . NIOPeer > , pingedPeer: SWIM . NIOPeer ) {
304- // guard self.eventLoop.inEventLoop else {
305- // return self.eventLoop.execute {
306- // self.receivePingRequestResponse(result: result, pingedPeer: pingedPeer)
307- // }
308- // }
309-
310252 self . tracelog ( . receive( pinged: pingedPeer) , message: " \( result) " )
311253 // TODO: do we know here WHO replied to us actually? We know who they told us about (with the ping-req), could be useful to know
312254
@@ -394,7 +336,6 @@ public final class SWIMNIOShell: Sendable {
394336 // We are only interested in successful pings, as a single success tells us the node is
395337 // still alive. Therefore we propagate only the first success, but no failures.
396338 // The failure case is handled through the timeout of the whole operation.
397- // let firstSuccessPromise = self.eventLoop.makePromise(of: SWIM.PingResponse<SWIM.NIOPeer, SWIM.NIOPeer>.self)
398339 let pingTimeout = directive. timeout
399340 let target = directive. target
400341 let startedSendingPingRequestsSentAt : ContinuousClock . Instant = . now
@@ -415,7 +356,8 @@ public final class SWIMNIOShell: Sendable {
415356 target: target,
416357 payload: payload,
417358 from: self . peer,
418- timeout: pingTimeout, sequenceNumber: sequenceNumber
359+ timeout: pingTimeout,
360+ sequenceNumber: sequenceNumber
419361 )
420362
421363 // we only record successes
@@ -428,10 +370,19 @@ public final class SWIMNIOShell: Sendable {
428370 // While this has a slight timing implication on time timeout of the pings -- the node that is last
429371 // in the list that we ping, has slightly less time to fulfil the "total ping timeout"; as we set a total timeout on the entire `firstSuccess`.
430372 // In practice those timeouts will be relatively large (seconds) and the few millis here should not have a large impact on correctness.
431- // firstSuccessPromise.succeed(response)
373+ self . swim. metrics. shell. pingRequestResponseTimeFirst. record ( duration: startedSendingPingRequestsSentAt. duration ( to: . now) )
374+ self . receivePingRequestResponse ( result: response, pingedPeer: target)
432375 }
433376 } catch {
434- self . receiveEveryPingRequestResponse ( result: . timeout( target: target, pingRequestOrigin: self . myself, timeout: pingTimeout, sequenceNumber: sequenceNumber) , pingedPeer: target)
377+ self . receiveEveryPingRequestResponse (
378+ result: . timeout(
379+ target: target,
380+ pingRequestOrigin: self . myself,
381+ timeout: pingTimeout,
382+ sequenceNumber: sequenceNumber
383+ ) ,
384+ pingedPeer: target
385+ )
435386 // these are generally harmless thus we do not want to log them on higher levels
436387 self . log. trace ( " Failed pingRequest " , metadata: [
437388 " swim/target " : " \( target) " ,
@@ -443,19 +394,6 @@ public final class SWIMNIOShell: Sendable {
443394 }
444395 }
445396 }
446-
447- // // guaranteed to be on "our" EL
448- // firstSuccessPromise.futureResult.whenComplete { result in
449- // switch result {
450- // case .success(let response):
451- // self.swim.metrics.shell.pingRequestResponseTimeFirst.record(duration: startedSendingPingRequestsSentAt.duration(to: .now))
452- // self.receivePingRequestResponse(result: response, pingedPeer: target)
453- //
454- // case .failure(let error):
455- // self.log.debug("Failed to pingRequest via \(directive.requestDetails.count) peers", metadata: ["pingRequest/target": "\(target)", "error": "\(error)"])
456- // self.receivePingRequestResponse(result: .timeout(target: target, pingRequestOrigin: nil, timeout: pingTimeout, sequenceNumber: 0), pingedPeer: target) // sequence number does not matter
457- // }
458- // }
459397 }
460398
461399 // ==== ------------------------------------------------------------------------------------------------------------
@@ -465,8 +403,6 @@ public final class SWIMNIOShell: Sendable {
465403 ///
466404 /// This is the heart of the periodic gossip performed by SWIM.
467405 func handlePeriodicProtocolPeriodTick( ) {
468- // self.eventLoop.assertInEventLoop()
469-
470406 let directives = self . swim. onPeriodicPingTick ( )
471407 for directive in directives {
472408 switch directive {
@@ -480,23 +416,17 @@ public final class SWIMNIOShell: Sendable {
480416 }
481417
482418 case . scheduleNextTick( let delay) :
483- break
484- // self.nextPeriodicTickCancellable = self.schedule(delay : delay) {
485- // self.handlePeriodicProtocolPeriodTick()
486- // }
419+ self . nextPeriodicTickCancellable = Task {
420+ try await Task . sleep ( for : delay)
421+ self . handlePeriodicProtocolPeriodTick ( )
422+ }
487423 }
488424 }
489425 }
490426
491427 /// Extra functionality, allowing external callers to ask this swim shell to start monitoring a specific node.
492428 // TODO: Add some attempts:Int + maxAttempts: Int and handle them appropriately; https://github.com/apple/swift-cluster-membership/issues/32
493429 private func receiveStartMonitoring( node: Node ) {
494- // guard self.eventLoop.inEventLoop else {
495- // return self.eventLoop.execute {
496- // self.receiveStartMonitoring(node: node)
497- // }
498- // }
499-
500430 guard self . node. withoutUID != node. withoutUID else {
501431 return // no need to monitor ourselves, nor a replacement of us (if node is our replacement, we should have been dead already)
502432 }
0 commit comments