Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,3 +790,123 @@ func TestSSTBatcherCloseWithoutFlush(t *testing.T) {
// There should be no rows written since all the flushes were blocked and cancelled.
tdb.CheckQueryResults(t, `SELECT count(*) FROM kv`, [][]string{{"0"}})
}

func TestSSTBatcherSplitBetweenColumnFamilies(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
s := tc.ApplicationLayer(0)
db := tc.ServerConn(0)
kvDB := s.DB()

tdb := sqlutils.MakeSQLRunner(db)

// Create a table with multiple column families and a secondary index.
tdb.Exec(t, `
CREATE TABLE t (
pk INT PRIMARY KEY,
a INT,
b STRING NOT NULL,
INDEX idx_a (a),
FAMILY f0 (pk, a),
FAMILY f1 (b)
)
`)

const numRows = 10

// Insert some rows.
for i := 1; i <= numRows; i++ {
tdb.Exec(t, `INSERT INTO t VALUES ($1, $2, $3)`, i, i*10, fmt.Sprintf("b-%d", i))
}

// Get the table descriptor to construct keys.
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "defaultdb", "t")
tableID := tableDesc.GetID()

// The primary index has ID 1.
indexPrefix := s.Codec().IndexPrefix(uint32(tableID), 1)

// Create unsafe split keys between family 0 and family 1 for each row.
for pk := 1; pk <= numRows; pk++ {
pkKey := encoding.EncodeVarintAscending(append([]byte{}, indexPrefix...), int64(pk))
family0Key := keys.MakeFamilyKey(pkKey, 0)
unsafeSplitKey := roachpb.Key(family0Key).Next()

_, err := keys.EnsureSafeSplitKey(unsafeSplitKey)
require.Error(t, err)
//require.NotEqual(t, unsafeSplitKey, endRowKey)

splitReq := &kvpb.AdminSplitRequest{
RequestHeader: kvpb.RequestHeader{
Key: unsafeSplitKey,
},
SplitKey: unsafeSplitKey,
}

_, pErr := kv.SendWrapped(ctx, kvDB.NonTransactionalSender(), splitReq)
require.Nil(t, pErr, "AdminSplit at unsafe key should succeed")
}

// Use the first unsafe split key for subsequent checks.
//pkKey := encoding.EncodeVarintAscending(append([]byte{}, indexPrefix...), 1)
//family0Key := keys.MakeFamilyKey(pkKey, 0)
//unsafeSplitKey := roachpb.Key(family0Key).Next()
//t.Logf("Family 0 key for pk=1: %s", roachpb.Key(family0Key))
//t.Logf("Unsafe split key: %s", unsafeSplitKey)

tdb.Exec(t, `ALTER TABLE t SCATTER`)

// Clear the range cache to ensure it's updated with all the new splits.
// This ensures any subsequent operations have fresh range descriptor information.
s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().Clear()

// Verify the split occurred by checking range boundaries.
//ranges := tdb.QueryStr(t, `SELECT start_key, end_key, replicas FROM [SHOW RANGES FROM TABLE t]`)
//t.Logf("Ranges after split:")
//for i, r := range ranges {
// t.Logf(" Range %d: [%s, %s), replicas %s", i, r[0], r[1], r[2])
//}

//tdb.Exec(t, `SET streamer_enabled = true`)

// Query each value of 'a' individually in a loop
for pk := 1; pk <= numRows; pk++ {
// Scan all rows on the secondary index, and then do the index join into
// the primary - the latter part should hit the assertion
// (non-deterministically).
tdb.QueryStr(t, `SELECT * FROM t@idx_a WHERE a > 0 AND a < 200`)
//expectedRow := [][]string{{fmt.Sprintf("%d", pk), fmt.Sprintf("%d", aValue), fmt.Sprintf("b-%d", pk)}}
//require.Equal(t, expectedRow, row, "row with pk=%d should have correct values", pk)
}

// Run EXPLAIN ANALYZE for the query with a = 10
//explainResults := tdb.QueryStr(t, `EXPLAIN ANALYZE SELECT * FROM t WHERE a = 10`)
//t.Logf("EXPLAIN ANALYZE for SELECT * FROM t WHERE a = 10:")
//for _, line := range explainResults {
// t.Logf(" %v", line)
//}

// Scan over the table key space and verify the row is split.
//tableSpan := tableDesc.TableSpan(s.Codec())

//kvs, err := kvDB.Scan(ctx, tableSpan.Key, unsafeSplitKey, 0 /* maxRows */)
//require.NoError(t, err)

//for i, kv := range kvs {
// t.Logf("(Before Split Key) Key %d: %s (len=%d, value len=%d)", i, kv.Key, len(kv.Key), len(kv.Value.RawBytes))
//}

//afterKvs, err := kvDB.Scan(ctx, unsafeSplitKey, tableSpan.EndKey, 0 /* maxRows */)
//require.NoError(t, err)

// for i, kv := range afterKvs {
// t.Logf("(After Split Key) Key %d: %s (len=%d, value len=%d)", i, kv.Key, len(kv.Key), len(kv.Value.RawBytes))
//}

//err := sqlutils.RunInspect(db, "defaultdb", "t")
//require.NoError(t, err)
}
Loading