Skip to content

Commit baff60c

Browse files
fabianfettslashmo
authored andcommitted
Improve tracing support
Signed-off-by: Fabian Fett <fabianfett@apple.com>
1 parent a63dc6b commit baff60c

File tree

5 files changed

+109
-51
lines changed

5 files changed

+109
-51
lines changed

Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ import Logging
1212
import NIOCore
1313
import NIOPosix
1414
import Synchronization
15+
#if DistributedTracingSupport
16+
import Tracing
17+
#endif
1518
import Valkey
1619

1720
@available(valkeySwift 1.0, *)
1821
func connectionBenchmarks() {
1922
makeConnectionCreateAndDropBenchmark()
2023
makeConnectionGETBenchmark()
24+
#if DistributedTracingSupport
25+
makeConnectionGETNoOpTracerBenchmark()
26+
#endif
2127
makeConnectionPipelineBenchmark()
2228
}
2329

@@ -58,9 +64,45 @@ func makeConnectionGETBenchmark() -> Benchmark? {
5864
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
5965
let port = serverMutex.withLock { $0 }!.localAddress!.port!
6066
let logger = Logger(label: "test")
67+
#if DistributedTracingSupport
68+
// explicitly set tracer to nil, if trait is enabled
69+
var configuration = ValkeyConnectionConfiguration()
70+
configuration.tracing.tracer = nil
71+
#endif
6172
try await ValkeyConnection.withConnection(
6273
address: .hostname("127.0.0.1", port: port),
63-
configuration: .init(),
74+
configuration: configuration,
75+
logger: logger
76+
) { connection in
77+
benchmark.startMeasurement()
78+
for _ in benchmark.scaledIterations {
79+
let foo = try await connection.get("foo")
80+
precondition(foo.map { String(buffer: $0) } == "Bar")
81+
}
82+
benchmark.stopMeasurement()
83+
}
84+
} setup: {
85+
let server = try await makeLocalServer()
86+
serverMutex.withLock { $0 = server }
87+
} teardown: {
88+
try await serverMutex.withLock { $0 }?.close().get()
89+
}
90+
}
91+
92+
#if DistributedTracingSupport
93+
@available(valkeySwift 1.0, *)
94+
@discardableResult
95+
func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? {
96+
let serverMutex = Mutex<(any Channel)?>(nil)
97+
98+
return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
99+
let port = serverMutex.withLock { $0 }!.localAddress!.port!
100+
let logger = Logger(label: "test")
101+
var configuration = ValkeyConnectionConfiguration()
102+
configuration.tracing.tracer = NoOpTracer()
103+
try await ValkeyConnection.withConnection(
104+
address: .hostname("127.0.0.1", port: port),
105+
configuration: configuration,
64106
logger: logger
65107
) { connection in
66108
benchmark.startMeasurement()
@@ -77,6 +119,7 @@ func makeConnectionGETBenchmark() -> Benchmark? {
77119
try await serverMutex.withLock { $0 }?.close().get()
78120
}
79121
}
122+
#endif
80123

81124
@available(valkeySwift 1.0, *)
82125
@discardableResult

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
3333
public let id: ID
3434
/// Logger used by Server
3535
let logger: Logger
36+
#if DistributedTracingSupport
37+
@usableFromInline
38+
let tracer: (any Tracer)?
39+
#endif
3640
@usableFromInline
3741
let channel: any Channel
3842
@usableFromInline
@@ -57,6 +61,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
5761
self.configuration = configuration
5862
self.id = connectionID
5963
self.logger = logger
64+
#if DistributedTracingSupport
65+
self.tracer = configuration.tracing.tracer
66+
#endif
6067
switch address?.value {
6168
case let .hostname(host, port):
6269
self.address = (host, port)
@@ -169,12 +176,11 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
169176
@inlinable
170177
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
171178
#if DistributedTracingSupport
172-
let span = startSpan(Command.name, ofKind: .client)
173-
defer { span.end() }
179+
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
180+
defer { span?.end() }
174181

175-
span.updateAttributes { attributes in
176-
attributes["db.operation.name"] = Command.name
177-
applyCommonAttributes(to: &attributes)
182+
span?.updateAttributes { attributes in
183+
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
178184
}
179185
#endif
180186

@@ -193,21 +199,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
193199
}
194200
} catch let error as ValkeyClientError {
195201
#if DistributedTracingSupport
196-
span.recordError(error)
202+
span?.recordError(error)
197203
if let message = error.message {
198204
var prefixEndIndex = message.startIndex
199205
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
200206
message.formIndex(after: &prefixEndIndex)
201207
}
202208
let prefix = message[message.startIndex..<prefixEndIndex]
203-
span.attributes["db.response.status_code"] = "\(prefix)"
204-
span.setStatus(SpanStatus(code: .error))
209+
span?.attributes["db.response.status_code"] = "\(prefix)"
210+
span?.setStatus(SpanStatus(code: .error))
205211
}
206212
#endif
207213
throw error
208214
} catch {
209215
#if DistributedTracingSupport
210-
span.recordError(error)
216+
span?.recordError(error)
211217
#endif
212218
throw error
213219
}
@@ -224,41 +230,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
224230
public func execute<each Command: ValkeyCommand>(
225231
_ commands: repeat each Command
226232
) async -> sending (repeat Result<(each Command).Response, Error>) {
227-
#if DistributedTracingSupport
228-
let span = startSpan("MULTI", ofKind: .client)
229-
defer { span.end() }
230-
231-
// We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
232-
var commandName: String?
233-
var operationNameSuffix: String?
234-
var commandCount = 0
235-
236-
for command in repeat each commands {
237-
commandCount += 1
238-
if commandName == nil {
239-
commandName = Swift.type(of: command).name
240-
operationNameSuffix = commandName
241-
} else if commandName != Swift.type(of: command).name {
242-
// We should only add a suffix if all commands in the transaction are the same.
243-
operationNameSuffix = nil
244-
}
245-
}
246-
let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI"
247-
248-
span.updateAttributes { attributes in
249-
attributes["db.operation.name"] = operationName
250-
attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil
251-
applyCommonAttributes(to: &attributes)
252-
}
253-
#endif
254-
255233
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
256-
#if DistributedTracingSupport
257-
if case .failure(let error) = result {
258-
span.recordError(error)
259-
}
260-
#endif
261-
262234
return result.flatMap {
263235
do {
264236
return try .success(Response(fromRESP: $0))
@@ -295,12 +267,13 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
295267
}
296268

297269
@usableFromInline
298-
func applyCommonAttributes(to attributes: inout SpanAttributes) {
299-
attributes["db.system.name"] = "valkey"
300-
attributes["network.peer.address"] = channel.remoteAddress?.ipAddress
301-
attributes["network.peer.port"] = channel.remoteAddress?.port
302-
attributes["server.address"] = address?.hostOrSocketPath
303-
attributes["server.port"] = address?.port == 6379 ? nil : address?.port
270+
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
271+
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
272+
attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValue.databaseSystem
273+
attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress
274+
attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port
275+
attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath
276+
attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port
304277
}
305278

306279
@usableFromInline

Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
//
88

99
import NIOSSL
10+
#if DistributedTracingSupport
11+
import Tracing
12+
#endif
1013

1114
/// A configuration object that defines how to connect to a Valkey server.
1215
///
@@ -112,6 +115,10 @@ public struct ValkeyConnectionConfiguration: Sendable {
112115
/// Default value is `nil` (no client name is set).
113116
public var clientName: String?
114117

118+
#if DistributedTracingSupport
119+
public var tracing: ValkeyTracingConfiguration = .init()
120+
#endif
121+
115122
/// Creates a new Valkey connection configuration.
116123
///
117124
/// Use this initializer to create a configuration object that can be used to establish
@@ -137,3 +144,26 @@ public struct ValkeyConnectionConfiguration: Sendable {
137144
self.clientName = clientName
138145
}
139146
}
147+
148+
#if DistributedTracingSupport
149+
public struct ValkeyTracingConfiguration: Sendable {
150+
151+
public var tracer: (any Tracer)? = InstrumentationSystem.tracer
152+
153+
public var attributeNames: AttributeNames = .init()
154+
public var attributeValue: AttributeValues = .init()
155+
156+
public struct AttributeNames: Sendable {
157+
public var databaseOperationName: String = "db.operation.name"
158+
public var databaseSystemName: String = "db.system.name"
159+
public var networkPeerAddress: String = "network.peer.address"
160+
public var networkPeerPort: String = "network.peer.port"
161+
public var serverAddress: String = "server.address"
162+
public var serverPort: String = "server.port"
163+
}
164+
165+
public struct AttributeValues: Sendable {
166+
public var databaseSystem: String = "valkey"
167+
}
168+
}
169+
#endif

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ public struct ValkeyClientConfiguration: Sendable {
118118
/// The TLS to use for the Valkey connection.
119119
public var tls: TLS
120120

121+
#if DistributedTracingSupport
122+
public var tracing: ValkeyTracingConfiguration = .init()
123+
#endif
124+
121125
/// Creates a Valkey client connection configuration.
122126
///
123127
/// - Parameters:

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ package final class ValkeyConnectionFactory: Sendable {
103103
try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName)
104104
}
105105

106-
return ValkeyConnectionConfiguration(
106+
let newConfig = ValkeyConnectionConfiguration(
107107
authentication: self.configuration.authentication.flatMap {
108108
.init(username: $0.username, password: $0.password)
109109
},
@@ -112,5 +112,13 @@ package final class ValkeyConnectionFactory: Sendable {
112112
tls: tls,
113113
clientName: nil
114114
)
115+
116+
#if DistributedTracingSupport
117+
var mConfig = newConfig
118+
mConfig.tracing = self.configuration.tracing
119+
return mConfig
120+
#else
121+
return newConfig
122+
#endif
115123
}
116124
}

0 commit comments

Comments
 (0)