Skip to content

Commit dd4cfcf

Browse files
craig[bot]arjunmahishi
andcommitted
Merge #151306
151306: pkg/cli (tsdump): add delta calculation for counter metrics r=arjunmahishi a=arjunmahishi Introduce a CumulativeToDeltaProcessor to handle delta calculations for counter metrics in the Datadog writer. This ensures parity between tsdump counters and Cockroach Cloud counters. Changes: - Added `CumulativeToDeltaProcessor` to `datadogWriter`. - Updated `dump` method to process counter metrics using the processor. - `resolveMetricType` now needs to create the metric type map for both `datadog` and `datadog-init` modes. - Added unit tests for delta calculation, reset detection, and cross-batch persistence. - The delta processing is gated behind `COCKROACH_TSDUMP_DELTA`. It's enabled when `COCKROACH_TSDUMP_DELTA=1` is set in the env. - Increased the default for `--upload-workers` to 75 instead of 50. This is to compensate for the added overhead due to delta calculation. This is tested with a 10GB tsdump upload to make sure there are no 4xx responses from Datadog. Epic: None Jira: CRDB-52597 Release note: None --- **Datadog** _No need for using `monotonic_diff`_ <img width="2618" height="2528" alt="image" src="https://github.com/user-attachments/assets/fb038e6a-7911-4c45-a8b9-88c74f0c5119" /> **Same metric on DB console** <img width="2030" height="844" alt="image" src="https://github.com/user-attachments/assets/0687d139-7a98-4f85-9f82-160a4c5844bf" /> Co-authored-by: Arjun Mahishi <arjun.mahishi@gmail.com>
2 parents b802a86 + 1e65606 commit dd4cfcf

File tree

7 files changed

+353
-23
lines changed

7 files changed

+353
-23
lines changed

pkg/cli/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_library(
2727
"declarative_corpus.go",
2828
"declarative_print_rules.go",
2929
"decode.go",
30+
"delta_calculator.go",
3031
"demo.go",
3132
"demo_telemetry.go",
3233
"doctor.go",
@@ -472,6 +473,7 @@ go_test(
472473
"@com_github_cockroachdb_errors//:errors",
473474
"@com_github_cockroachdb_errors//oserror",
474475
"@com_github_cockroachdb_pebble//vfs",
476+
"@com_github_datadog_datadog_api_client_go_v2//api/datadog",
475477
"@com_github_datadog_datadog_api_client_go_v2//api/datadogV2",
476478
"@com_github_google_pprof//profile",
477479
"@com_github_pmezard_go_difflib//difflib",

pkg/cli/debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1617,7 +1617,7 @@ func init() {
16171617
f.StringVar(&debugTimeSeriesDumpOpts.userName, "user-name", "", "name of the user to perform datadog upload")
16181618
f.StringVar(&debugTimeSeriesDumpOpts.storeToNodeMapYAMLFile, "store-to-node-map-file", "", "yaml file path which contains the mapping of store ID to node ID for datadog upload.")
16191619
f.BoolVar(&debugTimeSeriesDumpOpts.dryRun, "dry-run", false, "run in dry-run mode without making any actual uploads")
1620-
f.IntVar(&debugTimeSeriesDumpOpts.noOfUploadWorkers, "upload-workers", 50, "number of workers to upload the time series data in parallel")
1620+
f.IntVar(&debugTimeSeriesDumpOpts.noOfUploadWorkers, "upload-workers", 75, "number of workers to upload the time series data in parallel")
16211621
f.BoolVar(&debugTimeSeriesDumpOpts.retryFailedRequests, "retry-failed-requests", false, "retry previously failed requests from file")
16221622

16231623
f = debugSendKVBatchCmd.Flags()

pkg/cli/delta_calculator.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package cli
7+
8+
import (
9+
"hash/fnv"
10+
"sort"
11+
12+
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
13+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
14+
)
15+
16+
// hashDelimiter is written after the metric name and after every tag when
// building a metric key, so that adjacent components do not run together.
var hashDelimiter = []byte{'|'}
17+
18+
// metricState is the bookkeeping kept for one metric+tags combination while
// cumulative counter values are being converted to deltas.
type metricState struct {
19+
// previousValue is the most recent raw (cumulative) value that was processed
// for this series; the next delta is computed against it.
previousValue float64
20+
// firstSeen is true from the moment the state is created until the first
// point of the series has been consumed, after which it is cleared.
firstSeen bool
21+
}
22+
23+
// CumulativeToDeltaProcessor keeps per-series state for counter metrics so that
24+
// cumulative values can be rewritten as deltas. State is keyed by a hash of the
25+
// metric name and its tags and persists across calls, which lets deltas span
26+
// separate data chunks. Access to the state map is serialized through mu.
type CumulativeToDeltaProcessor struct {
27+
// mu is held by processCounterMetric while it reads and updates states.
mu syncutil.RWMutex
28+
// states holds the last observed value for each metric+tags combination.
states map[uint64]*metricState // keyed by hash of metric name and tags
29+
}
30+
31+
func NewCumulativeToDeltaProcessor() *CumulativeToDeltaProcessor {
32+
return &CumulativeToDeltaProcessor{
33+
states: make(map[uint64]*metricState),
34+
}
35+
}
36+
37+
// getMetricKey generates a hash-based key for the metric+tags combination.
38+
// This assumes tags are provided in a consistent order (which they are in tsdump_upload.go
39+
// where per-series tags are added first, followed by global tags in deterministic order).
40+
func (dc *CumulativeToDeltaProcessor) getMetricKey(metricName string, tags []string) uint64 {
41+
hash := fnv.New64a()
42+
_, _ = hash.Write([]byte(metricName))
43+
_, _ = hash.Write(hashDelimiter)
44+
for _, tag := range tags {
45+
_, _ = hash.Write([]byte(tag))
46+
_, _ = hash.Write(hashDelimiter)
47+
}
48+
return hash.Sum64()
49+
}
50+
51+
// processCounterMetric converts cumulative counter values to deltas between consecutive points
52+
// by modifying the series in-place. First point keeps original value. Subsequent points
53+
// become delta from previous point. Counter resets (current < previous) are handled by
54+
// using current value as delta.
55+
//
56+
// The isSorted parameter indicates whether the points are already sorted by timestamp,
57+
// avoiding an unnecessary sort operation when the data is already in order.
58+
func (dc *CumulativeToDeltaProcessor) processCounterMetric(
59+
series *datadogV2.MetricSeries, isSorted bool,
60+
) error {
61+
if series.Type == nil || *series.Type != datadogV2.METRICINTAKETYPE_COUNT {
62+
return nil
63+
}
64+
65+
// sort the points by timestamp if not already sorted
66+
if !isSorted {
67+
sort.Slice(series.Points, func(i, j int) bool {
68+
return *series.Points[i].Timestamp < *series.Points[j].Timestamp
69+
})
70+
}
71+
72+
metricKey := dc.getMetricKey(series.Metric, series.Tags)
73+
74+
dc.mu.Lock()
75+
defer dc.mu.Unlock()
76+
77+
state, exists := dc.states[metricKey]
78+
if !exists {
79+
state = &metricState{firstSeen: true}
80+
dc.states[metricKey] = state
81+
}
82+
83+
for i := range series.Points {
84+
point := &series.Points[i]
85+
currentValue := *point.Value
86+
if state.firstSeen {
87+
state.previousValue = currentValue
88+
state.firstSeen = false
89+
continue
90+
}
91+
92+
// calculate delta
93+
*point.Value = currentValue - state.previousValue
94+
if currentValue < state.previousValue {
95+
// if counter reset detected (e.g., process restart)
96+
// use the current value as the delta since last reset
97+
*point.Value = currentValue
98+
}
99+
100+
state.previousValue = currentValue
101+
}
102+
103+
return nil
104+
}

pkg/cli/testdata/tsdump_partial_upload_e2e

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ cr.node.sql.query.count,2021-01-01T00:00:10Z,1,102.3
66
cr.store.rocksdb.block.cache.usage,2021-01-01T00:00:00Z,2,75.2
77
----
88
----
9-
{"series":[{"interval":10,"metric":"crdb.tsdump.admission.admitted.elastic-cpu","points":[{"timestamp":1748248320,"value":1}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0}]}
9+
{"series":[{"interval":10,"metric":"crdb.tsdump.admission.admitted.elastic-cpu","points":[{"timestamp":1748248320,"value":1}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1}]}
1010

11-
{"series":[{"interval":10,"metric":"crdb.tsdump.rocksdb.block.cache.usage","points":[{"timestamp":1609459200,"value":75.2}],"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0}]}
11+
{"series":[{"interval":10,"metric":"crdb.tsdump.rocksdb.block.cache.usage","points":[{"timestamp":1609459200,"value":75.2}],"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":3}]}
1212

1313
[{"ddsource":"tsdump_upload","ddtags":"cluster_type:SELF_HOSTED,cluster_label:test-cluster,cluster_id:test-cluster-id,zendesk_ticket:zd-test,org_name:test-org,user_name:test-user,upload_id:,upload_timestamp:2024-11-14 00:00:00,upload_year:2024,upload_month:11,upload_day:14,series_uploaded:4","dry_run":"false","duration":"0","estimated_cost":"0.000186986301369863","hostname":"hostname","message":"tsdump upload completed: uploaded 4 series overall","series_uploaded":"4","service":"tsdump_upload","success":"false"}]
1414

15-
{"metric_series":[{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459200,"value":100.5}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0}],"upload_id":"","timestamp":"2024-11-14T00:00:00Z","error":"409 Conflict"}
16-
{"metric_series":[{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459210,"value":102.3}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0}],"upload_id":"","timestamp":"2024-11-14T00:00:00Z","error":"409 Conflict"}
15+
{"metric_series":[{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459200,"value":100.5}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1}],"upload_id":"","timestamp":"2024-11-14T00:00:00Z","error":"409 Conflict"}
16+
{"metric_series":[{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459210,"value":102.3}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:test-cluster","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1}],"upload_id":"","timestamp":"2024-11-14T00:00:00Z","error":"409 Conflict"}
1717
----
1818
----
1919

pkg/cli/testdata/tsdump_upload_e2e

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ cr.node.sql.query.count,2021-01-01T00:00:10Z,1,102.3
66
cr.store.rocksdb.block.cache.usage,2021-01-01T00:00:00Z,2,75.2
77
----
88
----
9-
{"series":[{"interval":10,"metric":"crdb.tsdump.admission.admitted.elastic-cpu","points":[{"timestamp":1748248320,"value":1}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0},{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459200,"value":100.5}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0},{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459210,"value":102.3}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0},{"interval":10,"metric":"crdb.tsdump.rocksdb.block.cache.usage","points":[{"timestamp":1609459200,"value":75.2}],"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":0}]}
9+
{"series":[{"interval":10,"metric":"crdb.tsdump.admission.admitted.elastic-cpu","points":[{"timestamp":1748248320,"value":1}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1},{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459200,"value":100.5}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1},{"interval":10,"metric":"crdb.tsdump.sql.query.count","points":[{"timestamp":1609459210,"value":102.3}],"tags":["node_id:1","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":1},{"interval":10,"metric":"crdb.tsdump.rocksdb.block.cache.usage","points":[{"timestamp":1609459200,"value":75.2}],"tags":["store:2","cluster_type:SELF_HOSTED","cluster_label:\"test-cluster\"","cluster_id:test-cluster-id","zendesk_ticket:zd-test","org_name:test-org","user_name:test-user","upload_id:\"test-cluster\"-20241114000000","upload_timestamp:2024-11-14 00:00:00","upload_year:2024","upload_month:11","upload_day:14"],"type":3}]}
1010

1111
[{"ddsource":"tsdump_upload","ddtags":"cluster_type:SELF_HOSTED,cluster_label:\"test-cluster\",cluster_id:test-cluster-id,zendesk_ticket:zd-test,org_name:test-org,user_name:test-user,upload_id:\"test-cluster\"-20241114000000,upload_timestamp:2024-11-14 00:00:00,upload_year:2024,upload_month:11,upload_day:14,series_uploaded:4","dry_run":"false","duration":"0","estimated_cost":"0.000186986301369863","hostname":"hostname","message":"tsdump upload completed: uploaded 4 series overall","series_uploaded":"4","service":"tsdump_upload","success":"true"}]
1212
----

pkg/cli/tsdump_upload.go

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/base"
2727
"github.com/cockroachdb/cockroach/pkg/roachpb"
2828
"github.com/cockroachdb/cockroach/pkg/ts"
29+
"github.com/cockroachdb/cockroach/pkg/util/envutil"
2930
"github.com/cockroachdb/cockroach/pkg/util/retry"
3031
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3132
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -38,6 +39,7 @@ const (
3839
UploadStatusPartialSuccess = "Partial Success"
3940
UploadStatusFailure = "Failed"
4041
nodeKey = "node_id"
42+
processDeltaEnvVar = "COCKROACH_TSDUMP_DELTA"
4143
)
4244

4345
var (
@@ -117,6 +119,8 @@ type datadogWriter struct {
117119
fileMutex syncutil.Mutex
118120
// hasFailedRequestsInUpload tracks if any failed requests were saved during upload
119121
hasFailedRequestsInUpload bool
122+
// cumulativeToDeltaProcessor is used to convert cumulative counter metrics to delta metrics
123+
cumulativeToDeltaProcessor *CumulativeToDeltaProcessor
120124
}
121125

122126
func makeDatadogWriter(
@@ -130,17 +134,12 @@ func makeDatadogWriter(
130134
) (*datadogWriter, error) {
131135
currentTime := getCurrentTime()
132136

133-
var metricTypeMap map[string]string
134-
if init {
135-
// we only need to load the metric types map when the command is
136-
// datadogInit. It's ok to keep it nil otherwise.
137-
var err error
138-
metricTypeMap, err = loadMetricTypesMap(context.Background())
139-
if err != nil {
140-
fmt.Printf(
141-
"error loading metric types map: %v\nThis may lead to some metrics not behaving correctly on Datadog.\n", err)
142-
}
137+
metricTypeMap, err := loadMetricTypesMap(context.Background())
138+
if err != nil {
139+
fmt.Printf(
140+
"error loading metric types map: %v\nThis may lead to some metrics not behaving correctly on Datadog.\n", err)
143141
}
142+
144143
ctx := context.WithValue(
145144
context.Background(),
146145
datadog.ContextAPIKeys,
@@ -184,6 +183,7 @@ func makeDatadogWriter(
184183
metricTypeMap: metricTypeMap,
185184
noOfUploadWorkers: noOfUploadWorkers,
186185
isPartialUploadOfFailedRequests: isPartialUploadOfFailedRequests,
186+
cumulativeToDeltaProcessor: NewCumulativeToDeltaProcessor(),
187187
}, nil
188188
}
189189

@@ -247,6 +247,8 @@ func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*datadogV2.MetricSeries, err
247247
appendTag(series, nodeKey, "0")
248248
}
249249

250+
isSorted := true
251+
var previousTimestamp int64
250252
for i := 0; i < idata.SampleCount(); i++ {
251253
if idata.IsColumnar() {
252254
series.Points[i].Timestamp = datadog.PtrInt64(idata.TimestampForOffset(idata.Offset[i]) / 1_000_000_000)
@@ -256,7 +258,30 @@ func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*datadogV2.MetricSeries, err
256258
series.Points[i].Value = datadog.PtrFloat64(idata.Samples[i].Sum)
257259
}
258260

261+
if !isSorted {
262+
// if we already found a point out of order, we can skip further checks
263+
continue
264+
}
265+
266+
// Check if timestamps are in ascending order. We cannot assume time series
267+
// data is sorted because:
268+
// 1. pkg/ts/tspb/timeseries.go ToInternal() explicitly states "returned slice will not be sorted"
269+
// 2. pkg/storage/pebble_merge.go sortAndDeduplicateRows/Columns shows storage merge
270+
// operations can result in out-of-order data before final sorting
271+
// 3. Data from different storage slabs may be interleaved during tsdump reads
272+
currentTimestamp := *series.Points[i].Timestamp
273+
if i > 0 && previousTimestamp > currentTimestamp {
274+
isSorted = false
275+
}
276+
previousTimestamp = currentTimestamp
277+
}
278+
279+
if envutil.EnvOrDefaultInt(processDeltaEnvVar, 0) == 1 {
280+
if err := d.cumulativeToDeltaProcessor.processCounterMetric(series, isSorted); err != nil {
281+
return nil, err
282+
}
259283
}
284+
260285
return series, nil
261286
}
262287

@@ -415,13 +440,6 @@ func (d *datadogWriter) retryFailedRequests(fileName string) error {
415440
}
416441

417442
func (d *datadogWriter) resolveMetricType(metricName string) *datadogV2.MetricIntakeType {
418-
if !d.init {
419-
// in this is not datadogInit command, we don't need to resolve the metric
420-
// type. We can just return DatadogSeriesTypeUnknown. Datadog only expects
421-
// us to send the type information only once.
422-
return datadogV2.METRICINTAKETYPE_UNSPECIFIED.Ptr()
423-
}
424-
425443
typeLookupKey := strings.TrimPrefix(metricName, "cr.store.")
426444
typeLookupKey = strings.TrimPrefix(typeLookupKey, "cr.node.")
427445
metricType := d.metricTypeMap[typeLookupKey]

0 commit comments

Comments
 (0)