Skip to content

Commit 134a826

Browse files
Move SETINFO calls to ChannelHandler so they can be written along with HELLO (#172)
* Revert "Call CLIENT SETINFO on initialisation of connection (#170)" This reverts commit 5106ebf. Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Add support for sending additional command in handshake - Add ignore case to ValkeyPromise for commands whose results we don't care about - Add deque of pending commands to ConnectedState - Push remaining pending commands to active state once we receive the hello Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Add globals for library name and version Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Split hello from other pending commands Move version to Version.swift Rename ValkeyPromise.ignore to forget Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Fix benchmark Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Update Sources/Valkey/Connection/ValkeyChannelHandler.swift Co-authored-by: Fabian Fett <fabianfett@apple.com> Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Cannot cancel while in connected state Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Add create and drop benchmark Signed-off-by: Adam Fowler <adamfowler71@gmail.com> --------- Signed-off-by: Adam Fowler <adamfowler71@gmail.com> Co-authored-by: Fabian Fett <fabianfett@apple.com>
1 parent e169d7f commit 134a826

File tree

8 files changed

+84
-86
lines changed

8 files changed

+84
-86
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,8 @@ extension ValkeyChannelHandler {
6565
@usableFromInline
6666
struct ConnectedState {
6767
let context: Context
68-
var pendingHelloCommand: PendingCommand
69-
70-
func cancel(requestID: Int) -> PendingCommand? {
71-
if pendingHelloCommand.requestID == requestID {
72-
return pendingHelloCommand
73-
}
74-
return nil
75-
}
68+
let pendingHelloCommand: PendingCommand
69+
var pendingCommands: Deque<PendingCommand>
7670
}
7771

7872
init() {
@@ -85,11 +79,11 @@ extension ValkeyChannelHandler {
8579

8680
/// handler has become active
8781
@usableFromInline
88-
mutating func setConnected(context: Context, pendingHelloCommand: PendingCommand) {
82+
mutating func setConnected(context: Context, pendingHelloCommand: PendingCommand, pendingCommands: Deque<PendingCommand>) {
8983
switch consume self.state {
9084
case .initialized:
9185
self = .connected(
92-
.init(context: context, pendingHelloCommand: pendingHelloCommand)
86+
.init(context: context, pendingHelloCommand: pendingHelloCommand, pendingCommands: pendingCommands)
9387
)
9488
case .connected:
9589
preconditionFailure("Cannot set connected state when state is connected")
@@ -162,7 +156,7 @@ extension ValkeyChannelHandler {
162156
self = .closed(error)
163157
return .respondAndClose(state.pendingHelloCommand, error)
164158
default:
165-
self = .active(.init(context: state.context, pendingCommands: .init()))
159+
self = .active(.init(context: state.context, pendingCommands: state.pendingCommands))
166160
return .respond(state.pendingHelloCommand, .cancel)
167161
}
168162
case .active(var state):
@@ -204,8 +198,12 @@ extension ValkeyChannelHandler {
204198
self = .closed(nil)
205199
return .respondAndClose(command, nil)
206200
}
207-
case .closed:
208-
preconditionFailure("Cannot receive command on closed connection")
201+
case .closed(let error):
202+
guard let error else {
203+
preconditionFailure("Cannot receive command on closed connection with no error")
204+
}
205+
self = .closed(error)
206+
return .closeWithError(error)
209207
}
210208
}
211209

@@ -233,7 +231,7 @@ extension ValkeyChannelHandler {
233231
return .done
234232
case .closing(let state):
235233
self = .closing(state)
236-
return .done
234+
return .reportedClosed(nil)
237235
case .closed(let error):
238236
self = .closed(error)
239237
return .reportedClosed(error)
@@ -255,7 +253,9 @@ extension ValkeyChannelHandler {
255253
case .connected(let state):
256254
if state.pendingHelloCommand.deadline <= now {
257255
self = .closed(ValkeyClientError(.timeout))
258-
return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand])
256+
var pendingCommands = state.pendingCommands
257+
pendingCommands.prepend(state.pendingHelloCommand)
258+
return .failPendingCommandsAndClose(state.context, pendingCommands)
259259
} else {
260260
self = .connected(state)
261261
return .reschedule(state.pendingHelloCommand.deadline)
@@ -296,24 +296,14 @@ extension ValkeyChannelHandler {
296296
case doNothing
297297
}
298298

299-
/// handler wants to send a command
299+
/// handler wants to cancel a command
300300
@usableFromInline
301301
mutating func cancel(requestID: Int) -> CancelAction {
302302
switch consume self.state {
303303
case .initialized:
304304
preconditionFailure("Cannot cancel when initialized")
305-
case .connected(let state):
306-
if let command = state.cancel(requestID: requestID) {
307-
self = .closed(CancellationError())
308-
return .failPendingCommandsAndClose(
309-
state.context,
310-
cancel: [command],
311-
closeConnectionDueToCancel: []
312-
)
313-
} else {
314-
self = .connected(state)
315-
return .doNothing
316-
}
305+
case .connected:
306+
preconditionFailure("Cannot cancel while in connected state")
317307
case .active(let state):
318308
let (cancel, closeConnectionDueToCancel) = state.cancel(requestID: requestID)
319309
if cancel.count > 0 {
@@ -360,7 +350,9 @@ extension ValkeyChannelHandler {
360350
self = .closed(nil)
361351
return .doNothing
362352
case .connected(let state):
363-
self = .closing(.init(context: state.context, pendingCommands: [state.pendingHelloCommand]))
353+
var pendingCommands = state.pendingCommands
354+
pendingCommands.prepend(state.pendingHelloCommand)
355+
self = .closing(.init(context: state.context, pendingCommands: pendingCommands))
364356
return .waitForPendingCommands(state.context)
365357
case .active(let state):
366358
if state.pendingCommands.count > 0 {
@@ -393,7 +385,9 @@ extension ValkeyChannelHandler {
393385
return .doNothing
394386
case .connected(let state):
395387
self = .closed(nil)
396-
return .failPendingCommandsAndClose(state.context, [state.pendingHelloCommand])
388+
var pendingCommands = state.pendingCommands
389+
pendingCommands.prepend(state.pendingHelloCommand)
390+
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
397391
case .active(let state):
398392
self = .closed(nil)
399393
return .failPendingCommandsAndClose(state.context, state.pendingCommands)
@@ -421,7 +415,9 @@ extension ValkeyChannelHandler {
421415
return .doNothing
422416
case .connected(let state):
423417
self = .closed(nil)
424-
return .failPendingCommandsAndSubscriptions([state.pendingHelloCommand])
418+
var pendingCommands = state.pendingCommands
419+
pendingCommands.prepend(state.pendingHelloCommand)
420+
return .failPendingCommandsAndSubscriptions(state.pendingCommands)
425421
case .active(let state):
426422
self = .closed(nil)
427423
return .failPendingCommandsAndSubscriptions(state.pendingCommands)

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -159,33 +159,6 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
159159
}
160160
}
161161

162-
/// Write valkey command/commands to channel
163-
/// - Parameters:
164-
/// - request: Valkey command request
165-
/// - promise: Promise to fulfill when command is complete
166-
@inlinable
167-
func writeAndForget<Command: ValkeyCommand>(command: Command, requestID: Int) {
168-
self.eventLoop.assertInEventLoop()
169-
let pendingCommand = PendingCommand(
170-
promise: .forget,
171-
requestID: requestID,
172-
deadline: .now() + self.configuration.commandTimeout
173-
)
174-
switch self.stateMachine.sendCommand(pendingCommand) {
175-
case .sendCommand(let context):
176-
self.encoder.reset()
177-
command.encode(into: &self.encoder)
178-
let buffer = self.encoder.buffer
179-
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
180-
if self.deadlineCallback == nil {
181-
self.scheduleDeadlineCallback(deadline: .now() + self.configuration.commandTimeout)
182-
}
183-
184-
case .throwError:
185-
break
186-
}
187-
}
188-
189162
@usableFromInline
190163
func write(request: ValkeyRequest) {
191164
self.eventLoop.assertInEventLoop()
@@ -307,25 +280,35 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
307280
@usableFromInline
308281
func setConnected(context: ChannelHandlerContext) {
309282
// Send initial HELLO command
310-
let command = HELLO(
283+
let helloCommand = HELLO(
311284
arguments: .init(
312285
protover: 3,
313286
auth: configuration.authentication.map { .init(username: $0.username, password: $0.password) },
314287
clientname: configuration.clientName
315288
)
316289
)
290+
// set client info
291+
let clientInfoLibName = CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName))
292+
let clientInfoLibVersion = CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion))
293+
317294
self.encoder.reset()
318-
command.encode(into: &self.encoder)
319-
let buffer = self.encoder.buffer
295+
helloCommand.encode(into: &self.encoder)
296+
clientInfoLibName.encode(into: &self.encoder)
297+
clientInfoLibVersion.encode(into: &self.encoder)
320298

321299
let promise = eventLoop.makePromise(of: RESPToken.self)
300+
322301
let deadline = .now() + self.configuration.commandTimeout
323-
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
324-
scheduleDeadlineCallback(deadline: deadline)
302+
context.writeAndFlush(self.wrapOutboundOut(self.encoder.buffer), promise: nil)
303+
self.scheduleDeadlineCallback(deadline: deadline)
325304

326305
self.stateMachine.setConnected(
327306
context: context,
328-
pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: deadline)
307+
pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: deadline),
308+
pendingCommands: [
309+
.init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libname
310+
.init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libver
311+
]
329312
)
330313
}
331314

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
132132
}
133133
}
134134
let connection = try await future.get()
135-
try await connection.initialHandshake()
135+
try await connection.waitOnActive()
136136
return connection
137137
}
138138

@@ -144,10 +144,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
144144
self.channel.close(mode: .all, promise: nil)
145145
}
146146

147-
func initialHandshake() async throws {
147+
func waitOnActive() async throws {
148148
try await self.channelHandler.waitOnActive().get()
149-
self.executeAndForget(command: CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName)))
150-
self.executeAndForget(command: CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion)))
151149
}
152150

153151
/// Send RESP command to Valkey connection
@@ -174,10 +172,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
174172
}
175173
}
176174

177-
func executeAndForget<Command: ValkeyCommand>(command: Command) {
178-
self.channelHandler.writeAndForget(command: command, requestID: Self.requestIDGenerator.next())
179-
}
180-
181175
/// Pipeline a series of commands to Valkey connection
182176
///
183177
/// Once all the responses for the commands have been received the function returns

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ package final class ValkeyConnectionFactory: Sendable {
9494
logger: logger
9595
)
9696
}.get()
97-
try await connection.initialHandshake()
97+
try await connection.waitOnActive()
9898
return connection
9999
}
100100
}

Sources/Valkey/Version.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
/// library name reported to server using CLIENT SETINFO
1516
package let valkeySwiftLibraryName = "valkey-swift"
17+
/// library version reported to server using CLIENT SETINFO
1618
package let valkeySwiftLibraryVersion = "0.1.0"

Tests/ValkeyTests/Utils/NIOAsyncTestingChannel+hello.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@ import Testing
2121
extension NIOAsyncTestingChannel {
2222
func processHello() async throws {
2323
let hello = try await self.waitForOutboundWrite(as: ByteBuffer.self)
24-
#expect(hello == RESPToken(.array([.bulkString("HELLO"), .bulkString("3")])).base)
24+
var expectedBuffer = ByteBuffer()
25+
expectedBuffer.writeImmutableBuffer(RESPToken(.array([.bulkString("HELLO"), .bulkString("3")])).base)
26+
expectedBuffer.writeImmutableBuffer(
27+
RESPToken(.array([.bulkString("CLIENT"), .bulkString("SETINFO"), .bulkString("lib-name"), .bulkString(valkeySwiftLibraryName)])).base
28+
)
29+
expectedBuffer.writeImmutableBuffer(
30+
RESPToken(.array([.bulkString("CLIENT"), .bulkString("SETINFO"), .bulkString("lib-ver"), .bulkString(valkeySwiftLibraryVersion)])).base
31+
)
32+
#expect(hello == expectedBuffer)
2533
try await self.writeInbound(
2634
RESPToken(
2735
.map([
@@ -35,5 +43,7 @@ extension NIOAsyncTestingChannel {
3543
])
3644
).base
3745
)
46+
try await self.writeInbound(RESPToken.ok.base)
47+
try await self.writeInbound(RESPToken.ok.base)
3848
}
3949
}

Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,12 @@ struct ValkeyChannelHandlerStateMachineTests {
183183
}
184184
expect(
185185
stateMachine.state
186-
== .closing(.init(context: "testGracefulShutdown", pendingCommands: [.init(promise: .nio(promise), requestID: 23, deadline: .now())]))
186+
== .closing(
187+
.init(
188+
context: "testGracefulShutdown",
189+
pendingCommands: [.init(promise: .nio(promise), requestID: 23, deadline: .now())]
190+
)
191+
)
187192
)
188193
switch stateMachine.receivedResponse(token: .ok) {
189194
case .respondAndClose(let command, let error):
@@ -218,7 +223,10 @@ struct ValkeyChannelHandlerStateMachineTests {
218223
expect(
219224
stateMachine.state
220225
== .closing(
221-
.init(context: "testClosedClosingState", pendingCommands: [.init(promise: .nio(promise), requestID: 17, deadline: .now())])
226+
.init(
227+
context: "testClosedClosingState",
228+
pendingCommands: [.init(promise: .nio(promise), requestID: 17, deadline: .now())]
229+
)
222230
)
223231
)
224232
switch stateMachine.setClosed() {
@@ -460,7 +468,7 @@ extension ValkeyChannelHandler.StateMachine<String>.State {
460468
case .connected(let lhs):
461469
switch rhs {
462470
case .connected(let rhs):
463-
return lhs.context == rhs.context && lhs.pendingHelloCommand.requestID == rhs.pendingHelloCommand.requestID
471+
return lhs.context == rhs.context && lhs.pendingCommands.map { $0.requestID } == rhs.pendingCommands.map { $0.requestID }
464472
default:
465473
return false
466474
}
@@ -535,7 +543,8 @@ extension ValkeyChannelHandler.StateMachine {
535543
let promise = EmbeddedEventLoop().makePromise(of: RESPToken.self)
536544
self.setConnected(
537545
context: context,
538-
pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: .now() + .seconds(30))
546+
pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: .now() + .seconds(30)),
547+
pendingCommands: []
539548
)
540549
}
541550

Tests/ValkeyTests/ValkeyConnectionTests.swift

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ struct ConnectionTests {
4646
let logger = Logger(label: "test")
4747
_ = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: .init(), logger: logger)
4848

49-
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
50-
#expect(outbound == RESPToken(.command(["HELLO", "3"])).base)
49+
var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
50+
let hello3 = RESPToken(.command(["HELLO", "3"])).base
51+
#expect(outbound.readSlice(length: hello3.readableBytes) == hello3)
5152
}
5253

5354
@Test
@@ -57,8 +58,9 @@ struct ConnectionTests {
5758
let logger = Logger(label: "test")
5859
_ = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: .init(), logger: logger)
5960

60-
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
61-
#expect(outbound == RESPToken(.command(["HELLO", "3"])).base)
61+
var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
62+
let hello3 = RESPToken(.command(["HELLO", "3"])).base
63+
#expect(outbound.readSlice(length: hello3.readableBytes) == hello3)
6264
await #expect(throws: ValkeyClientError(.commandError, message: "Not supported")) {
6365
try await channel.writeInbound(RESPToken(.bulkError("Not supported")).base)
6466
}
@@ -79,8 +81,9 @@ struct ConnectionTests {
7981
logger: logger
8082
)
8183

82-
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
83-
#expect(outbound == RESPToken(.command(["HELLO", "3", "AUTH", "john", "smith"])).base)
84+
var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
85+
let hello3 = RESPToken(.command(["HELLO", "3", "AUTH", "john", "smith"])).base
86+
#expect(outbound.readSlice(length: hello3.readableBytes) == hello3)
8487
}
8588

8689
@Test
@@ -95,8 +98,9 @@ struct ConnectionTests {
9598
logger: logger
9699
)
97100

98-
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
99-
#expect(outbound == RESPToken(.command(["HELLO", "3", "SETNAME", "Testing"])).base)
101+
var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
102+
let hello3 = RESPToken(.command(["HELLO", "3", "SETNAME", "Testing"])).base
103+
#expect(outbound.readSlice(length: hello3.readableBytes) == hello3)
100104
}
101105

102106
@Test

0 commit comments

Comments
 (0)