Commit f3ed057

sql/bulksst: coordinate distributed SST metadata merge with CombineFileInfo
Implements CombineFileInfo(), a coordinator utility that aggregates SST file metadata from distributed workers and uses their sampled row keys to split the schema spans into merge task spans. This will be used by the new distributed merge pipeline.

Resolves: #156662
Epic: CRDB-48845
Release note: none
Co-authored-by: @jeffswenson
1 parent 5b5000c commit f3ed057
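To picture where the new utility sits, here is a hypothetical coordinator-side sketch (not part of this commit). The package name, the planMerge helper, the mergeTask type, and the overlap-based SST-to-task assignment are assumptions made for illustration; only CombineFileInfo, bulksst.SSTFiles, roachpb.Span, and execinfrapb.BulkMergeSpec_SST come from the diff below.

// Package bulksstexample is a hypothetical illustration, not part of this commit.
package bulksstexample

import (
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/sql/bulksst"
    "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// mergeTask pairs one merge task span with the SSTs whose key ranges overlap
// it. The type and the overlap-based assignment below are assumptions for
// illustration; the real distributed merge pipeline is not shown in this commit.
type mergeTask struct {
    Span roachpb.Span
    SSTs []execinfrapb.BulkMergeSpec_SST
}

// planMerge combines the SST metadata reported by the workers and splits the
// schema spans into merge task spans using the workers' row samples.
func planMerge(
    workerResults []bulksst.SSTFiles, schemaSpans []roachpb.Span,
) ([]mergeTask, error) {
    ssts, mergeSpans, err := bulksst.CombineFileInfo(workerResults, schemaSpans)
    if err != nil {
        return nil, err
    }
    tasks := make([]mergeTask, 0, len(mergeSpans))
    for _, sp := range mergeSpans {
        task := mergeTask{Span: sp}
        for _, sst := range ssts {
            // Keys in BulkMergeSpec_SST are stored as strings, so plain string
            // comparison is byte-wise. This assumes the SST's EndKey is the
            // last key it contains (inclusive).
            if sst.StartKey < string(sp.EndKey) && string(sp.Key) <= sst.EndKey {
                task.SSTs = append(task.SSTs, sst)
            }
        }
        tasks = append(tasks, task)
    }
    return tasks, nil
}

Because CombineFileInfo sorts the samples and splits the validated schema spans at them, the returned merge task spans are non-overlapping and fully cover the schema spans, so each mergeTask could be handed to a separate merge worker.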

4 files changed: +551 −0 lines

pkg/sql/bulksst/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,7 @@ go_proto_library(
 go_library(
     name = "bulksst",
     srcs = [
+        "combine_file_info.go",
         "sst_file_allocator.go",
         "sst_writer.go",
     ],
@@ -35,11 +36,13 @@ go_library(
         "//pkg/roachpb",
         "//pkg/settings",
         "//pkg/settings/cluster",
+        "//pkg/sql/execinfrapb",
         "//pkg/storage",
         "//pkg/util/hlc",
         "//pkg/util/log",
         "//pkg/util/randutil",
         "//pkg/util/timeutil",
+        "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_pebble//objstorage",
         "@com_github_cockroachdb_pebble//objstorage/objstorageprovider",
     ],
@@ -48,6 +51,7 @@ go_library(
 go_test(
     name = "bulksst_test",
     srcs = [
+        "combine_file_info_test.go",
         "main_test.go",
         "sst_file_allocator_test.go",
         "sst_writer_test.go",
pkg/sql/bulksst/combine_file_info.go

Lines changed: 157 additions & 0 deletions

@@ -0,0 +1,157 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package bulksst

import (
    "bytes"
    "slices"

    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
    "github.com/cockroachdb/errors"
)

// CombineFileInfo combines SST file metadata and determines merge task spans based on key samples.
func CombineFileInfo(
    files []SSTFiles, schemaSpans []roachpb.Span,
) ([]execinfrapb.BulkMergeSpec_SST, []roachpb.Span, error) {
    // Validate that schema spans are properly ordered. This is a critical
    // precondition for the algorithm to work correctly.
    if err := validateSchemaSpansOrdered(schemaSpans); err != nil {
        return nil, nil, err
    }

    result := make([]execinfrapb.BulkMergeSpec_SST, 0)
    samples := make([]roachpb.Key, 0)
    for _, file := range files {
        for _, sst := range file.SST {
            result = append(result, execinfrapb.BulkMergeSpec_SST{
                StartKey: string(sst.StartKey),
                EndKey:   string(sst.EndKey),
                URI:      sst.URI,
            })
        }
        for _, sample := range file.RowSamples {
            samples = append(samples, roachpb.Key(sample))
        }
    }
    // Sort samples to ensure merge spans are non-overlapping and contiguous.
    // Samples are collected from multiple workers and arrive in arbitrary order.
    // getMergeSpans uses these samples as split points to create merge task spans.
    // If samples are unsorted (e.g., ["k", "d", "a"]), getMergeSpans would create
    // overlapping spans that cause the same keys to be processed multiple times,
    // resulting in duplicate data in the output SSTs.
    slices.SortFunc(samples, func(i, j roachpb.Key) int {
        return bytes.Compare(i, j)
    })

    mergeSpans, err := getMergeSpans(schemaSpans, samples)
    if err != nil {
        return nil, nil, err
    }

    return result, mergeSpans, nil
}

// getMergeSpans determines which spans should be used as merge tasks. The
// output spans must fully cover the input spans. The samples are used to
// determine where schema spans should be split.
//
// Precondition: schemaSpans must be sorted by start key and non-overlapping.
// This precondition is validated in validateSchemaSpansOrdered.
func getMergeSpans(schemaSpans []roachpb.Span, sortedSample []roachpb.Key) ([]roachpb.Span, error) {
    result := make([]roachpb.Span, 0, len(schemaSpans)+len(sortedSample))

    for _, span := range schemaSpans {
        samples, consumed := getCoveredSamples(span, sortedSample)

        // Validate: if we consumed more samples than we returned, it means some
        // samples were outside this span (either before it or in a gap). Since
        // schema spans are processed in order, any sample before this span indicates
        // a sample not covered by the schema spans.
        if consumed > len(samples) {
            // Find the first skipped sample for a clear error message.
            for i := 0; i < consumed; i++ {
                if i >= len(samples) || !sortedSample[i].Equal(samples[i]) {
                    return nil, errors.AssertionFailedf(
                        "sample %q is before schema span [%q, %q); this indicates samples were collected for keys outside schema spans",
                        sortedSample[i], span.Key, span.EndKey)
                }
            }
        }

        sortedSample = sortedSample[consumed:]

        startKey := span.Key
        for _, sample := range samples {
            // Skip samples that would create invalid (zero-length) spans.
            // This handles duplicates and samples at span boundaries.
            if bytes.Compare(sample, startKey) <= 0 {
                continue
            }
            result = append(result, roachpb.Span{
                Key:    startKey,
                EndKey: sample,
            })
            startKey = sample
        }
        result = append(result, roachpb.Span{
            Key:    startKey,
            EndKey: span.EndKey,
        })
    }

    // Validate that all samples were contained within schema spans. Any remaining
    // samples indicate they were collected after the last span.
    if len(sortedSample) > 0 {
        return nil, errors.AssertionFailedf(
            "samples outside schema spans: %d samples remain after processing all spans, first uncovered sample: %q",
            len(sortedSample), sortedSample[0])
    }

    return result, nil
}

// getCoveredSamples returns the samples within the given span and the total
// number of samples consumed (including any that were before the span start).
func getCoveredSamples(schemaSpan roachpb.Span, sortedSamples []roachpb.Key) ([]roachpb.Key, int) {
    // Count how many samples are before the span start.
    // Since sortedSamples are sorted and schema spans are processed in order,
    // samples before this span's start either:
    // 1. Should have been covered by a previous span, or
    // 2. Fall in a gap between spans.
    startIdx := 0
    for startIdx < len(sortedSamples) && bytes.Compare(sortedSamples[startIdx], schemaSpan.Key) < 0 {
        startIdx++
    }

    // Find samples within this span: [schemaSpan.Key, schemaSpan.EndKey)
    endIdx := startIdx
    for endIdx < len(sortedSamples) && bytes.Compare(sortedSamples[endIdx], schemaSpan.EndKey) < 0 {
        endIdx++
    }

    return sortedSamples[startIdx:endIdx], endIdx
}

// validateSchemaSpansOrdered checks that schema spans are sorted by start key
// and non-overlapping. This is a precondition for getMergeSpans to work correctly.
func validateSchemaSpansOrdered(schemaSpans []roachpb.Span) error {
    for i := 1; i < len(schemaSpans); i++ {
        if bytes.Compare(schemaSpans[i-1].Key, schemaSpans[i].Key) >= 0 {
            return errors.AssertionFailedf(
                "schema spans not ordered: span %d [%q, %q) >= span %d [%q, %q)",
                i-1, schemaSpans[i-1].Key, schemaSpans[i-1].EndKey,
                i, schemaSpans[i].Key, schemaSpans[i].EndKey)
        }
        if bytes.Compare(schemaSpans[i-1].EndKey, schemaSpans[i].Key) > 0 {
            return errors.AssertionFailedf(
                "schema spans overlapping: span %d ends at %q but span %d starts at %q",
                i-1, schemaSpans[i-1].EndKey, i, schemaSpans[i].Key)
        }
    }
    return nil
}
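To make the splitting behavior concrete, the following in-package test sketch exercises getMergeSpans directly. The test name, the single-letter keys, and the testify assertions are illustrative assumptions; the commit's own combine_file_info_test.go is not shown on this page.

package bulksst

import (
    "testing"

    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/stretchr/testify/require"
)

// TestGetMergeSpansSketch is an illustrative sketch, not part of this commit.
func TestGetMergeSpansSketch(t *testing.T) {
    // One schema span ["a", "z") and two sorted samples split it into three
    // contiguous, non-overlapping merge task spans that still cover ["a", "z").
    schemaSpans := []roachpb.Span{
        {Key: roachpb.Key("a"), EndKey: roachpb.Key("z")},
    }
    samples := []roachpb.Key{roachpb.Key("f"), roachpb.Key("m")}

    got, err := getMergeSpans(schemaSpans, samples)
    require.NoError(t, err)
    require.Equal(t, []roachpb.Span{
        {Key: roachpb.Key("a"), EndKey: roachpb.Key("f")},
        {Key: roachpb.Key("f"), EndKey: roachpb.Key("m")},
        {Key: roachpb.Key("m"), EndKey: roachpb.Key("z")},
    }, got)

    // A sample that falls outside every schema span is reported as an
    // assertion failure rather than silently dropped.
    _, err = getMergeSpans(schemaSpans, []roachpb.Key{roachpb.Key("zz")})
    require.Error(t, err)
}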
