@@ -44,6 +44,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
4444 @usableFromInline
4545 let channelHandler : ValkeyChannelHandler
4646 let configuration : ValkeyConnectionConfiguration
47+ @usableFromInline
48+ let address : ( hostOrSocketPath: String , port: Int ? ) ?
4749 let isClosed : Atomic < Bool >
4850
4951 /// Initialize connection
@@ -52,6 +54,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
5254 connectionID: ID ,
5355 channelHandler: ValkeyChannelHandler ,
5456 configuration: ValkeyConnectionConfiguration ,
57+ address: ValkeyServerAddress ? ,
5558 logger: Logger
5659 ) {
5760 self . unownedExecutor = channel. eventLoop. executor. asUnownedSerialExecutor ( )
@@ -60,6 +63,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
6063 self . configuration = configuration
6164 self . id = connectionID
6265 self . logger = logger
66+ switch address? . value {
67+ case let . hostname( host, port) :
68+ self . address = ( host, port)
69+ case let . unixDomainSocket( path) :
70+ self . address = ( path, nil )
71+ case nil :
72+ self . address = nil
73+ }
6374 self . isClosed = . init( false )
6475 }
6576
@@ -165,10 +176,19 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
165176
166177 @inlinable
167178 func _execute< Command: ValkeyCommand > ( command: Command ) async throws -> RESPToken {
168- try await withSpan ( Command . name, ofKind: . client) { span in
169- span. attributes [ " db.system.name " ] = " valkey "
179+ #if DistributedTracingSupport
180+ let span = startSpan ( Command . name, ofKind: . client)
181+ defer { span. end ( ) }
182+
183+ span. updateAttributes { attributes in
184+ attributes [ " db.operation.name " ] = Command . name
185+ applyCommonAttributes ( to: & attributes)
186+ }
187+ #endif
188+
189+ let requestID = Self . requestIDGenerator. next ( )
170190
171- let requestID = Self . requestIDGenerator . next ( )
191+ do {
172192 return try await withTaskCancellationHandler {
173193 if Task . isCancelled {
174194 throw ValkeyClientError ( . cancelled)
@@ -179,6 +199,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
179199 } onCancel: {
180200 self . cancel ( requestID: requestID)
181201 }
202+ } catch let error as ValkeyClientError {
203+ #if DistributedTracingSupport
204+ span. recordError ( error)
205+ if let message = error. message {
206+ var prefixEndIndex = message. startIndex
207+ while prefixEndIndex < message. endIndex, message [ prefixEndIndex] != " " {
208+ message. formIndex ( after: & prefixEndIndex)
209+ }
210+ let prefix = message [ message. startIndex ..< prefixEndIndex]
211+ span. attributes [ " db.response.status_code " ] = " \( prefix) "
212+ span. setStatus ( SpanStatus ( code: . error) )
213+ }
214+ #endif
215+ throw error
216+ } catch {
217+ #if DistributedTracingSupport
218+ span. recordError ( error)
219+ #endif
220+ throw error
182221 }
183222 }
184223
@@ -197,8 +236,42 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
197236 public func execute< each Command : ValkeyCommand > (
198237 _ commands: repeat each Command
199238 ) async -> sending ( repeat Result < ( each Command ) . Response, Error > ) {
239+ #if DistributedTracingSupport
240+ let span = startSpan ( " MULTI " , ofKind: . client)
241+ defer { span. end ( ) }
242+
243+ // We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
244+ var commandName : String ?
245+ var operationNameSuffix : String ?
246+ var commandCount = 0
247+
248+ for command in repeat each commands {
249+ commandCount += 1
250+ if commandName == nil {
251+ commandName = Swift . type ( of: command) . name
252+ operationNameSuffix = commandName
253+ } else if commandName != Swift . type ( of: command) . name {
254+ // We should only add a suffix if all commands in the transaction are the same.
255+ operationNameSuffix = nil
256+ }
257+ }
258+ let operationName = operationNameSuffix. map { " MULTI \( $0) " } ?? " MULTI "
259+
260+ span. updateAttributes { attributes in
261+ attributes [ " db.operation.name " ] = operationName
262+ attributes [ " db.operation.batch.size " ] = commandCount > 1 ? commandCount : nil
263+ applyCommonAttributes ( to: & attributes)
264+ }
265+ #endif
266+
200267 func convert< Response: RESPTokenDecodable > ( _ result: Result < RESPToken , Error > , to: Response . Type ) -> Result < Response , Error > {
201- result. flatMap {
268+ #if DistributedTracingSupport
269+ if case . failure( let error) = result {
270+ span. recordError ( error)
271+ }
272+ #endif
273+
274+ return result. flatMap {
202275 do {
203276 return try . success( Response ( fromRESP: $0) )
204277 } catch {
@@ -233,6 +306,16 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
233306 }
234307 }
235308
309+ @usableFromInline
310+ func applyCommonAttributes( to attributes: inout SpanAttributes ) {
311+ // TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go?
312+ attributes [ " db.system.name " ] = " valkey "
313+ attributes [ " network.peer.address " ] = channel. remoteAddress? . ipAddress
314+ attributes [ " network.peer.port " ] = channel. remoteAddress? . port
315+ attributes [ " server.address " ] = address? . hostOrSocketPath
316+ attributes [ " server.port " ] = address? . port == 6379 ? nil : address? . port
317+ }
318+
236319 @usableFromInline
237320 nonisolated func cancel( requestID: Int ) {
238321 self . channel. eventLoop. execute {
@@ -298,6 +381,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
298381 connectionID: connectionID,
299382 channelHandler: handler,
300383 configuration: configuration,
384+ address: address,
301385 logger: logger
302386 )
303387 }
@@ -333,6 +417,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
333417 connectionID: 0 ,
334418 channelHandler: handler,
335419 configuration: configuration,
420+ address: . hostname( " 127.0.0.1 " , port: 6379 ) ,
336421 logger: logger
337422 )
338423 return channel. connect ( to: try SocketAddress ( ipAddress: " 127.0.0.1 " , port: 6379 ) ) . map {
0 commit comments