Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
ff6b2be
wip
asg0451 Nov 7, 2025
c2ba283
fix up and move stuff around
asg0451 Nov 7, 2025
25c08d7
add oid entry
asg0451 Nov 7, 2025
98dbecf
make qm
asg0451 Nov 7, 2025
b112ae6
working
asg0451 Nov 8, 2025
c4ccf01
rangefeed
asg0451 Nov 8, 2025
9ecedaa
make sure it doesn't panic
aerfrei Nov 10, 2025
03444e0
queuefeed: have crdb_internal.select_from_queue_feed return rows
aerfrei Nov 10, 2025
302e890
Merge pull request #3 from aerfrei/qf-table-based
aerfrei Nov 10, 2025
492de9e
queuefeed: add wrapper for partitions table
jeffswenson Nov 10, 2025
e3703c3
add a table to track existing queues and add test
aerfrei Nov 10, 2025
1bd941f
Merge pull request #5 from aerfrei/qf-feeds-table
aerfrei Nov 10, 2025
45e9f7f
read from rangefeed
aerfrei Nov 10, 2025
bacc013
Merge pull request #6 from asg0451/wip-queuefeed-rangefeed-and-stuff
asg0451 Nov 10, 2025
1cba809
fix build
asg0451 Nov 11, 2025
0824bcb
restore functionality
asg0451 Nov 11, 2025
4cb10de
fix test
asg0451 Nov 11, 2025
cfc38aa
cleanup
asg0451 Nov 11, 2025
d110c5d
wip
asg0451 Nov 11, 2025
2740ef1
use session context
asg0451 Nov 11, 2025
7b63782
fix
asg0451 Nov 11, 2025
4309a88
add test
asg0451 Nov 11, 2025
798fe0a
clean
asg0451 Nov 11, 2025
6e00acd
Merge pull request #7 from asg0451/fix-build
asg0451 Nov 11, 2025
35a189a
cleanup
asg0451 Nov 11, 2025
5226a91
Merge pull request #8 from asg0451/fix-ctx-and-other-stuff
asg0451 Nov 11, 2025
a3870cd
store readers
asg0451 Nov 11, 2025
187991c
queuefeed: implement simple assignment algorithm
jeffswenson Nov 10, 2025
0ab29c6
Merge pull request #9 from asg0451/store-readers
asg0451 Nov 12, 2025
6fd7c97
rollback support
asg0451 Nov 12, 2025
36a7ded
Merge pull request #11 from asg0451/wip-queuefeed-rollback
asg0451 Nov 12, 2025
26c65e7
fix context bug
asg0451 Nov 12, 2025
b91617c
queuefeed: add a smoke test
jeffswenson Nov 12, 2025
82db85f
Merge pull request #13 from asg0451/fix-context-more
asg0451 Nov 12, 2025
a8af856
queuefeed: create partition cache
jeffswenson Nov 12, 2025
17758ef
queuefeed: wire up the assigner
jeffswenson Nov 12, 2025
b5efef8
rudimentary reassignments
asg0451 Nov 13, 2025
e9e34da
Merge pull request #17 from asg0451/refresh-assignment
asg0451 Nov 13, 2025
d4da061
Create a partition per range
asg0451 Nov 13, 2025
9188113
queuefeed: Add checkpointing logic
aerfrei Nov 12, 2025
781a98d
Merge pull request #14 from aerfrei/qf-cursor
aerfrei Nov 13, 2025
d5ab9c3
move reader management to conn executor
asg0451 Nov 13, 2025
229ee22
cleanup
asg0451 Nov 13, 2025
6cab99a
Merge pull request #20 from asg0451/move-connex
asg0451 Nov 13, 2025
e08acd2
fix test
asg0451 Nov 13, 2025
8e41ad4
watch all my partitions
asg0451 Nov 13, 2025
a55880a
Merge pull request #23 from asg0451/watch-all-parts
asg0451 Nov 13, 2025
4f5a5b7
queuefeed: add smoke test with multiple partitions
jeffswenson Nov 13, 2025
623d774
unassign dead sessions
asg0451 Nov 13, 2025
7042e71
Merge pull request #25 from asg0451/reset-unalive-assignments
asg0451 Nov 13, 2025
cd24183
queuefeed: call close on manager in tests
jeffswenson Nov 14, 2025
b36402a
queuefeed: wire up partition cache to the assigner
jeffswenson Nov 13, 2025
3d5df16
queuefeed: support partitions in checkpointing
aerfrei Nov 14, 2025
f84b7f7
queuefeed: use frontier for tracking partition progress
aerfrei Nov 14, 2025
516bd9b
queuefeed: set the cursor to the queue creation time on creation
aerfrei Nov 14, 2025
3092a4a
Merge pull request #24 from aerfrei/qf-partition-checkpointing-support
aerfrei Nov 14, 2025
8642c18
add builtin for starting a changefeed from a cursor
aerfrei Nov 14, 2025
789c7c1
Merge pull request #28 from aerfrei/qf-cursor-builtin
aerfrei Nov 14, 2025
255443b
queuefeed: create a multi-reader smoke test
jeffswenson Nov 13, 2025
9b9c860
queuefeed: create queuefeed from table name
aerfrei Nov 14, 2025
c3c433a
Merge pull request #29 from aerfrei/qf-builtin-to-take-table-name
aerfrei Nov 14, 2025
4288fab
roachtest: add queuefeed roachtest
jeffswenson Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ ALL_TESTS = [
"//pkg/sql/privilege:privilege_test",
"//pkg/sql/protoreflect:protoreflect_test",
"//pkg/sql/querycache:querycache_test",
"//pkg/sql/queuefeed:queuefeed_test",
"//pkg/sql/randgen:randgen_test",
"//pkg/sql/regions:regions_test",
"//pkg/sql/row:row_disallowed_imports_test",
Expand Down Expand Up @@ -2228,6 +2229,9 @@ GO_TARGETS = [
"//pkg/sql/protoreflect:protoreflect_test",
"//pkg/sql/querycache:querycache",
"//pkg/sql/querycache:querycache_test",
"//pkg/sql/queuefeed/queuebase:queuebase",
"//pkg/sql/queuefeed:queuefeed",
"//pkg/sql/queuefeed:queuefeed_test",
"//pkg/sql/randgen:randgen",
"//pkg/sql/randgen:randgen_test",
"//pkg/sql/rangeprober:range_prober",
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ go_library(
"ptp.go",
"query_comparison_util.go",
"queue.go",
"queuefeed.go",
"quit.go",
"rapid_restart.go",
"rebalance_load.go",
Expand Down Expand Up @@ -365,6 +366,7 @@ go_library(
"@org_golang_google_protobuf//proto",
"@org_golang_x_exp//maps",
"@org_golang_x_oauth2//clientcredentials",
"@org_golang_x_sync//errgroup",
"@org_golang_x_text//cases",
"@org_golang_x_text//language",
],
Expand Down
145 changes: 145 additions & 0 deletions pkg/cmd/roachtest/tests/queuefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package tests

import (
"context"
"database/sql"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func registerQueuefeed(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "queuefeed",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(4, spec.WorkloadNode()),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runQueuefeed(ctx, t, c)
},
CompatibleClouds: registry.AllClouds,
Suites: registry.Suites(registry.Nightly),
})
}

func runQueuefeed(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.CRDBNodes())

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

_, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, errors.Wrap(err, "enabling rangefeeds"))

t.Status("initializing kv workload")
c.Run(ctx, option.WithNodes(c.WorkloadNode()),
"./cockroach workload init kv --splits=100 {pgurl:1}")

var tableID int64
err = db.QueryRowContext(ctx, "SELECT id FROM system.namespace WHERE name = 'kv' and \"parentSchemaID\" <> 0;").Scan(&tableID)
require.NoError(t, err)

t.Status("creating kv_queue")
_, err = db.ExecContext(ctx, "SELECT crdb_internal.create_queue_feed('kv_queue', $1)", tableID)
require.NoError(t, err)

t.Status("running queue feed queries")

ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

g, ctx := errgroup.WithContext(ctx)

const numReaders = 10
counters := make([]*atomic.Int64, numReaders)
for i := range counters {
counters[i] = &atomic.Int64{}
}

g.Go(func() error {
return c.RunE(ctx, option.WithNodes(c.WorkloadNode()),
"./cockroach workload run kv --duration=10m {pgurl:1}")
})

g.Go(func() error {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
lastCounts := make([]int64, numReaders)

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
qps := make([]string, numReaders)
for i := 0; i < numReaders; i++ {
currentCount := counters[i].Load()
ratePerSec := currentCount - lastCounts[i]
qps[i] = fmt.Sprintf("%d", ratePerSec)
lastCounts[i] = currentCount
}
t.L().Printf("qps: %s", strings.Join(qps, ","))
}
}
})

dbNodes := 1 // TODO fix bug that occurs with 3
nodePool := make([]*sql.DB, numReaders)
for i := range dbNodes {
nodePool[i] = c.Conn(ctx, t.L(), i+1)
}
defer func() {
for i := range dbNodes {
_ = nodePool[i].Close()
}
}()

for i := 0; i < numReaders; i++ {
readerIndex := i
g.Go(func() error {
// Stagger the readers a bit. This helps test re-distribution of
// partitions.
// TODO fix bug that occurs with jitter
// time.Sleep(time.Duration(rand.Intn(int(time.Minute))))

// Connect to a random node to simulate a tcp load balancer.
conn, err := nodePool[rand.Intn(dbNodes)].Conn(ctx)
if err != nil {
return errors.Wrap(err, "getting connection for the queuefeed reader")
}
defer func() { _ = conn.Close() }()

for ctx.Err() == nil {
var count int
err := conn.QueryRowContext(ctx,
"SELECT count(*) FROM crdb_internal.select_from_queue_feed('kv_queue', 10000)").Scan(&count)
if err != nil {
return err
}
counters[readerIndex].Add(int64(count))
}
return ctx.Err()
})
}

err = g.Wait()
if err != nil && ctx.Err() == nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func RegisterTests(r registry.Registry) {
registerPruneDanglingSnapshotsAndDisks(r)
registerPTP(r)
registerQueue(r)
registerQueuefeed(r)
registerQuitTransfersLeases(r)
registerRebalanceLoad(r)
registerReplicaGC(r)
Expand Down
30 changes: 27 additions & 3 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Txn struct {
// commitTriggers are run upon successful commit.
commitTriggers []func(ctx context.Context)

// rollbackTriggers are run upon rollback/abort.
rollbackTriggers []func(ctx context.Context)

// mu holds fields that need to be synchronized for concurrent request execution.
mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -1093,6 +1096,16 @@ func (txn *Txn) AddCommitTrigger(trigger func(ctx context.Context)) {
txn.commitTriggers = append(txn.commitTriggers, trigger)
}

// AddRollbackTrigger adds a closure to be executed on rollback/abort
// of the transaction.
func (txn *Txn) AddRollbackTrigger(trigger func(ctx context.Context)) {
if txn.typ != RootTxn {
panic(errors.AssertionFailedf("AddRollbackTrigger() called on leaf txn"))
}

txn.rollbackTriggers = append(txn.rollbackTriggers, trigger)
}

// endTxnReqAlloc is used to batch the heap allocations of an EndTxn request.
type endTxnReqAlloc struct {
req kvpb.EndTxnRequest
Expand Down Expand Up @@ -1243,6 +1256,9 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
// Reset commit triggers. These must be reconfigured by the client during the
// next retry.
txn.commitTriggers = nil
// Reset rollback triggers. These must be reconfigured by the client during the
// next retry.
txn.rollbackTriggers = nil

txn.mu.Lock()
defer txn.mu.Unlock()
Expand Down Expand Up @@ -1383,9 +1399,17 @@ func (txn *Txn) Send(
if pErr == nil {
// Invoking the commit triggers here ensures they run even in the case when a
// commit request is issued manually (not via Commit).
if et, ok := ba.GetArg(kvpb.EndTxn); ok && et.(*kvpb.EndTxnRequest).Commit {
for _, t := range txn.commitTriggers {
t(ctx)
if et, ok := ba.GetArg(kvpb.EndTxn); ok {
if et.(*kvpb.EndTxnRequest).Commit {
for _, t := range txn.commitTriggers {
t(ctx)
}
} else {
// Invoking the rollback triggers here ensures they run even in the case when a
// rollback request is issued manually (not via Rollback).
for _, t := range txn.rollbackTriggers {
t(ctx)
}
}
}
return br, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ go_library(
"//pkg/sql/physicalplan",
"//pkg/sql/privilege",
"//pkg/sql/querycache",
"//pkg/sql/queuefeed",
"//pkg/sql/rangeprober",
"//pkg/sql/regions",
"//pkg/sql/rolemembershipcache",
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/queuefeed"
"github.com/cockroachdb/cockroach/pkg/sql/rangeprober"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/rolemembershipcache"
Expand Down Expand Up @@ -1062,6 +1063,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
TenantReadOnly: cfg.SQLConfig.TenantReadOnly,
CidrLookup: cfg.BaseConfig.CidrLookup,
LicenseEnforcer: cfg.SQLConfig.LicenseEnforcer,
QueueManager: queuefeed.NewManager(ctx, cfg.internalDB, cfg.rangeFeedFactory, cfg.rangeDescIteratorFactory, codec, leaseMgr, cfg.sqlLivenessProvider.CachedReader()),
}

if codec.ForSystemTenant() {
Expand Down Expand Up @@ -1791,6 +1793,11 @@ func (s *SQLServer) preStart(

s.startLicenseEnforcer(ctx, knobs)

// Close queue manager when the stopper stops.
stopper.AddCloser(stop.CloserFn(func() {
s.execCfg.QueueManager.Close()
}))

// Report a warning if the server is being shut down via the stopper
// before it was gracefully drained. This warning may be innocuous
// in tests where there is no use of the test server/cluster after
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ go_library(
"//pkg/sql/privilege",
"//pkg/sql/protoreflect",
"//pkg/sql/querycache",
"//pkg/sql/queuefeed",
"//pkg/sql/queuefeed/queuebase",
"//pkg/sql/regionliveness",
"//pkg/sql/regions",
"//pkg/sql/rolemembershipcache",
Expand Down
Loading
Loading