diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index 4846c5343a03..b1c15d337870 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -790,3 +790,123 @@ func TestSSTBatcherCloseWithoutFlush(t *testing.T) { // There should be no rows written since all the flushes were blocked and cancelled. tdb.CheckQueryResults(t, `SELECT count(*) FROM kv`, [][]string{{"0"}}) } + +func TestSSTBatcherSplitBetweenColumnFamilies(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + s := tc.ApplicationLayer(0) + db := tc.ServerConn(0) + kvDB := s.DB() + + tdb := sqlutils.MakeSQLRunner(db) + + // Create a table with multiple column families and a secondary index. + tdb.Exec(t, ` + CREATE TABLE t ( + pk INT PRIMARY KEY, + a INT, + b STRING NOT NULL, + INDEX idx_a (a), + FAMILY f0 (pk, a), + FAMILY f1 (b) + ) + `) + + const numRows = 10 + + // Insert some rows. + for i := 1; i <= numRows; i++ { + tdb.Exec(t, `INSERT INTO t VALUES ($1, $2, $3)`, i, i*10, fmt.Sprintf("b-%d", i)) + } + + // Get the table descriptor to construct keys. + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "defaultdb", "t") + tableID := tableDesc.GetID() + + // The primary index has ID 1. + indexPrefix := s.Codec().IndexPrefix(uint32(tableID), 1) + + // Create unsafe split keys between family 0 and family 1 for each row. + for pk := 1; pk <= numRows; pk++ { + pkKey := encoding.EncodeVarintAscending(append([]byte{}, indexPrefix...), int64(pk)) + family0Key := keys.MakeFamilyKey(pkKey, 0) + unsafeSplitKey := roachpb.Key(family0Key).Next() + + _, err := keys.EnsureSafeSplitKey(unsafeSplitKey) + require.Error(t, err) + //require.NotEqual(t, unsafeSplitKey, endRowKey) + + splitReq := &kvpb.AdminSplitRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: unsafeSplitKey, + }, + SplitKey: unsafeSplitKey, + } + + _, pErr := kv.SendWrapped(ctx, kvDB.NonTransactionalSender(), splitReq) + require.Nil(t, pErr, "AdminSplit at unsafe key should succeed") + } + + // Use the first unsafe split key for subsequent checks. + //pkKey := encoding.EncodeVarintAscending(append([]byte{}, indexPrefix...), 1) + //family0Key := keys.MakeFamilyKey(pkKey, 0) + //unsafeSplitKey := roachpb.Key(family0Key).Next() + //t.Logf("Family 0 key for pk=1: %s", roachpb.Key(family0Key)) + //t.Logf("Unsafe split key: %s", unsafeSplitKey) + + tdb.Exec(t, `ALTER TABLE t SCATTER`) + + // Clear the range cache to ensure it's updated with all the new splits. + // This ensures any subsequent operations have fresh range descriptor information. + s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().Clear() + + // Verify the split occurred by checking range boundaries. + //ranges := tdb.QueryStr(t, `SELECT start_key, end_key, replicas FROM [SHOW RANGES FROM TABLE t]`) + //t.Logf("Ranges after split:") + //for i, r := range ranges { + // t.Logf(" Range %d: [%s, %s), replicas %s", i, r[0], r[1], r[2]) + //} + + //tdb.Exec(t, `SET streamer_enabled = true`) + + // Query each value of 'a' individually in a loop + for pk := 1; pk <= numRows; pk++ { + // Scan all rows on the secondary index, and then do the index join into + // the primary - the latter part should hit the assertion + // (non-deterministically). + tdb.QueryStr(t, `SELECT * FROM t@idx_a WHERE a > 0 AND a < 200`) + //expectedRow := [][]string{{fmt.Sprintf("%d", pk), fmt.Sprintf("%d", aValue), fmt.Sprintf("b-%d", pk)}} + //require.Equal(t, expectedRow, row, "row with pk=%d should have correct values", pk) + } + + // Run EXPLAIN ANALYZE for the query with a = 10 + //explainResults := tdb.QueryStr(t, `EXPLAIN ANALYZE SELECT * FROM t WHERE a = 10`) + //t.Logf("EXPLAIN ANALYZE for SELECT * FROM t WHERE a = 10:") + //for _, line := range explainResults { + // t.Logf(" %v", line) + //} + + // Scan over the table key space and verify the row is split. + //tableSpan := tableDesc.TableSpan(s.Codec()) + + //kvs, err := kvDB.Scan(ctx, tableSpan.Key, unsafeSplitKey, 0 /* maxRows */) + //require.NoError(t, err) + + //for i, kv := range kvs { + // t.Logf("(Before Split Key) Key %d: %s (len=%d, value len=%d)", i, kv.Key, len(kv.Key), len(kv.Value.RawBytes)) + //} + + //afterKvs, err := kvDB.Scan(ctx, unsafeSplitKey, tableSpan.EndKey, 0 /* maxRows */) + //require.NoError(t, err) + + // for i, kv := range afterKvs { + // t.Logf("(After Split Key) Key %d: %s (len=%d, value len=%d)", i, kv.Key, len(kv.Key), len(kv.Value.RawBytes)) + //} + + //err := sqlutils.RunInspect(db, "defaultdb", "t") + //require.NoError(t, err) +}