Skip to content

Commit f06dc3a

Browse files
slashmofabianfett
authored andcommitted
Add Distributed Tracing support behind new trait
Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com>
1 parent 37ea80d commit f06dc3a

File tree

6 files changed

+520
-5
lines changed

6 files changed

+520
-5
lines changed

Package.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ let package = Package(
1919
],
2020
traits: [
2121
.trait(name: "ServiceLifecycleSupport"),
22-
.default(enabledTraits: ["ServiceLifecycleSupport"]),
22+
.trait(name: "DistributedTracingSupport"),
23+
.default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]),
2324
],
2425
dependencies: [
2526
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
2627
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"),
2728
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"),
29+
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"),
2830
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
2931
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"),
3032
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"),
@@ -37,6 +39,7 @@ let package = Package(
3739
.byName(name: "_ValkeyConnectionPool"),
3840
.product(name: "DequeModule", package: "swift-collections"),
3941
.product(name: "Logging", package: "swift-log"),
42+
.product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
4043
.product(name: "NIOCore", package: "swift-nio"),
4144
.product(name: "NIOPosix", package: "swift-nio"),
4245
.product(name: "NIOSSL", package: "swift-nio-ssl"),

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
4444
@usableFromInline
4545
let channelHandler: ValkeyChannelHandler
4646
let configuration: ValkeyConnectionConfiguration
47+
@usableFromInline
48+
let address: (hostOrSocketPath: String, port: Int?)?
4749
let isClosed: Atomic<Bool>
4850

4951
/// Initialize connection
@@ -52,6 +54,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
5254
connectionID: ID,
5355
channelHandler: ValkeyChannelHandler,
5456
configuration: ValkeyConnectionConfiguration,
57+
address: ValkeyServerAddress?,
5558
logger: Logger
5659
) {
5760
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
@@ -60,6 +63,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
6063
self.configuration = configuration
6164
self.id = connectionID
6265
self.logger = logger
66+
switch address?.value {
67+
case let .hostname(host, port):
68+
self.address = (host, port)
69+
case let .unixDomainSocket(path):
70+
self.address = (path, nil)
71+
case nil:
72+
self.address = nil
73+
}
6374
self.isClosed = .init(false)
6475
}
6576

@@ -163,10 +174,19 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
163174

164175
@inlinable
165176
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
166-
try await withSpan(Command.name, ofKind: .client) { span in
167-
span.attributes["db.system.name"] = "valkey"
177+
#if DistributedTracingSupport
178+
let span = startSpan(Command.name, ofKind: .client)
179+
defer { span.end() }
180+
181+
span.updateAttributes { attributes in
182+
attributes["db.operation.name"] = Command.name
183+
applyCommonAttributes(to: &attributes)
184+
}
185+
#endif
186+
187+
let requestID = Self.requestIDGenerator.next()
168188

169-
let requestID = Self.requestIDGenerator.next()
189+
do {
170190
return try await withTaskCancellationHandler {
171191
if Task.isCancelled {
172192
throw ValkeyClientError(.cancelled)
@@ -177,6 +197,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
177197
} onCancel: {
178198
self.cancel(requestID: requestID)
179199
}
200+
} catch let error as ValkeyClientError {
201+
#if DistributedTracingSupport
202+
span.recordError(error)
203+
if let message = error.message {
204+
var prefixEndIndex = message.startIndex
205+
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
206+
message.formIndex(after: &prefixEndIndex)
207+
}
208+
let prefix = message[message.startIndex ..< prefixEndIndex]
209+
span.attributes["db.response.status_code"] = "\(prefix)"
210+
span.setStatus(SpanStatus(code: .error))
211+
}
212+
#endif
213+
throw error
214+
} catch {
215+
#if DistributedTracingSupport
216+
span.recordError(error)
217+
#endif
218+
throw error
180219
}
181220
}
182221

@@ -191,8 +230,42 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
191230
public func execute<each Command: ValkeyCommand>(
192231
_ commands: repeat each Command
193232
) async -> sending (repeat Result<(each Command).Response, Error>) {
233+
#if DistributedTracingSupport
234+
let span = startSpan("MULTI", ofKind: .client)
235+
defer { span.end() }
236+
237+
// We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
238+
var commandName: String?
239+
var operationNameSuffix: String?
240+
var commandCount = 0
241+
242+
for command in repeat each commands {
243+
commandCount += 1
244+
if commandName == nil {
245+
commandName = Swift.type(of: command).name
246+
operationNameSuffix = commandName
247+
} else if commandName != Swift.type(of: command).name {
248+
// We should only add a suffix if all commands in the transaction are the same.
249+
operationNameSuffix = nil
250+
}
251+
}
252+
let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI"
253+
254+
span.updateAttributes { attributes in
255+
attributes["db.operation.name"] = operationName
256+
attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil
257+
applyCommonAttributes(to: &attributes)
258+
}
259+
#endif
260+
194261
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
195-
result.flatMap {
262+
#if DistributedTracingSupport
263+
if case .failure(let error) = result {
264+
span.recordError(error)
265+
}
266+
#endif
267+
268+
return result.flatMap {
196269
do {
197270
return try .success(Response(fromRESP: $0))
198271
} catch {
@@ -227,6 +300,16 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
227300
}
228301
}
229302

303+
@usableFromInline
304+
func applyCommonAttributes(to attributes: inout SpanAttributes) {
305+
// TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go?
306+
attributes["db.system.name"] = "valkey"
307+
attributes["network.peer.address"] = channel.remoteAddress?.ipAddress
308+
attributes["network.peer.port"] = channel.remoteAddress?.port
309+
attributes["server.address"] = address?.hostOrSocketPath
310+
attributes["server.port"] = address?.port == 6379 ? nil : address?.port
311+
}
312+
230313
@usableFromInline
231314
nonisolated func cancel(requestID: Int) {
232315
self.channel.eventLoop.execute {
@@ -292,6 +375,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
292375
connectionID: connectionID,
293376
channelHandler: handler,
294377
configuration: configuration,
378+
address: address,
295379
logger: logger
296380
)
297381
}
@@ -327,6 +411,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
327411
connectionID: 0,
328412
channelHandler: handler,
329413
configuration: configuration,
414+
address: .hostname("127.0.0.1", port: 6379),
330415
logger: logger
331416
)
332417
return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map {

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ package final class ValkeyConnectionFactory: Sendable {
9191
connectionID: connectionID,
9292
channelHandler: channelHandler,
9393
configuration: connectionConfig,
94+
address: nil,
9495
logger: logger
9596
)
9697
}.get()

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct GeneratedCommands {
7474
func testValkeyCommand() async throws {
7575
struct GET: ValkeyCommand {
7676
typealias Response = String?
77+
static let name = "GET"
7778

7879
static let name = "GET"
7980

0 commit comments

Comments
 (0)